Posted to commits@openwhisk.apache.org by ji...@apache.org on 2021/05/12 01:35:02 UTC
[openwhisk] branch master updated: [New Scheduler] Implement FunctionPullingContainerPool (#5102)
This is an automated email from the ASF dual-hosted git repository.
jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3802374 [New Scheduler] Implement FunctionPullingContainerPool (#5102)
3802374 is described below
commit 3802374d58d87fc6a95477929fc67269d6dcfe2c
Author: ningyougang <41...@qq.com>
AuthorDate: Wed May 12 09:34:06 2021 +0800
[New Scheduler] Implement FunctionPullingContainerPool (#5102)
* Implement FunctionPullingContainerPool
* Fix review points
---
ansible/roles/invoker/tasks/deploy.yml | 6 +-
.../org/apache/openwhisk/common/Logging.scala | 4 +
.../apache/openwhisk/core/connector/Message.scala | 162 +++
.../core/containerpool/ContainerFactory.scala | 12 +-
.../org/apache/openwhisk/core/entity/DocInfo.scala | 22 +-
.../org/apache/openwhisk/core/entity/Size.scala | 2 +-
core/invoker/src/main/resources/application.conf | 6 +-
.../core/containerpool/ContainerPool.scala | 12 +-
.../v2/FunctionPullingContainerPool.scala | 857 +++++++++++++++
.../v2/FunctionPullingContainerProxy.scala | 13 +-
.../containerpool/v2/InvokerHealthManager.scala | 5 +-
.../core/connector/test/TestConnector.scala | 2 +
.../mesos/test/MesosContainerFactoryTest.scala | 2 +-
.../containerpool/test/ContainerPoolTests.scala | 34 +-
.../containerpool/test/ContainerProxyTests.scala | 2 +-
.../test/FunctionPullingContainerPoolTests.scala | 1152 ++++++++++++++++++++
16 files changed, 2269 insertions(+), 24 deletions(-)
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index fe79439..591a07e 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -282,7 +282,11 @@
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
- "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
+ "CONFIG_whisk_containerPool_prewarmExpirationCheckInitDelay": "{{ container_pool_prewarm_expirationCheckInitDelay | default('10 minutes') }}"
+ "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('10 minutes') }}"
+ "CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
+ "CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
+ "CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
- name: extend invoker dns env
set_fact:
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 885b2a2..afce8fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -388,6 +388,10 @@ object LoggingMarkers {
// Time that is needed to produce message in kafka
val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)(MeasurementUnit.time.milliseconds)
+ def INVOKER_SHAREDPACKAGE(path: String) =
+ LogMarkerToken(invoker, "sharedPackage", counter, None, Map("path" -> path))(MeasurementUnit.none)
+ def INVOKER_CONTAINERPOOL_MEMORY(state: String) =
+ LogMarkerToken(invoker, "containerPoolMemory", counter, Some(state), Map("state" -> state))(MeasurementUnit.none)
// System overload and random invoker assignment
val MANAGED_SYSTEM_OVERLOAD =
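A minimal usage sketch for the two new markers (illustration only, not part of this diff; the path and memory value are made up):

    // counts invocations of shared packages by path; reports pool memory per state
    MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_SHAREDPACKAGE("/whisk.system/utils/echo"))
    MetricEmitter.emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("busy"), 512L)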
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index bcd56e0..88f4f11 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -484,3 +484,165 @@ object StatusData extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}
+
+case class ContainerCreationMessage(override val transid: TransactionId,
+ invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ whiskActionMetaData: WhiskActionMetaData,
+ rootSchedulerIndex: SchedulerInstanceId,
+ schedulerHost: String,
+ rpcPort: Int,
+ retryCount: Int = 0,
+ creationId: CreationId = CreationId.generate())
+ extends ContainerMessage(transid) {
+
+ override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
+ override def serialize: String = toJson.compactPrint
+}
+
+object ContainerCreationMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[ContainerCreationMessage] = Try(serdes.read(msg.parseJson))
+
+ private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+ private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
+ private implicit val byteSizeSerdes = size.serdes
+ implicit val serdes = jsonFormat10(
+ ContainerCreationMessage.apply(
+ _: TransactionId,
+ _: String,
+ _: FullyQualifiedEntityName,
+ _: DocRevision,
+ _: WhiskActionMetaData,
+ _: SchedulerInstanceId,
+ _: String,
+ _: Int,
+ _: Int,
+ _: CreationId))
+}
+
+case class ContainerDeletionMessage(override val transid: TransactionId,
+ invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ whiskActionMetaData: WhiskActionMetaData)
+ extends ContainerMessage(transid) {
+ override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
+ override def serialize: String = toJson.compactPrint
+}
+
+object ContainerDeletionMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[ContainerDeletionMessage] = Try(serdes.read(msg.parseJson))
+
+ private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+ private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
+ private implicit val byteSizeSerdes = size.serdes
+ implicit val serdes = jsonFormat5(
+ ContainerDeletionMessage
+ .apply(_: TransactionId, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData))
+}
+
+abstract class ContainerMessage(private val tid: TransactionId) extends Message {
+ override val transid: TransactionId = tid
+ override def serialize: String = ContainerMessage.serdes.write(this).compactPrint
+
+ /** Serializes the message to JSON. */
+ def toJson: JsValue
+}
+
+object ContainerMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[ContainerMessage] = Try(serdes.read(msg.parseJson))
+
+ implicit val serdes = new RootJsonFormat[ContainerMessage] {
+ override def write(m: ContainerMessage): JsValue = m.toJson
+
+ override def read(json: JsValue): ContainerMessage = {
+ val JsObject(fields) = json
+ val creation = fields.contains("creationId")
+ if (creation) {
+ json.convertTo[ContainerCreationMessage]
+ } else {
+ json.convertTo[ContainerDeletionMessage]
+ }
+ }
+ }
+}
+
+sealed trait ContainerCreationError
+object ContainerCreationError extends Enumeration {
+ case object NoAvailableInvokersError extends ContainerCreationError
+ case object NoAvailableResourceInvokersError extends ContainerCreationError
+ case object ResourceNotEnoughError extends ContainerCreationError
+ case object WhiskError extends ContainerCreationError
+ case object UnknownError extends ContainerCreationError
+ case object TimeoutError extends ContainerCreationError
+ case object ShuttingDownError extends ContainerCreationError
+ case object NonExecutableActionError extends ContainerCreationError
+ case object DBFetchError extends ContainerCreationError
+ case object BlackBoxError extends ContainerCreationError
+ case object ZeroNamespaceLimit extends ContainerCreationError
+ case object TooManyConcurrentRequests extends ContainerCreationError
+
+ val whiskErrors: Set[ContainerCreationError] =
+ Set(
+ NoAvailableInvokersError,
+ NoAvailableResourceInvokersError,
+ ResourceNotEnoughError,
+ WhiskError,
+ ShuttingDownError,
+ UnknownError,
+ TimeoutError,
+ ZeroNamespaceLimit)
+
+ def fromName(name: String) = name.toUpperCase match {
+ case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
+ case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
+ case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
+ case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
+ case "DBFETCHERROR" => DBFetchError
+ case "WHISKERROR" => WhiskError
+ case "BLACKBOXERROR" => BlackBoxError
+ case "TIMEOUTERROR" => TimeoutError
+ case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
+ case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
+ case "UNKNOWNERROR" => UnknownError
+ }
+
+ implicit val serdes = new RootJsonFormat[ContainerCreationError] {
+ override def write(error: ContainerCreationError): JsValue = JsString(error.toString)
+ override def read(json: JsValue): ContainerCreationError =
+ Try {
+ val JsString(str) = json
+ ContainerCreationError.fromName(str.trim.toUpperCase)
+ } getOrElse {
+ throw deserializationError("ContainerCreationError must be a valid string")
+ }
+ }
+}
+
+case class ContainerCreationAckMessage(override val transid: TransactionId,
+ creationId: CreationId,
+ invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ actionMetaData: WhiskActionMetaData,
+ rootInvokerIndex: InvokerInstanceId,
+ schedulerHost: String,
+ rpcPort: Int,
+ retryCount: Int = 0,
+ error: Option[ContainerCreationError] = None,
+ reason: Option[String] = None)
+ extends Message {
+
+ /**
+ * Serializes message to string. Must be idempotent.
+ */
+ override def serialize: String = ContainerCreationAckMessage.serdes.write(this).compactPrint
+}
+
+object ContainerCreationAckMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))
+ private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+ private implicit val byteSizeSerdes = size.serdes
+ implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 1603ba2..3b5a6c4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -47,9 +47,14 @@ case class ContainerArgsConfig(network: String,
case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
+ prewarmExpirationCheckInitDelay: FiniteDuration,
prewarmExpirationCheckInterval: FiniteDuration,
prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
- prewarmExpirationLimit: Int) {
+ prewarmExpirationLimit: Int,
+ prewarmMaxRetryLimit: Int,
+ prewarmPromotion: Boolean,
+ memorySyncInterval: FiniteDuration,
+ prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
@@ -68,6 +73,11 @@ case class ContainerPoolConfig(userMemory: ByteSize,
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
}
+case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
+ require(maxConcurrent > 0, "maxConcurrent per invoker must be > 0")
+ require(creationDelay.toSeconds > 0, "creationDelay must be > 0")
+}
+
case class RuntimesRegistryCredentials(user: String, password: String)
case class RuntimesRegistryConfig(url: String, credentials: Option[RuntimesRegistryCredentials])
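A hedged construction sketch (not in this diff) for the validation above; note creationDelay must be a full second or more because the check uses toSeconds:

    import scala.concurrent.duration._
    // throttle prewarm creation to 2 concurrent starts, one batch per second
    val throttling = PrewarmContainerCreationConfig(maxConcurrent = 2, creationDelay = 1.second)
    // PrewarmContainerCreationConfig(2, 500.milliseconds) would throw: 500.milliseconds.toSeconds == 0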
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
index 77e2008..d632f12 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
@@ -17,8 +17,9 @@
package org.apache.openwhisk.core.entity
-import scala.util.Try
+import org.apache.commons.lang3.StringUtils
+import scala.util.Try
import spray.json.DefaultJsonProtocol
import spray.json.JsNull
import spray.json.JsString
@@ -26,7 +27,6 @@ import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
import spray.json._
-
import org.apache.openwhisk.core.entity.ArgNormalizer.trim
/**
@@ -56,11 +56,27 @@ protected[core] class DocId(val id: String) extends AnyVal {
*
* @param rev the document revision, optional
*/
-protected[core] class DocRevision private (val rev: String) extends AnyVal {
+protected[core] class DocRevision private (val rev: String) extends AnyVal with Ordered[DocRevision] {
def asString = rev // to make explicit that this is a string conversion
def empty = rev == null
override def toString = rev
def serialize = DocRevision.serdes.write(this).compactPrint
+
+ override def compare(that: DocRevision): Int = {
+ if (this.empty && that.empty) {
+ 0
+ } else if (this.empty) {
+ -1
+ } else if (that.empty) {
+ 1
+ } else {
+ StringUtils.substringBefore(rev, "-").toInt - StringUtils.substringBefore(that.rev, "-").toInt
+ }
+ }
+
+ def ==(that: DocRevision): Boolean = {
+ this.compare(that) == 0
+ }
}
/**
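With the Ordered instance above, CouchDB-style revisions compare by the numeric prefix before the dash, and empty revisions sort first. A hedged sketch (uses the protected[core] DocRevision factory, so it only compiles inside the core package):

    val r1 = DocRevision("1-aaa")
    val r3 = DocRevision("3-bbb")
    assert(r1 < r3)                    // 1 < 3
    assert(DocRevision.empty < r1)     // empty sorts before any real revision
    assert(r1 == DocRevision("1-zzz")) // == only compares the numeric prefix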
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
index fd22948..ded9a6a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
@@ -163,7 +163,7 @@ object size {
implicit val pureconfigReader =
ConfigReader[ConfigValue].map(v => ByteSize(v.atKey("key").getBytes("key"), SizeUnits.BYTE))
- protected[entity] implicit val serdes = new RootJsonFormat[ByteSize] {
+ implicit val serdes = new RootJsonFormat[ByteSize] {
def write(b: ByteSize) = JsString(b.toString)
def read(value: JsValue): ByteSize = value match {
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index be68333..9aeea13 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,9 +60,13 @@ whisk {
user-memory: 1024 m
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
- prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
+ prewarm-expiration-check-init-delay: 10 minutes # initial delay before the first prewarm expiration check
+ prewarm-expiration-check-interval: 10 minutes # period to check for prewarm expiration
prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
+ prewarm-max-retry-limit: 5 # max number of consecutive retries for prewarm container creation
+ prewarm-promotion: false # if true, an action may take a prewarm container with a larger memory limit
+ memory-sync-interval: 1 second # period to sync memory info to etcd
}
kubernetes {
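These kebab-case keys map onto the new camelCase fields of ContainerPoolConfig. A hedged loading sketch, assuming the usual OpenWhisk pureconfig pattern and config path (ConfigKeys.containerPool):

    import org.apache.openwhisk.core.ConfigKeys
    import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
    import pureconfig.loadConfigOrThrow
    // reads whisk.container-pool.* from application.conf into the case class
    val poolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)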
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 724cd59..c6358aa 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -91,7 +91,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
.nextInt(v.toSeconds.toInt))
.getOrElse(0)
.seconds
- context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
+ if (prewarmConfig.exists(!_.reactive.isEmpty)) {
+ context.system.scheduler.schedule(
+ poolConfig.prewarmExpirationCheckInitDelay,
+ interval,
+ self,
+ AdjustPrewarmedContainer)
+ }
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name.asString
@@ -590,9 +596,9 @@ object ContainerPool {
}
.sortBy(_._2.expires.getOrElse(now))
- // emit expired container counter metric with memory + kind
- MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
if (expiredPrewarmedContainer.nonEmpty) {
+ // emit expired container counter metric with memory + kind
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
logging.info(
this,
s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
new file mode 100644
index 0000000..7260c83
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector.ContainerCreationError._
+import org.apache.openwhisk.core.connector.{
+ ContainerCreationAckMessage,
+ ContainerCreationMessage,
+ ContainerDeletionMessage
+}
+import org.apache.openwhisk.core.containerpool.{
+ AdjustPrewarmedContainer,
+ BlackboxStartupError,
+ ColdStartKey,
+ ContainerPool,
+ ContainerPoolConfig,
+ ContainerRemoved,
+ PrewarmingConfig,
+ WhiskContainerStartupError
+}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.http.Messages
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Random, Try}
+import scala.collection.immutable.Queue
+
+case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
+case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
+case object Remove
+case class Keep(timeout: FiniteDuration)
+case class PrewarmContainer(maxConcurrent: Int)
+
+/**
+ * A pool managing containers to run actions on.
+ *
+ * This pool fulfills the other half of the ContainerProxy contract. Only
+ * one job (either Start or Run) is sent to a child-actor at any given
+ * time. The pool then waits for a response of that container, indicating
+ * the container is done with the job. Only then will the pool send another
+ * request to that container.
+ *
+ * Upon actor creation, the pool will start to prewarm containers according
+ * to the provided prewarmConfig, iff set. Those containers will **not** be
+ * part of the pool size calculation, which is capped by the pool's memory limit.
+ * Prewarm containers are only used if they have matching arguments
+ * (kind, memory) and there is space in the pool.
+ *
+ * @param childFactory method to create new container proxy actor
+ * @param prewarmConfig optional settings for container prewarming
+ * @param poolConfig config for the ContainerPool
+ */
+class FunctionPullingContainerPool(
+ childFactory: ActorRefFactory => ActorRef,
+ invokerHealthService: ActorRef,
+ poolConfig: ContainerPoolConfig,
+ instance: InvokerInstanceId,
+ prewarmConfig: List[PrewarmingConfig] = List.empty,
+ sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+ implicit val logging: Logging)
+ extends Actor {
+ import ContainerPoolV2.memoryConsumptionOf
+
+ implicit val ec = context.system.dispatcher
+
+ private var busyPool = immutable.Map.empty[ActorRef, Data]
+ private var inProgressPool = immutable.Map.empty[ActorRef, Data]
+ private var warmedPool = immutable.Map.empty[ActorRef, WarmData]
+ private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
+ private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
+
+ private var shuttingDown = false
+
+ private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
+
+ private var preWarmScheduler: Option[Cancellable] = None
+ private var prewarmConfigQueue = Queue.empty[(CodeExec[_], ByteSize, Option[FiniteDuration])]
+ private val prewarmCreateFailedCount = new AtomicInteger(0)
+
+ val logScheduler = context.system.scheduler.schedule(0.seconds, 1.seconds) {
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("inprogress"),
+ memoryConsumptionOf(inProgressPool))
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("busy"),
+ memoryConsumptionOf(busyPool))
+ MetricEmitter.emitHistogramMetric(
+ LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("prewarmed"),
+ memoryConsumptionOf(prewarmedPool))
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("max"), poolConfig.userMemory.toMB)
+ }
+
+ // Key is ColdStartKey, value is the number of cold starts within the last minute
+ var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+
+ adjustPrewarmedContainer(true, false)
+
+ // check periodically and adjust prewarmed containers (delete those unused for some time and create replacements)
+ // add some random amount to this schedule to avoid a herd of container removal + creation
+ val interval = poolConfig.prewarmExpirationCheckInterval + poolConfig.prewarmExpirationCheckIntervalVariance
+ .map(v =>
+ Random
+ .nextInt(v.toSeconds.toInt))
+ .getOrElse(0)
+ .seconds
+
+ if (prewarmConfig.exists(!_.reactive.isEmpty)) {
+ context.system.scheduler.schedule(
+ poolConfig.prewarmExpirationCheckInitDelay,
+ interval,
+ self,
+ AdjustPrewarmedContainer)
+ }
+
+ val resourceSubmitter = context.system.scheduler.schedule(0.seconds, poolConfig.memorySyncInterval) {
+ syncMemoryInfo
+ }
+
+ private def logContainerStart(c: ContainerCreationMessage, action: WhiskAction, containerState: String): Unit = {
+ val FQN = c.action
+ if (FQN.namespace.name == "whisk.system" && FQN.fullPath.segments > 2) {
+ MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_SHAREDPACKAGE(FQN.fullPath.asString))
+ }
+
+ MetricEmitter.emitCounterMetric(
+ LoggingMarkers.INVOKER_CONTAINER_START(
+ containerState,
+ c.invocationNamespace,
+ c.action.namespace.toString,
+ c.action.name.toString))
+ }
+
+ def receive: Receive = {
+ case PrewarmContainer(maxConcurrent) =>
+ if (prewarmConfigQueue.isEmpty) {
+ preWarmScheduler.map(_.cancel())
+ preWarmScheduler = None
+ } else {
+ for (_ <- 1 to maxConcurrent if !prewarmConfigQueue.isEmpty) {
+ val ((codeExec, byteSize, ttl), newQueue) = prewarmConfigQueue.dequeue
+ prewarmConfigQueue = newQueue
+ prewarmContainer(codeExec, byteSize, ttl)
+ }
+ }
+
+ case CreationContainer(create: ContainerCreationMessage, action: WhiskAction) =>
+ if (shuttingDown) {
+ val message =
+ s"creationId: ${create.creationId}, invoker is shutting down, reschedule ${action.fullyQualifiedName(false)}"
+ val ack = ContainerCreationAckMessage(
+ create.transid,
+ create.creationId,
+ create.invocationNamespace,
+ create.action,
+ create.revision,
+ create.whiskActionMetaData,
+ instance,
+ create.schedulerHost,
+ create.rpcPort,
+ create.retryCount,
+ Some(ShuttingDownError),
+ Some(message))
+ logging.warn(this, message)
+ sendAckToScheduler(create.rootSchedulerIndex, ack)
+ } else {
+ logging.info(this, s"received a container creation message: ${create.creationId}")
+ action.toExecutableWhiskAction match {
+ case Some(executable) =>
+ val createdContainer =
+ takeWarmedContainer(executable, create.invocationNamespace, create.revision)
+ .map(container => (container, "warmed"))
+ .orElse {
+ takeContainer(executable)
+ }
+ handleChosenContainer(create, executable, createdContainer)
+ case None =>
+ val message =
+ s"creationId: ${create.creationId}, non-executable action reached the container pool ${action.fullyQualifiedName(false)}"
+ logging.error(this, message)
+ val ack = ContainerCreationAckMessage(
+ create.transid,
+ create.creationId,
+ create.invocationNamespace,
+ create.action,
+ create.revision,
+ create.whiskActionMetaData,
+ instance,
+ create.schedulerHost,
+ create.rpcPort,
+ create.retryCount,
+ Some(NonExecutableActionError),
+ Some(message))
+ sendAckToScheduler(create.rootSchedulerIndex, ack)
+ }
+ }
+
+ case DeletionContainer(deletionMessage: ContainerDeletionMessage) =>
+ val oldRevision = deletionMessage.revision
+ val invocationNamespace = deletionMessage.invocationNamespace
+ val fqn = deletionMessage.action.copy(version = None)
+
+ warmedPool.foreach(warmed => {
+ val proxy = warmed._1
+ val data = warmed._2
+
+ if (data.invocationNamespace == invocationNamespace
+ && data.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None)
+ && data.revision <= oldRevision) {
+ proxy ! GracefulShutdown
+ }
+ })
+
+ busyPool.foreach(f = busy => {
+ val proxy = busy._1
+ busy._2 match {
+ case warmData: WarmData
+ if warmData.invocationNamespace == invocationNamespace
+ && warmData.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None)
+ && warmData.revision <= oldRevision =>
+ proxy ! GracefulShutdown
+ case initializedData: InitializedData
+ if initializedData.invocationNamespace == invocationNamespace
+ && initializedData.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None) =>
+ proxy ! GracefulShutdown
+ case _ => // Other actions are ignored.
+ }
+ })
+
+ case ReadyToWork(data) =>
+ prewarmStartingPool = prewarmStartingPool - sender()
+ prewarmedPool = prewarmedPool + (sender() -> data)
+ // after a prewarm container is created successfully, reset the failure count to 0
+ if (prewarmCreateFailedCount.get() > 0) {
+ prewarmCreateFailedCount.set(0)
+ }
+
+ // Container is initialized
+ case Initialized(data) =>
+ busyPool = busyPool + (sender() -> data)
+ inProgressPool = inProgressPool - sender()
+ // container init completed, send creationAck(success) to scheduler
+ creationMessages.remove(sender()).foreach { msg =>
+ val ack = ContainerCreationAckMessage(
+ msg.transid,
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ msg.whiskActionMetaData,
+ instance,
+ msg.schedulerHost,
+ msg.rpcPort,
+ msg.retryCount)
+ sendAckToScheduler(msg.rootSchedulerIndex, ack)
+ }
+
+ case Resumed(data) =>
+ busyPool = busyPool + (sender() -> data)
+ inProgressPool = inProgressPool - sender()
+ // container init completed, send creationAck(success) to scheduler
+ creationMessages.remove(sender()).foreach { msg =>
+ val ack = ContainerCreationAckMessage(
+ msg.transid,
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ msg.whiskActionMetaData,
+ instance,
+ msg.schedulerHost,
+ msg.rpcPort,
+ msg.retryCount)
+ sendAckToScheduler(msg.rootSchedulerIndex, ack)
+ }
+
+ // if a warmed container fails to resume, try to use another container or create a new one
+ case ResumeFailed(data) =>
+ inProgressPool = inProgressPool - sender()
+ creationMessages.remove(sender()).foreach { msg =>
+ val container = takeWarmedContainer(data.action, data.invocationNamespace, data.revision)
+ .map(container => (container, "warmed"))
+ .orElse {
+ takeContainer(data.action)
+ }
+ handleChosenContainer(msg, data.action, container)
+ }
+
+ case ContainerCreationFailed(t) =>
+ val (error, message) = t match {
+ case WhiskContainerStartupError(msg) => (WhiskError, msg)
+ case BlackboxStartupError(msg) => (BlackBoxError, msg)
+ case _ => (WhiskError, Messages.resourceProvisionError)
+ }
+ creationMessages.remove(sender()).foreach { msg =>
+ val ack = ContainerCreationAckMessage(
+ msg.transid,
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ msg.whiskActionMetaData,
+ instance,
+ msg.schedulerHost,
+ msg.rpcPort,
+ msg.retryCount,
+ Some(error),
+ Some(message))
+ sendAckToScheduler(msg.rootSchedulerIndex, ack)
+ }
+
+ case ContainerIsPaused(data) =>
+ warmedPool = warmedPool + (sender() -> data)
+ busyPool = busyPool - sender() // remove container from busy pool
+
+ // Container got removed
+ case ContainerRemoved(replacePrewarm) =>
+ inProgressPool.get(sender()).foreach { _ =>
+ inProgressPool = inProgressPool - sender()
+ }
+
+ warmedPool.get(sender()).foreach { _ =>
+ warmedPool = warmedPool - sender()
+ }
+
+ // container was busy (i.e. at full capacity); removing it frees capacity for another job request
+ busyPool.get(sender()).foreach { _ =>
+ busyPool = busyPool - sender()
+ }
+
+ //in case this was a prewarm
+ prewarmedPool.get(sender()).foreach { data =>
+ prewarmedPool = prewarmedPool - sender()
+ logging.info(
+ this,
+ s"${if (replacePrewarm) "failed" else "expired"} prewarm [kind: ${data.kind}, memory: ${data.memoryLimit.toString}] removed")
+ }
+
+ //in case this was a starting prewarm
+ prewarmStartingPool.get(sender()).foreach { data =>
+ logging.info(this, s"failed starting prewarm [kind: ${data._1}, memory: ${data._2.toString}] removed")
+ prewarmStartingPool = prewarmStartingPool - sender()
+ prewarmCreateFailedCount.incrementAndGet()
+ }
+
+ //backfill prewarms on every ContainerRemoved, just in case
+ if (replacePrewarm) {
+ adjustPrewarmedContainer(false, false) //in case a prewarm is removed due to health failure or crash
+ }
+
+ // container creation or gRPC client initialization may have failed,
+ // so send a creationAck(reschedule) to the scheduler
+ creationMessages.remove(sender()).foreach { msg =>
+ val ack = ContainerCreationAckMessage(
+ msg.transid,
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ msg.whiskActionMetaData,
+ instance,
+ msg.schedulerHost,
+ msg.rpcPort,
+ msg.retryCount,
+ Some(UnknownError),
+ Some("ContainerProxy init failed."))
+ sendAckToScheduler(msg.rootSchedulerIndex, ack)
+ }
+
+ case GracefulShutdown =>
+ shuttingDown = true
+ waitForPoolToClear()
+
+ case Enable =>
+ shuttingDown = false
+
+ case AdjustPrewarmedContainer =>
+ // Reset the prewarmCreateFailedCount when doing the expiration check, and backfill prewarms if possible
+ prewarmCreateFailedCount.set(0)
+ adjustPrewarmedContainer(false, true)
+ }
+
+ /** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */
+ private def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
+ if (!shuttingDown) {
+ if (scheduled) {
+ //on scheduled time, remove expired prewarms
+ ContainerPoolV2.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
+ prewarmedPool = prewarmedPool - p
+ p ! Remove
+ }
+ //on scheduled time, emit cold start counter metric with memory + kind
+ coldStartCount foreach { coldStart =>
+ val coldStartKey = coldStart._1
+ MetricEmitter.emitCounterMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
+ }
+ }
+
+ ContainerPoolV2
+ .increasePrewarms(
+ init,
+ scheduled,
+ coldStartCount,
+ prewarmConfig,
+ prewarmedPool,
+ prewarmStartingPool,
+ prewarmConfigQueue)
+ .foreach { c =>
+ val config = c._1
+ val currentCount = c._2._1
+ val desiredCount = c._2._2
+ if (prewarmCreateFailedCount.get() > poolConfig.prewarmMaxRetryLimit) {
+ logging.warn(
+ this,
+ s"[kind: ${config.exec.kind}, memory: ${config.memoryLimit.toString}] prewarm create failed count exceeds max retry limit: ${poolConfig.prewarmMaxRetryLimit}, currentCount: ${currentCount}, desiredCount: ${desiredCount}")
+ } else {
+ if (currentCount < desiredCount) {
+ (currentCount until desiredCount).foreach { _ =>
+ poolConfig.prewarmContainerCreationConfig match {
+ case Some(_) =>
+ prewarmConfigQueue =
+ prewarmConfigQueue.enqueue((config.exec, config.memoryLimit, config.reactive.map(_.ttl)))
+ case None =>
+ prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
+ }
+ }
+ }
+ }
+ }
+
+ // run queue consumer
+ poolConfig.prewarmContainerCreationConfig.foreach(config => {
+ logging.info(
+ this,
+ s"prewarm container creation is starting with creation delay configuration [maxConcurrent: ${config.maxConcurrent}, creationDelay: ${config.creationDelay.toMillis} millisecond]")
+ if (preWarmScheduler.isEmpty) {
+ preWarmScheduler = Some(
+ context.system.scheduler
+ .schedule(0.seconds, config.creationDelay, self, PrewarmContainer(config.maxConcurrent)))
+ }
+ })
+
+ if (scheduled) {
+ // lastly, clear coldStartCounts each time scheduled event is processed to reset counts
+ coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+ }
+ }
+ }
+
+ private def syncMemoryInfo: Unit = {
+ val busyMemory = memoryConsumptionOf(busyPool)
+ val inProgressMemory = memoryConsumptionOf(inProgressPool)
+ invokerHealthService ! MemoryInfo(
+ poolConfig.userMemory.toMB - busyMemory - inProgressMemory,
+ busyMemory,
+ inProgressMemory)
+ }
+
+ /** Creates a new container and updates state accordingly. */
+ private def createContainer(memoryLimit: ByteSize): (ActorRef, Data) = {
+ val ref = childFactory(context)
+ val data = MemoryData(memoryLimit)
+ ref -> data
+ }
+
+ /** Creates a new prewarmed container */
+ private def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration]): Unit = {
+ val newContainer = childFactory(context)
+ prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
+ newContainer ! Start(exec, memoryLimit, ttl)
+ }
+
+ /** This is only for cold start statistics of prewarm configs, i.e. not blackbox or other configs. */
+ def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
+ prewarmConfig
+ .filter { config =>
+ kind == config.exec.kind && memoryLimit == config.memoryLimit
+ }
+ .foreach { _ =>
+ val coldStartKey = ColdStartKey(kind, memoryLimit)
+ coldStartCount.get(coldStartKey) match {
+ case Some(value) => coldStartCount = coldStartCount + (coldStartKey -> (value + 1))
+ case None => coldStartCount = coldStartCount + (coldStartKey -> 1)
+ }
+ }
+ }
+
+ /**
+ * Takes a warmed container out of the warmed pool
+ * iff a container with a matching revision is found
+ *
+ * @param action the action.
+ * @param invocationNamespace the invocation namespace for shared package.
+ * @param revision the DocRevision.
+ * @return the container iff found
+ */
+ private def takeWarmedContainer(action: ExecutableWhiskAction,
+ invocationNamespace: String,
+ revision: DocRevision): Option[(ActorRef, Data)] = {
+ warmedPool
+ .find {
+ case (_, WarmData(_, `invocationNamespace`, `action`, `revision`, _, _)) => true
+ case _ => false
+ }
+ .map {
+ case (ref, data) =>
+ warmedPool = warmedPool - ref
+ logging.info(this, s"Choose warmed container ${data.container.containerId}")
+ (ref, data)
+ }
+ }
+
+ /**
+ * Takes a prewarm container out of the prewarmed pool
+ * iff a container with a matching kind and suitable memory is found.
+ *
+ * @param action the action that holds the kind and the required memory.
+ * @param maximumMemory the maximum memory container can have
+ * @return the container iff found
+ */
+ private def takePrewarmContainer(action: ExecutableWhiskAction, maximumMemory: ByteSize): Option[(ActorRef, Data)] = {
+ val kind = action.exec.kind
+ val memory = action.limits.memory.megabytes.MB
+
+ // find a container with same kind and smallest memory
+ prewarmedPool.filter {
+ case (_, PreWarmData(_, `kind`, preMemory, _)) if preMemory >= memory && preMemory <= maximumMemory => true
+ case _ => false
+ }.toList match {
+ case Nil =>
+ None
+ case res =>
+ val (ref, data) = res.minBy(_._2.memoryLimit)
+ prewarmedPool = prewarmedPool - ref
+
+ //get the appropriate ttl from prewarm configs
+ val ttl =
+ prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
+ prewarmContainer(action.exec, data.memoryLimit, ttl)
+ Some(ref, data)
+ }
+ }
+
+ /** Removes a container and updates state accordingly. */
+ def removeContainer(toDelete: ActorRef) = {
+ toDelete ! Remove
+ warmedPool = warmedPool - toDelete
+ }
+
+ /**
+ * Calculate if there is enough free memory within a given pool.
+ *
+ * @param pool The pool, that has to be checked, if there is enough free memory.
+ * @param memory The amount of memory to check.
+ * @return true, if there is enough space for the given amount of memory.
+ */
+ private def hasPoolSpaceFor[A](pool: Map[A, Data], memory: ByteSize): Boolean = {
+ memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+ }
+
+ /**
+ * Make all container proxy actors in the busy and warmed pools shut down gracefully
+ */
+ private def waitForPoolToClear(): Unit = {
+ busyPool.keys.foreach(_ ! GracefulShutdown)
+ warmedPool.keys.foreach(_ ! GracefulShutdown)
+ if (inProgressPool.nonEmpty) {
+ context.system.scheduler.scheduleOnce(5.seconds) {
+ waitForPoolToClear()
+ }
+ }
+ }
+
+ /**
+ * take a prewarmed container or create a new one
+ *
+ * @param executable The executable whisk action
+ * @return
+ */
+ private def takeContainer(executable: ExecutableWhiskAction) = {
+ val freeMemory = poolConfig.userMemory.toMB - memoryConsumptionOf(busyPool ++ warmedPool ++ inProgressPool)
+ val deletableMemory = memoryConsumptionOf(warmedPool)
+ val requiredMemory = executable.limits.memory.megabytes
+ if (requiredMemory > freeMemory + deletableMemory) {
+ None
+ } else {
+ // try to take a preWarmed container whose memory doesn't exceed the max `usable` memory
+ takePrewarmContainer(
+ executable,
+ if (poolConfig.prewarmPromotion) (freeMemory + deletableMemory).MB else requiredMemory.MB) match {
+ // there is a suitable preWarmed container but not enough free memory for it, delete some warmed container first
+ case Some(container) if container._2.memoryLimit > freeMemory.MB =>
+ ContainerPoolV2
+ .remove(warmedPool, container._2.memoryLimit - freeMemory.MB)
+ .map(removeContainer)
+ .headOption
+ .map { _ =>
+ (container, "prewarmed")
+ }
+ // there is a suitable preWarmed container and enough free memory for it
+ case Some(container) =>
+ Some((container, "prewarmed"))
+ // there is no suitable preWarmed container and not enough free memory for the action
+ case None if executable.limits.memory.megabytes > freeMemory =>
+ ContainerPoolV2
+ .remove(warmedPool, executable.limits.memory.megabytes.MB - freeMemory.MB)
+ .map(removeContainer)
+ .headOption
+ .map { _ =>
+ incrementColdStartCount(executable.exec.kind, executable.limits.memory.megabytes.MB)
+ (createContainer(executable.limits.memory.megabytes.MB), "cold")
+ }
+ // there is no suitable preWarmed container and enough free memory
+ case None =>
+ incrementColdStartCount(executable.exec.kind, executable.limits.memory.megabytes.MB)
+ Some(createContainer(executable.limits.memory.megabytes.MB), "cold")
+
+ // this should not happen, but just for safety
+ case _ =>
+ None
+ }
+ }
+ }
+
+ private def handleChosenContainer(create: ContainerCreationMessage,
+ executable: ExecutableWhiskAction,
+ container: Option[((ActorRef, Data), String)]) = {
+ container match {
+ case Some(((proxy, data), containerState)) =>
+ // record the creationMessage so that if container creation fails, we can send a failure ack to the scheduler
+ creationMessages.getOrElseUpdate(proxy, create)
+ proxy ! Initialize(create.invocationNamespace, executable, create.schedulerHost, create.rpcPort, create.transid)
+ inProgressPool = inProgressPool + (proxy -> data)
+ logContainerStart(create, executable.toWhiskAction, containerState)
+
+ case None =>
+ val message =
+ s"creationId: ${create.creationId}, invoker[$instance] doesn't have enough resource for container: ${create.action}"
+ logging.info(this, message)
+ syncMemoryInfo
+ val ack = ContainerCreationAckMessage(
+ create.transid,
+ create.creationId,
+ create.invocationNamespace,
+ create.action,
+ create.revision,
+ create.whiskActionMetaData,
+ instance,
+ create.schedulerHost,
+ create.rpcPort,
+ create.retryCount,
+ Some(ResourceNotEnoughError),
+ Some(message))
+ sendAckToScheduler(create.rootSchedulerIndex, ack)
+ }
+ }
+}
+
+object ContainerPoolV2 {
+
+ /**
+ * Calculate the memory of a given pool.
+ *
+ * @param pool The pool with the containers.
+ * @return The memory consumption of all containers in the pool in Megabytes.
+ */
+ protected[containerpool] def memoryConsumptionOf[A](pool: Map[A, Data]): Long = {
+ pool.map(_._2.memoryLimit.toMB).sum
+ }
+
+ /**
+ * Finds the oldest previously used container to remove to make space for the job passed to run.
+ * Depending on the space that has to be allocated, several containers might be removed.
+ *
+ * NOTE: This method is never called to remove an action that is in the pool already,
+ * since this would be picked up earlier in the scheduler and the container reused.
+ *
+ * @param pool a map of all free containers in the pool
+ * @param memory the amount of memory that has to be freed up
+ * @return a list of containers to be removed iff found
+ */
+ @tailrec
+ protected[containerpool] def remove[A](pool: Map[A, WarmData],
+ memory: ByteSize,
+ toRemove: List[A] = List.empty): List[A] = {
+ if (memory > 0.B && pool.nonEmpty && memoryConsumptionOf(pool) >= memory.toMB) {
+ // Remove the oldest container if:
+ // - there is more memory required
+ // - there are still containers that can be removed
+ // - there are enough free containers that can be removed
+ val (ref, data) = pool.minBy(_._2.lastUsed)
+ // ByteSize subtraction throws when the result would be negative; treat that as 0 remaining
+ val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+ remove(pool - ref, remainingMemory, toRemove ++ List(ref))
+ } else {
+ // If this is the first call: all containers are currently in use, or more memory is needed
+ // than removable containers can free up.
+ // Or, if this is one of the recursions: enough containers have been found to free the
+ // required memory. -> Abort recursion
+ toRemove
+ }
+ }
+
+ /**
+ * Find the expired actors in the prewarmedPool
+ *
+ * @param poolConfig
+ * @param prewarmConfig
+ * @param prewarmedPool
+ * @param logging
+ * @return a list of expired actors
+ */
+ def removeExpired[A](poolConfig: ContainerPoolConfig,
+ prewarmConfig: List[PrewarmingConfig],
+ prewarmedPool: Map[A, PreWarmData])(implicit logging: Logging): List[A] = {
+ val now = Deadline.now
+ val expireds = prewarmConfig
+ .flatMap { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+ config.reactive
+ .map { c =>
+ val expiredPrewarmedContainer = prewarmedPool.toSeq
+ .filter { warmInfo =>
+ warmInfo match {
+ case (_, p @ PreWarmData(_, `kind`, `memory`, _)) if p.isExpired() => true
+ case _ => false
+ }
+ }
+ .sortBy(_._2.expires.getOrElse(now))
+
+ // emit expired container counter metric with memory + kind
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
+ if (expiredPrewarmedContainer.nonEmpty) {
+ logging.info(
+ this,
+ s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
+ }
+ expiredPrewarmedContainer.map(e => (e._1, e._2.expires.getOrElse(now)))
+ }
+ .getOrElse(List.empty)
+ }
+ .sortBy(_._2) //need to sort these so that if the results are limited, we take the oldest
+ .map(_._1)
+ if (expireds.nonEmpty) {
+ logging.info(this, s"removing up to ${poolConfig.prewarmExpirationLimit} of ${expireds.size} expired containers")
+ expireds.take(poolConfig.prewarmExpirationLimit).foreach { e =>
+ prewarmedPool.get(e).map { d =>
+ logging.info(this, s"removing expired prewarm of kind ${d.kind} with container ${d.container} ")
+ }
+ }
+ }
+ expireds.take(poolConfig.prewarmExpirationLimit)
+ }
+
+ /**
+ * Find the increased number for the prewarmed kind
+ *
+ * @param init
+ * @param scheduled
+ * @param coldStartCount
+ * @param prewarmConfig
+ * @param prewarmedPool
+ * @param prewarmStartingPool
+ * @param logging
+ * @return the current number and increased number for the kind in the Map
+ */
+ def increasePrewarms(init: Boolean,
+ scheduled: Boolean,
+ coldStartCount: Map[ColdStartKey, Int],
+ prewarmConfig: List[PrewarmingConfig],
+ prewarmedPool: Map[ActorRef, PreWarmData],
+ prewarmStartingPool: Map[ActorRef, (String, ByteSize)],
+ prewarmQueue: Queue[(CodeExec[_], ByteSize, Option[FiniteDuration])])(
+ implicit logging: Logging): Map[PrewarmingConfig, (Int, Int)] = {
+ prewarmConfig.map { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+
+ val runningCount = prewarmedPool.count {
+ // done starting (include expired, since they may not have been removed yet)
+ case (_, p @ PreWarmData(_, `kind`, `memory`, _)) => true
+ // anything else: different kind or memory, so not counted for this config
+ case _ => false
+ }
+ val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
+ val queuingCount = prewarmQueue.count(p => p._1.kind == kind && p._2 == memory)
+ val currentCount = runningCount + startingCount + queuingCount
+
+ // determine how many are needed
+ val desiredCount: Int =
+ if (init) config.initialCount
+ else {
+ if (scheduled) {
+ // scheduled/reactive config backfill
+ config.reactive
+ .map(c => ContainerPool.getReactiveCold(coldStartCount, c, kind, memory).getOrElse(c.minCount)) //reactive -> desired is either cold start driven, or minCount
+ .getOrElse(config.initialCount) //not reactive -> desired is always initial count
+ } else {
+ // normal backfill after removal - make sure at least minCount or initialCount is started
+ config.reactive.map(_.minCount).getOrElse(config.initialCount)
+ }
+ }
+
+ if (currentCount < desiredCount) {
+ logging.info(
+ this,
+ s"found ${currentCount} started and ${startingCount} starting and ${queuingCount} queuing; ${if (init) "initing"
+ else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+ TransactionId.invokerWarmup)
+ }
+ (config, (currentCount, desiredCount))
+ }.toMap
+ }
+
+ def props(factory: ActorRefFactory => ActorRef,
+ invokerHealthService: ActorRef,
+ poolConfig: ContainerPoolConfig,
+ instance: InvokerInstanceId,
+ prewarmConfig: List[PrewarmingConfig] = List.empty,
+ sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+ implicit logging: Logging): Props = {
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService,
+ poolConfig,
+ instance,
+ prewarmConfig,
+ sendAckToScheduler))
+ }
+}
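A hedged wiring sketch (names like actorSystem, childFactory and instanceId are placeholders; the real wiring lives in the invoker bootstrap, which this commit does not touch):

    // create the pool and hand it a creation job received from the scheduler
    val pool: ActorRef = actorSystem.actorOf(
      ContainerPoolV2.props(
        childFactory,         // ActorRefFactory => ActorRef, builds a FunctionPullingContainerProxy
        invokerHealthService, // receives MemoryInfo every poolConfig.memorySyncInterval
        poolConfig,
        instanceId,
        prewarmConfig,
        sendAckToScheduler))  // (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata]

    // the pool tries warmed -> prewarmed -> cold, and acks ResourceNotEnoughError otherwise
    pool ! CreationContainer(creationMessage, whiskAction)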
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 2a33a0d..6f943d8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -25,13 +25,15 @@ import org.apache.openwhisk.core.containerpool.Container
import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
import org.apache.openwhisk.core.entity.size._
+import scala.concurrent.duration.{Deadline, FiniteDuration}
+
// Events received by the actor
case class Initialize(invocationNamespace: String,
action: ExecutableWhiskAction,
schedulerHost: String,
rpcPort: Int,
transId: TransactionId)
-case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration] = None)
// Event sent by the actor
case class ContainerCreationFailed(throwable: Throwable)
@@ -40,12 +42,11 @@ case class ClientCreationFailed(throwable: Throwable,
container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction)
-case class ReadyToWork(data: Data)
+case class ReadyToWork(data: PreWarmData)
case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
-case object ContainerRemoved // when container is destroyed
// States
sealed trait ProxyState
@@ -72,9 +73,13 @@ case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLim
override def getContainer = None
}
trait WithClient { val clientProxy: ActorRef }
-case class PreWarmData(container: Container, kind: String, override val memoryLimit: ByteSize)
+case class PreWarmData(container: Container,
+ kind: String,
+ override val memoryLimit: ByteSize,
+ expires: Option[Deadline] = None)
extends Data(memoryLimit) {
override def getContainer = Some(container)
+ def isExpired(): Boolean = expires.exists(_.isOverdue())
}
case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
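A hedged expiry sketch for the new ttl plumbing (`container` is assumed to exist; kind and sizes are made up):

    import scala.concurrent.duration._
    import org.apache.openwhisk.core.entity.size._

    val data = PreWarmData(container, "nodejs:14", 256.MB, expires = Some(10.minutes.fromNow))
    data.isExpired() // false until the deadline passes; expires = None never expires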
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 8372fc1..71ab9a0 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -22,6 +22,7 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, St
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.ContainerRemoved
import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
import org.apache.openwhisk.core.entitlement.Privilege
import org.apache.openwhisk.core.entity.size._
@@ -66,7 +67,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
}
when(Unhealthy) {
- case Event(ContainerRemoved, _) =>
+ case Event(ContainerRemoved(_), _) =>
healthActionProxy = None
startTestAction(self)
stay
@@ -90,7 +91,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
// Initialized messages sent by ContainerProxy for HealthManger
stay()
- case Event(ContainerRemoved, _) =>
+ case Event(ContainerRemoved(_), _) =>
// Drop messages sent by ContainerProxy for HealthManger
healthActionProxy = None
stay()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
index 451faff..1e0950f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
@@ -72,6 +72,8 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
producer.close()
}
+ def getProducer(): MessageProducer = producer
+
private val producer = new MessageProducer {
def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
queue.synchronized {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index e6cd99f..4496742 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -84,7 +84,7 @@ class MesosContainerFactoryTest
}
// 80 slots, each 265MB
- val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 1.minute, None, 100)
+ val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.seconds)
val actionMemory = 265.MB
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index bc10350..eff4b92 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,7 +134,7 @@ class ContainerPoolTests
}
def poolConfig(userMemory: ByteSize) =
- ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
+ ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
behavior of "ContainerPool"
@@ -805,9 +805,20 @@ class ContainerPoolTests
val feed = TestProbe()
stream.reset()
+ val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
val prewarmExpirationCheckInterval = FiniteDuration(2, TimeUnit.SECONDS)
val poolConfig =
- ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckInterval, None, 100)
+ ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 4,
+ 0.5,
+ false,
+ prewarmExpirationCheckInitDelay,
+ prewarmExpirationCheckInterval,
+ None,
+ 100,
+ 3,
+ false,
+ 1.second)
val initialCount = 2
val pool =
system.actorOf(
@@ -840,9 +851,20 @@ class ContainerPoolTests
val feed = TestProbe()
stream.reset()
+ val prewarmExpirationCheckInitDelay = 2.seconds
val prewarmExpirationCheckInterval = 2.seconds
val poolConfig =
- ContainerPoolConfig(MemoryLimit.STD_MEMORY * 12, 0.5, false, prewarmExpirationCheckInterval, None, 100)
+ ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 12,
+ 0.5,
+ false,
+ prewarmExpirationCheckInitDelay,
+ prewarmExpirationCheckInterval,
+ None,
+ 100,
+ 3,
+ false,
+ 1.second)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -1215,7 +1237,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
}
it should "remove expired in order of expiration" in {
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1239,7 +1261,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
@@ -1265,7 +1287,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2 - should leave at least 2 prewarms when removing expired
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index dc8a33f..87aeaf1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -276,7 +276,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
- val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 1.minute, None, 100)
+ val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
new file mode 100644
index 0000000..e6ed5ed
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -0,0 +1,1152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
+import common.StreamLogging
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.ContainerCreationError._
+import org.apache.openwhisk.core.connector.test.TestConnector
+import org.apache.openwhisk.core.connector.{
+ ContainerCreationAckMessage,
+ ContainerCreationError,
+ ContainerCreationMessage,
+ MessageProducer
+}
+import org.apache.openwhisk.core.containerpool.docker.DockerContainer
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.containerpool.{
+ Container,
+ ContainerAddress,
+ ContainerPoolConfig,
+ ContainerRemoved,
+ PrewarmContainerCreationConfig,
+ PrewarmingConfig
+}
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, ReactivePrewarmingConfig, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.utils.{retry => utilRetry}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.Eventually
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Behavior tests for the FunctionPullingContainerPool
+ *
+ * These tests exercise the runtime behavior of a FunctionPullingContainerPool actor.
+ */
+@RunWith(classOf[JUnitRunner])
+class FunctionPullingContainerPoolTests
+ extends TestKit(ActorSystem("FunctionPullingContainerPool"))
+ with ImplicitSender
+ with FlatSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with MockFactory
+ with StreamLogging
+ with Eventually
+ with DbUtils {
+
+ override def afterAll = {
+ TestKit.shutdownActorSystem(system)
+ super.afterAll()
+ }
+ override def afterEach = {
+ cleanup()
+ super.afterEach()
+ }
+
+ private val config = new WhiskConfig(ExecManifest.requiredProperties)
+ ExecManifest.initialize(config) should be a 'success
+
+ val timeout = 5.seconds
+
+ private implicit val mt = ActorMaterializer()
+ private implicit val transId = TransactionId.testing
+ private implicit val creationId = CreationId.generate()
+
+ // Common entities to pass to the tests. We don't really care what's inside
+ // those for the behavior testing here, as none of the contents will really
+ // reach a container anyway. We merely assert that passing and extraction of
+ // the values is done properly.
+ private val actionKind = "nodejs:8"
+ private val exec = CodeExecAsString(RuntimeManifest(actionKind, ImageName("testImage")), "testCode", None)
+ private val memoryLimit = MemoryLimit.STD_MEMORY.toMB.MB
+ private val whiskAction = WhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
+ private val invocationNamespace = EntityName("invocationSpace")
+ private val schedulerHost = "127.17.0.1"
+ private val rpcPort = 13001
+ private val isBlackboxInvocation = false
+ private val bigWhiskAction = WhiskAction(
+ EntityPath("actionSpace"),
+ EntityName("bigActionName"),
+ exec,
+ limits = ActionLimits(memory = MemoryLimit(memoryLimit * 2)))
+ private val execMetadata = CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
+ private val actionMetaData =
+ WhiskActionMetaData(
+ whiskAction.namespace,
+ whiskAction.name,
+ execMetadata,
+ whiskAction.parameters,
+ whiskAction.limits,
+ whiskAction.version,
+ whiskAction.publish,
+ whiskAction.annotations)
+ private val bigActionMetaData =
+ WhiskActionMetaData(
+ bigWhiskAction.namespace,
+ bigWhiskAction.name,
+ execMetadata,
+ bigWhiskAction.parameters,
+ bigWhiskAction.limits,
+ bigWhiskAction.version,
+ bigWhiskAction.publish,
+ bigWhiskAction.annotations)
+ private val invokerHealthService = TestProbe()
+ private val schedulerInstanceId = SchedulerInstanceId("0")
+ private val producer = stub[MessageProducer]
+ private val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit)
+ private val initializedData =
+ InitializedData(
+ mock[MockableV2Container],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ TestProbe().ref)
+
+ private val entityStore = WhiskEntityStore.datastore()
+ private val invokerInstance = InvokerInstanceId(0, userMemory = 0 B)
+ private val creationMessage =
+ ContainerCreationMessage(
+ transId,
+ invocationNamespace.asString,
+ whiskAction.fullyQualifiedName(true),
+ DocRevision.empty,
+ actionMetaData,
+ schedulerInstanceId,
+ schedulerHost,
+ rpcPort,
+ creationId = creationId)
+ private val creationMessageLarge =
+ ContainerCreationMessage(
+ transId,
+ invocationNamespace.asString,
+ bigWhiskAction.fullyQualifiedName(true),
+ DocRevision.empty,
+ bigActionMetaData,
+ schedulerInstanceId,
+ schedulerHost,
+ rpcPort,
+ creationId = creationId)
+
+ /** Creates a sequence of containers and a factory returning this sequence. */
+ def testContainers(n: Int) = {
+ val containers = (0 to n).map(_ => TestProbe())
+ val queue = mutable.Queue(containers: _*)
+ val factory = (fac: ActorRefFactory) => queue.dequeue().ref
+ (containers, factory)
+ }
+
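+ /**
+ * Builds a ContainerPoolConfig with test-friendly defaults. Judging from the arguments
+ * these tests vary, the two FiniteDurations are the prewarm-expiration check init delay
+ * and interval, 100 is the prewarmExpirationLimit, and the trailing parameters are
+ * prewarmMaxRetryLimit, prewarmPromotion, memorySyncInterval and the optional
+ * prewarmContainerCreationConfig; the remaining positional values mirror the defaults
+ * used throughout this suite.
+ */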
+ def poolConfig(userMemory: ByteSize,
+ memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
+ prewarmMaxRetryLimit: Int = 3,
+ prewarmPromotion: Boolean = false,
+ prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
+ ContainerPoolConfig(
+ userMemory,
+ 0.5,
+ false,
+ FiniteDuration(2, TimeUnit.SECONDS),
+ FiniteDuration(1, TimeUnit.MINUTES),
+ None,
+ 100,
+ prewarmMaxRetryLimit,
+ prewarmPromotion,
+ memorySyncInterval,
+ prewarmContainerCreationConfig)
+
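+ /**
+ * Publishes a creation ack on the per-scheduler topic. The topic naming follows the
+ * "creationAck" + scheduler instance id convention asserted by the tests below, e.g.
+ * "creationAck0" for SchedulerInstanceId("0").
+ */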
+ def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
+ ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+ val topic = s"creationAck${schedulerInstanceId.asString}"
+ producer.send(topic, ackMessage)
+ }
+
+ behavior of "ContainerPool"
+
+ /*
+ * CONTAINER SCHEDULING
+ *
+ * These tests cover only the simplest scheduling paths. See the tests further below
+ * for fuller coverage of the respective scheduling methods.
+ */
+
+ it should "create a container if it cannot find a matching prewarmed container" in within(timeout) {
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+ // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 4),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(producer))))
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(1).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
+ it should "not start a new container if there is not enough space in the pool" in within(timeout) {
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+ val bigDoc = put(entityStore, bigWhiskAction)
+ // use a fake producer here so sendAckToScheduler won't fail
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 2),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(producer))))
+
+ // Start first action
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // Send second action to the pool
+ pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
+ containers(1).expectNoMessage(100.milliseconds)
+
+ // First container is removed
+ containers(0).send(pool, ContainerRemoved(true)) // pool is empty again.
+
+ pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
+ // Second container should run now
+ containers(1).expectMsgPF() {
+ case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
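+ // GracefulShutdown should make the pool reject new creations and answer them with a
+ // ShuttingDownError ack so the scheduler can reschedule elsewhere; Enable should
+ // restore normal operation. The next test exercises exactly that round trip.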
+ it should "not start a new container if it is shut down" in within(timeout) {
+ val (containers, factory) = testContainers(1)
+ val doc = put(entityStore, bigWhiskAction)
+ val topic = s"creationAck${schedulerInstanceId.asString}"
+ val consumer = new TestConnector(topic, 4, true)
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(consumer.getProducer()))))
+
+ pool ! GracefulShutdown
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+
+ containers(0).expectNoMessage()
+
+ val error =
+ s"creationId: ${creationMessage.creationId}, invoker is shutting down, reschedule ${creationMessage.action.copy(version = None)}"
+ val ackMessage =
+ createAckMsg(creationMessage.copy(revision = doc.rev), Some(ShuttingDownError), Some(error))
+
+ utilRetry({
+ val buffer = consumer.peek(50.millisecond)
+ buffer.size shouldBe 1
+ buffer.head._1 shouldBe topic
+ buffer.head._4 shouldBe ackMessage.serialize.getBytes
+ }, 10, Some(500.millisecond))
+
+ // the pool should be back to work after being enabled again
+ pool ! Enable
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
+ it should "create prewarmed containers on startup" in within(timeout) {
+ stream.reset()
+ val (containers, factory) = testContainers(1)
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 2),
+ invokerInstance,
+ List(PrewarmingConfig(1, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ stream.toString should include("initing 1 pre-warms to desired count: 1")
+ stream.toString should not include ("prewarm container creation is starting with creation delay configuration")
+ }
+
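+ // The PrewarmContainerCreationConfig(1, 3.seconds) used below appears to throttle
+ // startup prewarming to one container per 3-second batch, which is why the probes
+ // observe Start messages spaced roughly 3 seconds apart; the field semantics are
+ // inferred from this usage.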
+ it should "create prewarmed containers on startup with creation delay configuration" in within(7.seconds) {
+ stream.reset()
+ val (containers, factory) = testContainers(3)
+ val prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] =
+ Some(PrewarmContainerCreationConfig(1, 3.seconds))
+
+ val poolConfig = ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 3,
+ 0.5,
+ false,
+ FiniteDuration(10, TimeUnit.SECONDS),
+ FiniteDuration(10, TimeUnit.SECONDS),
+ None,
+ 100,
+ 3,
+ false,
+ FiniteDuration(10, TimeUnit.SECONDS),
+ prewarmContainerCreationConfig)
+
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig,
+ invokerInstance,
+ List(PrewarmingConfig(3, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectNoMessage(2.seconds)
+ containers(1).expectMsg(4.seconds, Start(exec, memoryLimit))
+ containers(2).expectNoMessage(2.seconds)
+ containers(2).expectMsg(4.seconds, Start(exec, memoryLimit))
+ stream.toString should include("prewarm container creation is starting with creation delay configuration")
+ }
+
+ it should "backfill prewarms when prewarm containers are removed" in within(timeout) {
+ val (containers, factory) = testContainers(6)
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 2),
+ invokerInstance,
+ List(PrewarmingConfig(2, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, memoryLimit))
+
+ //removing 2 prewarm containers will start 2 containers via backfill
+ containers(0).send(pool, ContainerRemoved(true))
+ containers(1).send(pool, ContainerRemoved(true))
+ containers(2).expectMsg(Start(exec, memoryLimit))
+ containers(3).expectMsg(Start(exec, memoryLimit))
+ //make sure extra prewarms are not started
+ containers(4).expectNoMessage(100.milliseconds)
+ containers(5).expectNoMessage(100.milliseconds)
+ }
+
+ it should "use a prewarmed container when kind and memory are both match and create a new one to fill its place when prewarmPromotion is false" in within(
+ timeout) {
+ val (containers, factory) = testContainers(4)
+ val doc = put(entityStore, whiskAction)
+ val biggerMemory = memoryLimit * 2
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 6, prewarmPromotion = false),
+ invokerInstance,
+ List(PrewarmingConfig(1, exec, memoryLimit), PrewarmingConfig(1, exec, biggerMemory)),
+ sendAckToScheduler(producer))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, biggerMemory))
+
+ // both prewarm containers are started
+ containers(0).send(pool, ReadyToWork(prewarmedData))
+ containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggerMemory)))
+
+ // the prewarm container with matching memory should be chosen
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // prewarm a new container
+ containers(2).expectMsgPF() {
+ case Start(exec, memoryLimit, _) => true
+ }
+
+ // the prewarm container with bigger memory should not be chosen
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(3).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
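+ // prewarmPromotion governs prewarm matching: when false (previous test) only an exact
+ // kind + memory match is used, while when true (next test) a prewarmed container with
+ // a larger memory limit may be promoted to serve a smaller request.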
+ it should "use a prewarmed container when kind is matched and create a new one to fill its place when prewarmPromotion is true" in within(
+ timeout) {
+ val (containers, factory) = testContainers(6)
+ val doc = put(entityStore, whiskAction)
+ val biggerMemory = memoryLimit * 2
+ val biggestMemory = memoryLimit * 3
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 6, prewarmPromotion = true),
+ invokerInstance,
+ List(PrewarmingConfig(1, exec, memoryLimit), PrewarmingConfig(1, exec, biggestMemory)),
+ sendAckToScheduler(producer))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, biggestMemory))
+
+ // two prewarm containers are started
+ containers(0).send(pool, ReadyToWork(prewarmedData))
+ containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggestMemory)))
+
+ // the prewarm container with the smallest memory should be chosen
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // prewarm a new container
+ containers(2).expectMsgPF() {
+ case Start(exec, memoryLimit, _) => true
+ }
+
+ // the prewarm container with bigger memory should be chosen
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(1).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // prewarm a new container
+ containers(3).expectMsgPF() {
+ case Start(exec, biggestMemory, _) => true
+ }
+
+ // now free memory is (6 - 3 - 1) * stdMemory and the request needs 2 * stdMemory, so neither prewarmed container is suitable
+ // and a new container should be created
+ pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
+ containers(4).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // no new prewarmed container should be created
+ containers(6).expectNoMessage(500.milliseconds)
+ }
+
+ it should "not use a prewarmed container if it doesn't fit the kind" in within(timeout) {
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+
+ val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None)
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List(PrewarmingConfig(1, alternativeExec, memoryLimit)),
+ sendAckToScheduler(producer))))
+
+ containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
+ containers(0).send(pool, ReadyToWork(prewarmedData.copy(kind = alternativeExec.kind))) // container0 was started
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(1).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
+ it should "not use a prewarmed container if it doesn't fit memory wise" in within(timeout) {
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+
+ val alternativeLimit = 128.MB
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List(PrewarmingConfig(1, exec, alternativeLimit)),
+ sendAckToScheduler(producer))))
+
+ containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
+ containers(0).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = alternativeLimit))) // container0 was started
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(1).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
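+ // ContainerIsPaused registers a warmed (paused) container with the pool, keyed by
+ // invocation namespace, action and revision; as the next test shows, only an exact
+ // match on all three reuses the warmed container.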
+ it should "use a warmed container when invocationNamespace, action and revision matched" in within(timeout) {
+ val (containers, factory) = testContainers(3)
+ val doc = put(entityStore, whiskAction)
+
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 4),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(producer))))
+
+ // register a fake warmed container
+ val container = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container.ref)
+
+ // the revision doesn't match, create 1 container
+ pool ! CreationContainer(creationMessage, whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // the invocation namespace doesn't match, create 1 container
+ pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
+ containers(1).expectMsgPF() {
+ case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ container.expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // warmed container is occupied, create 1 more container
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(2).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
+ it should "retry when chosen warmed container is failed to resume" in within(timeout) {
+
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 2),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(producer))))
+
+ // register a fake warmed container
+ val container = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container.ref)
+
+ // choose the warmed container
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ container.expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // the warmed container fails to resume
+ pool.tell(
+ ResumeFailed(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container.ref)
+
+ // then a new container will be created
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
+ it should "remove oldest previously used container to make space for the job passed to run" in within(timeout) {
+ val (containers, factory) = testContainers(2)
+ val doc = put(entityStore, whiskAction)
+
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 3),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(producer))))
+
+ // register three fake warmed containers, so the pool now has no space for a new container
+ val container1 = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container1.ref)
+
+ val container2 = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container2.ref)
+
+ val container3 = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container3.ref)
+
+ // now the pool has no free memory, and the new job needs 2*stdMemory, so it needs to remove two warmed containers
+ pool ! CreationContainer(creationMessage, bigWhiskAction)
+ container1.expectMsg(Remove)
+ container2.expectMsg(Remove)
+ container3.expectNoMessage()
+
+ // a new container will be created
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ }
+
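+ /**
+ * Builds the ack message the pool is expected to publish for a given creation message,
+ * mirroring its fields and carrying the optional error and reason.
+ */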
+ private def createAckMsg(creationMessage: ContainerCreationMessage,
+ error: Option[ContainerCreationError],
+ reason: Option[String]) = {
+ ContainerCreationAckMessage(
+ creationMessage.transid,
+ creationMessage.creationId,
+ invocationNamespace.asString,
+ creationMessage.action,
+ creationMessage.revision,
+ creationMessage.whiskActionMetaData,
+ invokerInstance,
+ creationMessage.schedulerHost,
+ creationMessage.rpcPort,
+ creationMessage.retryCount,
+ error,
+ reason)
+ }
+
+ it should "send ack(success) to scheduler when container creation is finished" in within(timeout) {
+ val (containers, factory) = testContainers(1)
+ val doc = put(entityStore, whiskAction)
+ // Actions are created with the default memory limit (MemoryLimit.stdMemory).
+ val topic = s"creationAck${creationMessage.rootSchedulerIndex.asString}"
+
+ val consumer = new TestConnector(topic, 4, true)
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(consumer.getProducer()))))
+
+ val actualCreationMessage = creationMessage.copy(revision = doc.rev)
+ val ackMessage = createAckMsg(actualCreationMessage, None, None)
+
+ pool ! CreationContainer(actualCreationMessage, whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ containers(0).send(pool, Initialized(initializedData)) // container is initialized
+
+ utilRetry({
+ val buffer = consumer.peek(50.millisecond)
+ buffer.size shouldBe 1
+ buffer.head._1 shouldBe topic
+ buffer.head._4 shouldBe ackMessage.serialize.getBytes
+ }, 10, Some(500.millisecond))
+ }
+
+ it should "send ack(success) to scheduler when chosen warmed container is resumed" in within(timeout) {
+ val (containers, factory) = testContainers(1)
+ val doc = put(entityStore, whiskAction)
+ // Actions are created with the default memory limit (MemoryLimit.stdMemory).
+ val topic = s"creationAck${creationMessage.rootSchedulerIndex.asString}"
+
+ val consumer = new TestConnector(topic, 4, true)
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(consumer.getProducer()))))
+
+ val actualCreationMessage = creationMessage.copy(revision = doc.rev)
+ val ackMessage = createAckMsg(actualCreationMessage, None, None)
+
+ // register a fake warmed container
+ val container = TestProbe()
+ pool.tell(
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container.ref)
+
+ // choose the warmed container
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ container.expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool.tell(
+ Resumed(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)),
+ container.ref)
+
+ utilRetry({
+ val buffer = consumer.peek(50.millisecond)
+ buffer.size shouldBe 1
+ buffer.head._1 shouldBe topic
+ buffer.head._4 shouldBe ackMessage.serialize.getBytes
+ }, 10, Some(500.millisecond))
+ }
+
+ it should "send ack(reschedule) to scheduler when container creation is failed or resource is not enough" in within(
+ timeout) {
+ val (containers, factory) = testContainers(1)
+ val doc = put(entityStore, bigWhiskAction)
+ val doc2 = put(entityStore, whiskAction)
+ val topic = s"creationAck${schedulerInstanceId.asString}"
+ val consumer = new TestConnector(topic, 4, true)
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(consumer.getProducer()))))
+
+ val actualCreationMessageLarge = creationMessageLarge.copy(revision = doc.rev)
+ val error =
+ s"creationId: ${creationMessageLarge.creationId}, invoker[$invokerInstance] doesn't have enough resource for container: ${creationMessageLarge.action}"
+ val ackMessage =
+ createAckMsg(actualCreationMessageLarge, Some(ResourceNotEnoughError), Some(error))
+
+ pool ! CreationContainer(actualCreationMessageLarge, bigWhiskAction)
+
+ utilRetry({
+ val buffer = consumer.peek(50.millisecond)
+ buffer.size shouldBe 1
+ buffer.head._1 shouldBe topic
+ buffer.head._4 shouldBe ackMessage.serialize.getBytes
+ }, 10, Some(500.millisecond))
+
+ val actualCreationMessage = creationMessage.copy(revision = doc2.rev)
+ val rescheduleAckMsg = createAckMsg(actualCreationMessage, Some(UnknownError), Some("ContainerProxy init failed."))
+
+ pool ! CreationContainer(actualCreationMessage, whiskAction)
+ containers(0).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ containers(0).send(pool, ContainerRemoved(true)) // container0's init failed or container creation failed
+
+ utilRetry({
+ val buffer2 = consumer.peek(50.millisecond)
+ buffer2.size shouldBe 1
+ buffer2.head._1 shouldBe topic
+ buffer2.head._4 shouldBe rescheduleAckMsg.serialize.getBytes
+ }, 10, Some(500.millisecond))
+ }
+
+ it should "send memory info to invokerHealthManager immediately when doesn't have enough resource happens" in within(
+ timeout) {
+ val (containers, factory) = testContainers(1)
+ val doc = put(entityStore, whiskAction)
+
+ val invokerHealthService = TestProbe()
+ var count = 0
+ invokerHealthService.setAutoPilot((_: ActorRef, msg: Any) =>
+ msg match {
+ case _: MemoryInfo =>
+ count += 1
+ TestActor.KeepRunning
+
+ case _ =>
+ TestActor.KeepRunning
+ })
+
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 1, memorySyncInterval = 1.minute),
+ invokerInstance,
+ List(PrewarmingConfig(1, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+
+ awaitAssert {
+ count shouldBe 3
+ }
+ }
+
+ it should "adjust prewarm container run well without reactive config" in {
+ stream.reset()
+ val (containers, factory) = testContainers(4)
+
+ val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
+ val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+ val poolConfig =
+ ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 4,
+ 0.5,
+ false,
+ prewarmExpirationCheckInitDelay,
+ prewarmExpirationCheckIntervel,
+ None,
+ 100,
+ 3,
+ false,
+ 1.second)
+ val initialCount = 2
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig,
+ invokerInstance,
+ List(PrewarmingConfig(initialCount, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, memoryLimit))
+ containers(0).send(pool, ReadyToWork(prewarmedData))
+ containers(1).send(pool, ReadyToWork(prewarmedData))
+
+ // when the invoker starts, there are 0 prewarm containers at the very beginning
+ stream.toString should include(s"found 0 started")
+
+ // the desiredCount should equal initialCount when the invoker starts
+ stream.toString should include(s"desired count: ${initialCount}")
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+ Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+ // Because the prewarmed containers were already replenished, currentCount should equal initialCount
+ eventually {
+ stream.toString should not include ("started")
+ }
+ }
+
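+ // ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment) drives the
+ // reactive sizing exercised below: prewarms expire after ttl, and the desired count
+ // grows with observed cold starts up to maxCount; the parameter roles are inferred
+ // from this test's expectations.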
+ it should "adjust prewarm container run well with reactive config" in {
+ stream.reset()
+ val (containers, factory) = testContainers(15)
+ val doc = put(entityStore, whiskAction)
+
+ val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
+ val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+ val poolConfig =
+ ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 8,
+ 0.5,
+ false,
+ prewarmExpirationCheckInitDelay,
+ prewarmExpirationCheckIntervel,
+ None,
+ 100,
+ 3,
+ false,
+ 1.second)
+ val minCount = 0
+ val initialCount = 2
+ val maxCount = 4
+ val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
+ val threshold = 1
+ val increment = 1
+ val deadline: Option[Deadline] = Some(ttl.fromNow)
+ val reactive: Option[ReactivePrewarmingConfig] =
+ Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment))
+ val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit, deadline)
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig,
+ invokerInstance,
+ List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive)),
+ sendAckToScheduler(producer))))
+
+ containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(0).send(pool, ReadyToWork(prewarmedData))
+ containers(1).send(pool, ReadyToWork(prewarmedData))
+
+ // when the invoker starts, there are 0 prewarm containers at the very beginning
+ stream.toString should include(s"found 0 started")
+
+ // the desiredCount should equal initialCount when the invoker starts
+ stream.toString should include(s"desired count: ${initialCount}")
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+ Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+ //expire 2 prewarms
+ containers(0).expectMsg(Remove)
+ containers(1).expectMsg(Remove)
+ containers(0).send(pool, ContainerRemoved(true))
+ containers(1).send(pool, ContainerRemoved(true))
+
+ // currentCount should equal 0 because these 2 prewarmed containers are expired
+ stream.toString should not include (s"found 0 started")
+
+ // the desiredCount should equal minCount because no cold start happened
+ stream.toString should not include (s"desired count: ${minCount}")
+ // Previously created prewarmed containers should be removed
+ stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
+
+ // 2 cold starts happened
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(2).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(3).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+ Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+ eventually {
+ // Because the expired prewarmed containers were already removed, currentCount should equal 0
+ stream.toString should include(s"found 0 started")
+ // the desiredCount should equal 2 because cold starts happened
+ stream.toString should include(s"desired count: 2")
+ }
+ containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(4).send(pool, ReadyToWork(prewarmedData))
+ containers(5).send(pool, ReadyToWork(prewarmedData))
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+ Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+ containers(4).expectMsg(Remove)
+ containers(5).expectMsg(Remove)
+ containers(4).send(pool, ContainerRemoved(true))
+ containers(5).send(pool, ContainerRemoved(true))
+
+ // the previous 2 prewarmed containers were removed due to expiration
+ stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
+
+ stream.reset()
+
+ // 5 cold starts happened (5 > maxCount)
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(6).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(7).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(8).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(9).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+ containers(10).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+ Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+ eventually {
+ // Because the expired prewarmed containers were already removed, currentCount should equal 0
+ stream.toString should include(s"found 0 started")
+ // even though the number of cold starts > maxCount, the desiredCount can't be greater than maxCount
+ stream.toString should include(s"desired count: ${maxCount}")
+ }
+
+ containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(11).send(pool, ReadyToWork(prewarmedData))
+ containers(12).send(pool, ReadyToWork(prewarmedData))
+ containers(13).send(pool, ReadyToWork(prewarmedData))
+ containers(14).send(pool, ReadyToWork(prewarmedData))
+ }
+
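+ // Each ContainerRemoved(true) for a prewarm that never became ready counts as a failed
+ // creation; once the failure count reaches prewarmMaxRetryLimit (3 here) the pool stops
+ // retrying and logs the "exceeds max retry limit" message asserted below.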
+ it should "prewarm failed creation count cannot exceed max retry limit" in {
+ stream.reset()
+ val (containers, factory) = testContainers(4)
+
+ val prewarmExpirationCheckInitDelay = FiniteDuration(1, TimeUnit.MINUTES)
+ val prewarmExpirationCheckIntervel = FiniteDuration(1, TimeUnit.MINUTES)
+ val maxRetryLimit = 3
+ val poolConfig =
+ ContainerPoolConfig(
+ MemoryLimit.STD_MEMORY * 4,
+ 0.5,
+ false,
+ prewarmExpirationCheckInitDelay,
+ prewarmExpirationCheckIntervel,
+ None,
+ 100,
+ maxRetryLimit,
+ false,
+ 1.second)
+ val initialCount = 1
+ val pool = system.actorOf(
+ Props(
+ new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig,
+ invokerInstance,
+ List(PrewarmingConfig(initialCount, exec, memoryLimit)),
+ sendAckToScheduler(producer))))
+
+ // create the prewarm initially
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(0).send(pool, ContainerRemoved(true))
+ stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+ // the first retry
+ containers(1).expectMsg(Start(exec, memoryLimit))
+ containers(1).send(pool, ContainerRemoved(true))
+ stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+ // the second retry
+ containers(2).expectMsg(Start(exec, memoryLimit))
+ containers(2).send(pool, ContainerRemoved(true))
+ stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+ // the third retry
+ containers(3).expectMsg(Start(exec, memoryLimit))
+ containers(3).send(pool, ContainerRemoved(true))
+
+ // the fourth retry: the failed creation count now exceeds the max retry limit
+ eventually {
+ stream.toString should include(s"prewarm create failed count exceeds max retry limit")
+ }
+ }
+
+}
+
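+// A minimal Container subtype so ScalaMock can mock v2 containers; only the address
+// member is pinned to a dummy value.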
+abstract class MockableV2Container extends Container {
+ protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
+}