Posted to commits@openwhisk.apache.org by ji...@apache.org on 2021/05/12 01:35:02 UTC

[openwhisk] branch master updated: [New Scheduler] Implement FunctionPullingContainerPool (#5102)

This is an automated email from the ASF dual-hosted git repository.

jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3802374  [New Scheduler] Implement FunctionPullingContainerPool (#5102)
3802374 is described below

commit 3802374d58d87fc6a95477929fc67269d6dcfe2c
Author: ningyougang <41...@qq.com>
AuthorDate: Wed May 12 09:34:06 2021 +0800

    [New Scheduler] Implement FunctionPullingContainerPool (#5102)
    
    * Implement FunctionPullingContainerPool
    
    * Fix review points
---
 ansible/roles/invoker/tasks/deploy.yml             |    6 +-
 .../org/apache/openwhisk/common/Logging.scala      |    4 +
 .../apache/openwhisk/core/connector/Message.scala  |  162 +++
 .../core/containerpool/ContainerFactory.scala      |   12 +-
 .../org/apache/openwhisk/core/entity/DocInfo.scala |   22 +-
 .../org/apache/openwhisk/core/entity/Size.scala    |    2 +-
 core/invoker/src/main/resources/application.conf   |    6 +-
 .../core/containerpool/ContainerPool.scala         |   12 +-
 .../v2/FunctionPullingContainerPool.scala          |  857 +++++++++++++++
 .../v2/FunctionPullingContainerProxy.scala         |   13 +-
 .../containerpool/v2/InvokerHealthManager.scala    |    5 +-
 .../core/connector/test/TestConnector.scala        |    2 +
 .../mesos/test/MesosContainerFactoryTest.scala     |    2 +-
 .../containerpool/test/ContainerPoolTests.scala    |   34 +-
 .../containerpool/test/ContainerProxyTests.scala   |    2 +-
 .../test/FunctionPullingContainerPoolTests.scala   | 1152 ++++++++++++++++++++
 16 files changed, 2269 insertions(+), 24 deletions(-)

diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index fe79439..591a07e 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -282,7 +282,11 @@
       "CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
       "CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
       "CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
-      "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
+      "CONFIG_whisk_containerPool_prewarmExpirationCheckInitDelay": "{{ container_pool_prewarm_expirationCheckInitDelay | default('10 minutes') }}"
+      "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('10 minutes') }}"
+      "CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
+      "CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
+      "CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
 
 - name: extend invoker dns env
   set_fact:
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 885b2a2..afce8fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -388,6 +388,10 @@ object LoggingMarkers {
 
   // Time that is needed to produce message in kafka
   val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)(MeasurementUnit.time.milliseconds)
+  def INVOKER_SHAREDPACKAGE(path: String) =
+    LogMarkerToken(invoker, "sharedPackage", counter, None, Map("path" -> path))(MeasurementUnit.none)
+  def INVOKER_CONTAINERPOOL_MEMORY(state: String) =
+    LogMarkerToken(invoker, "containerPoolMemory", counter, Some(state), Map("state" -> state))(MeasurementUnit.none)
 
   // System overload and random invoker assignment
   val MANAGED_SYSTEM_OVERLOAD =
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index bcd56e0..88f4f11 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -484,3 +484,165 @@ object StatusData extends DefaultJsonProtocol {
   implicit val serdes =
     jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
 }
+
+case class ContainerCreationMessage(override val transid: TransactionId,
+                                    invocationNamespace: String,
+                                    action: FullyQualifiedEntityName,
+                                    revision: DocRevision,
+                                    whiskActionMetaData: WhiskActionMetaData,
+                                    rootSchedulerIndex: SchedulerInstanceId,
+                                    schedulerHost: String,
+                                    rpcPort: Int,
+                                    retryCount: Int = 0,
+                                    creationId: CreationId = CreationId.generate())
+    extends ContainerMessage(transid) {
+
+  override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
+  override def serialize: String = toJson.compactPrint
+}
+
+object ContainerCreationMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[ContainerCreationMessage] = Try(serdes.read(msg.parseJson))
+
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
+  private implicit val byteSizeSerdes = size.serdes
+  implicit val serdes = jsonFormat10(
+    ContainerCreationMessage.apply(
+      _: TransactionId,
+      _: String,
+      _: FullyQualifiedEntityName,
+      _: DocRevision,
+      _: WhiskActionMetaData,
+      _: SchedulerInstanceId,
+      _: String,
+      _: Int,
+      _: Int,
+      _: CreationId))
+}
+
+case class ContainerDeletionMessage(override val transid: TransactionId,
+                                    invocationNamespace: String,
+                                    action: FullyQualifiedEntityName,
+                                    revision: DocRevision,
+                                    whiskActionMetaData: WhiskActionMetaData)
+    extends ContainerMessage(transid) {
+  override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
+  override def serialize: String = toJson.compactPrint
+}
+
+object ContainerDeletionMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[ContainerDeletionMessage] = Try(serdes.read(msg.parseJson))
+
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
+  private implicit val byteSizeSerdes = size.serdes
+  implicit val serdes = jsonFormat5(
+    ContainerDeletionMessage
+      .apply(_: TransactionId, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData))
+}
+
+abstract class ContainerMessage(private val tid: TransactionId) extends Message {
+  override val transid: TransactionId = tid
+  override def serialize: String = ContainerMessage.serdes.write(this).compactPrint
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+}
+
+object ContainerMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[ContainerMessage] = Try(serdes.read(msg.parseJson))
+
+  implicit val serdes = new RootJsonFormat[ContainerMessage] {
+    override def write(m: ContainerMessage): JsValue = m.toJson
+
+    override def read(json: JsValue): ContainerMessage = {
+      val JsObject(fields) = json
+      val creation = fields.contains("creationId")
+      if (creation) {
+        json.convertTo[ContainerCreationMessage]
+      } else {
+        json.convertTo[ContainerDeletionMessage]
+      }
+    }
+  }
+}
+
+sealed trait ContainerCreationError
+object ContainerCreationError extends Enumeration {
+  case object NoAvailableInvokersError extends ContainerCreationError
+  case object NoAvailableResourceInvokersError extends ContainerCreationError
+  case object ResourceNotEnoughError extends ContainerCreationError
+  case object WhiskError extends ContainerCreationError
+  case object UnknownError extends ContainerCreationError
+  case object TimeoutError extends ContainerCreationError
+  case object ShuttingDownError extends ContainerCreationError
+  case object NonExecutableActionError extends ContainerCreationError
+  case object DBFetchError extends ContainerCreationError
+  case object BlackBoxError extends ContainerCreationError
+  case object ZeroNamespaceLimit extends ContainerCreationError
+  case object TooManyConcurrentRequests extends ContainerCreationError
+
+  val whiskErrors: Set[ContainerCreationError] =
+    Set(
+      NoAvailableInvokersError,
+      NoAvailableResourceInvokersError,
+      ResourceNotEnoughError,
+      WhiskError,
+      ShuttingDownError,
+      UnknownError,
+      TimeoutError,
+      ZeroNamespaceLimit)
+
+  def fromName(name: String) = name.toUpperCase match {
+    case "NOAVAILABLEINVOKERSERROR"         => NoAvailableInvokersError
+    case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
+    case "RESOURCENOTENOUGHERROR"           => ResourceNotEnoughError
+    case "NONEXECUTBLEACTIONERROR"          => NonExecutableActionError
+    case "DBFETCHERROR"                     => DBFetchError
+    case "WHISKERROR"                       => WhiskError
+    case "BLACKBOXERROR"                    => BlackBoxError
+    case "TIMEOUTERROR"                     => TimeoutError
+    case "ZERONAMESPACELIMIT"               => ZeroNamespaceLimit
+    case "TOOMANYCONCURRENTREQUESTS"        => TooManyConcurrentRequests
+    case "UNKNOWNERROR"                     => UnknownError
+  }
+
+  implicit val serdes = new RootJsonFormat[ContainerCreationError] {
+    override def write(error: ContainerCreationError): JsValue = JsString(error.toString)
+    override def read(json: JsValue): ContainerCreationError =
+      Try {
+        val JsString(str) = json
+        ContainerCreationError.fromName(str.trim.toUpperCase)
+      } getOrElse {
+        throw deserializationError("ContainerCreationError must be a valid string")
+      }
+  }
+}
+
+case class ContainerCreationAckMessage(override val transid: TransactionId,
+                                       creationId: CreationId,
+                                       invocationNamespace: String,
+                                       action: FullyQualifiedEntityName,
+                                       revision: DocRevision,
+                                       actionMetaData: WhiskActionMetaData,
+                                       rootInvokerIndex: InvokerInstanceId,
+                                       schedulerHost: String,
+                                       rpcPort: Int,
+                                       retryCount: Int = 0,
+                                       error: Option[ContainerCreationError] = None,
+                                       reason: Option[String] = None)
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = ContainerCreationAckMessage.serdes.write(this).compactPrint
+}
+
+object ContainerCreationAckMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  private implicit val byteSizeSerdes = size.serdes
+  implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
+}
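
A minimal sketch, using only the APIs added above, of how a consumer can handle either message variant: ContainerMessage.parse dispatches on the presence of a "creationId" field in the JSON.

    import scala.util.{Failure, Success}
    import org.apache.openwhisk.core.connector.{ContainerCreationMessage, ContainerDeletionMessage, ContainerMessage}

    def describe(raw: String): String = ContainerMessage.parse(raw) match {
      case Success(c: ContainerCreationMessage) => s"create a container for ${c.action} (creationId: ${c.creationId})"
      case Success(d: ContainerDeletionMessage) => s"delete containers of ${d.action} at revision ${d.revision}"
      case Failure(t)                           => s"unparsable container message: ${t.getMessage}"
    }
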
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 1603ba2..3b5a6c4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -47,9 +47,14 @@ case class ContainerArgsConfig(network: String,
 case class ContainerPoolConfig(userMemory: ByteSize,
                                concurrentPeekFactor: Double,
                                akkaClient: Boolean,
+                               prewarmExpirationCheckInitDelay: FiniteDuration,
                                prewarmExpirationCheckInterval: FiniteDuration,
                                prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
-                               prewarmExpirationLimit: Int) {
+                               prewarmExpirationLimit: Int,
+                               prewarmMaxRetryLimit: Int,
+                               prewarmPromotion: Boolean,
+                               memorySyncInterval: FiniteDuration,
+                               prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
@@ -68,6 +73,11 @@ case class ContainerPoolConfig(userMemory: ByteSize,
     max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
 }
 
+case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
+  require(maxConcurrent > 0, "maxConcurrent for per invoker must be > 0")
+  require(creationDelay.toSeconds > 0, "creationDelay must be > 0")
+}
+
 case class RuntimesRegistryCredentials(user: String, password: String)
 
 case class RuntimesRegistryConfig(url: String, credentials: Option[RuntimesRegistryCredentials])
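
A minimal construction sketch for the extended ContainerPoolConfig; the values mirror the defaults added to application.conf further below, and the PrewarmContainerCreationConfig numbers are arbitrary example values, not recommendations.

    import scala.concurrent.duration._
    import org.apache.openwhisk.core.containerpool.{ContainerPoolConfig, PrewarmContainerCreationConfig}
    import org.apache.openwhisk.core.entity.size._

    val poolConfig = ContainerPoolConfig(
      userMemory = 1024.MB,
      concurrentPeekFactor = 0.5,
      akkaClient = false,
      prewarmExpirationCheckInitDelay = 10.minutes,
      prewarmExpirationCheckInterval = 10.minutes,
      prewarmExpirationCheckIntervalVariance = Some(10.seconds),
      prewarmExpirationLimit = 100,
      prewarmMaxRetryLimit = 5,
      prewarmPromotion = false,
      memorySyncInterval = 1.second,
      prewarmContainerCreationConfig = Some(PrewarmContainerCreationConfig(maxConcurrent = 4, creationDelay = 2.seconds)))
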
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
index 77e2008..d632f12 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
@@ -17,8 +17,9 @@
 
 package org.apache.openwhisk.core.entity
 
-import scala.util.Try
+import org.apache.commons.lang3.StringUtils
 
+import scala.util.Try
 import spray.json.DefaultJsonProtocol
 import spray.json.JsNull
 import spray.json.JsString
@@ -26,7 +27,6 @@ import spray.json.JsValue
 import spray.json.RootJsonFormat
 import spray.json.deserializationError
 import spray.json._
-
 import org.apache.openwhisk.core.entity.ArgNormalizer.trim
 
 /**
@@ -56,11 +56,27 @@ protected[core] class DocId(val id: String) extends AnyVal {
  *
  * @param rev the document revision, optional
  */
-protected[core] class DocRevision private (val rev: String) extends AnyVal {
+protected[core] class DocRevision private (val rev: String) extends AnyVal with Ordered[DocRevision] {
   def asString = rev // to make explicit that this is a string conversion
   def empty = rev == null
   override def toString = rev
   def serialize = DocRevision.serdes.write(this).compactPrint
+
+  override def compare(that: DocRevision): Int = {
+    if (this.empty && that.empty) {
+      0
+    } else if (this.empty) {
+      -1
+    } else if (that.empty) {
+      1
+    } else {
+      StringUtils.substringBefore(rev, "-").toInt - StringUtils.substringBefore(that.rev, "-").toInt
+    }
+  }
+
+  def ==(that: DocRevision): Boolean = {
+    this.compare(that) == 0
+  }
 }
 
 /**
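
A quick sketch of the new ordering semantics: DocRevision now compares the numeric prefix of CouchDB-style revision strings. The example lives in the core entity package because DocRevision is protected[core]; the revision strings are arbitrary.

    package org.apache.openwhisk.core.entity

    object DocRevisionOrderingExample extends App {
      val older = DocRevision("2-54f532a3a1cab0af2f91ca780ee1cc5c")
      val newer = DocRevision("10-0cbc2e46e260d54f6f7c691def3bbd53")
      assert(older < newer) // 2 < 10 via the Ordered[DocRevision] compare added above
      assert(!(newer <= older))
    }
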
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
index fd22948..ded9a6a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
@@ -163,7 +163,7 @@ object size {
   implicit val pureconfigReader =
     ConfigReader[ConfigValue].map(v => ByteSize(v.atKey("key").getBytes("key"), SizeUnits.BYTE))
 
-  protected[entity] implicit val serdes = new RootJsonFormat[ByteSize] {
+  implicit val serdes = new RootJsonFormat[ByteSize] {
     def write(b: ByteSize) = JsString(b.toString)
 
     def read(value: JsValue): ByteSize = value match {
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index be68333..9aeea13 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,9 +60,13 @@ whisk {
     user-memory: 1024 m
     concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
     akka-client:  false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
-    prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
+    prewarm-expiration-check-init-delay: 10 minute # initial delay before the first prewarm expiration check
+    prewarm-expiration-check-interval: 10 minute # period to check for prewarm expiration
     prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
     prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
+    prewarm-max-retry-limit: 5 # maximum number of consecutive retries for failed prewarm container creation
+    prewarm-promotion: false # if true, an action can take a prewarm container with a larger memory size
+    memory-sync-interval: 1 second # period to sync memory info to etcd
   }
 
   kubernetes {
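
For context, a rough sketch of how these keys reach the code: the invoker already binds whisk.container-pool to the ContainerPoolConfig case class through pureconfig, roughly as below (exact imports depend on the pureconfig version in use).

    import pureconfig._
    import pureconfig.generic.auto._
    import org.apache.openwhisk.core.ConfigKeys
    import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
    import org.apache.openwhisk.core.entity.size._

    val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
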
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 724cd59..c6358aa 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -91,7 +91,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         .nextInt(v.toSeconds.toInt))
     .getOrElse(0)
     .seconds
-  context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
+  if (prewarmConfig.exists(_.reactive.nonEmpty)) {
+    context.system.scheduler.schedule(
+      poolConfig.prewarmExpirationCheckInitDelay,
+      interval,
+      self,
+      AdjustPrewarmedContainer)
+  }
 
   def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name.asString
@@ -590,9 +596,9 @@ object ContainerPool {
               }
               .sortBy(_._2.expires.getOrElse(now))
 
-            // emit expired container counter metric with memory + kind
-            MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
             if (expiredPrewarmedContainer.nonEmpty) {
+              // emit expired container counter metric with memory + kind
+              MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
               logging.info(
                 this,
                 s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
new file mode 100644
index 0000000..7260c83
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector.ContainerCreationError._
+import org.apache.openwhisk.core.connector.{
+  ContainerCreationAckMessage,
+  ContainerCreationMessage,
+  ContainerDeletionMessage
+}
+import org.apache.openwhisk.core.containerpool.{
+  AdjustPrewarmedContainer,
+  BlackboxStartupError,
+  ColdStartKey,
+  ContainerPool,
+  ContainerPoolConfig,
+  ContainerRemoved,
+  PrewarmingConfig,
+  WhiskContainerStartupError
+}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.http.Messages
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Random, Try}
+import scala.collection.immutable.Queue
+
+case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
+case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
+case object Remove
+case class Keep(timeout: FiniteDuration)
+case class PrewarmContainer(maxConcurrent: Int)
+
+/**
+ * A pool managing containers to run actions on.
+ *
+ * This pool fulfills the other half of the ContainerProxy contract. Only
+ * one job (either Start or Run) is sent to a child-actor at any given
+ * time. The pool then waits for a response of that container, indicating
+ * the container is done with the job. Only then will the pool send another
+ * request to that container.
+ *
+ * Upon actor creation, the pool will start to prewarm containers according
+ * to the provided prewarmConfig, iff set. Those containers will **not** be
+ * part of the pool size calculation, which is capped by the configured user memory.
+ * Prewarm containers are only used if they have matching arguments
+ * (kind, memory) and there is space in the pool.
+ *
+ * @param childFactory method to create new container proxy actor
+ * @param prewarmConfig optional settings for container prewarming
+ * @param poolConfig config for the ContainerPool
+ */
+class FunctionPullingContainerPool(
+  childFactory: ActorRefFactory => ActorRef,
+  invokerHealthService: ActorRef,
+  poolConfig: ContainerPoolConfig,
+  instance: InvokerInstanceId,
+  prewarmConfig: List[PrewarmingConfig] = List.empty,
+  sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+  implicit val logging: Logging)
+    extends Actor {
+  import ContainerPoolV2.memoryConsumptionOf
+
+  implicit val ec = context.system.dispatcher
+
+  private var busyPool = immutable.Map.empty[ActorRef, Data]
+  private var inProgressPool = immutable.Map.empty[ActorRef, Data]
+  private var warmedPool = immutable.Map.empty[ActorRef, WarmData]
+  private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
+  private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
+
+  private var shuttingDown = false
+
+  private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
+
+  private var preWarmScheduler: Option[Cancellable] = None
+  private var prewarmConfigQueue = Queue.empty[(CodeExec[_], ByteSize, Option[FiniteDuration])]
+  private val prewarmCreateFailedCount = new AtomicInteger(0)
+
+  val logScheduler = context.system.scheduler.schedule(0.seconds, 1.seconds) {
+    MetricEmitter.emitHistogramMetric(
+      LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("inprogress"),
+      memoryConsumptionOf(inProgressPool))
+    MetricEmitter.emitHistogramMetric(
+      LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("busy"),
+      memoryConsumptionOf(busyPool))
+    MetricEmitter.emitHistogramMetric(
+      LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("prewarmed"),
+      memoryConsumptionOf(prewarmedPool))
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("max"), poolConfig.userMemory.toMB)
+  }
+
+  // Key is ColdStartKey, value is the number of cold starts observed since the last scheduled prewarm adjustment
+  var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+
+  adjustPrewarmedContainer(true, false)
+
+  // check periodically and adjust prewarmed containers (delete those unused for some time and create additional ones as needed)
+  // add some random amount to this schedule to avoid a herd of container removal + creation
+  val interval = poolConfig.prewarmExpirationCheckInterval + poolConfig.prewarmExpirationCheckIntervalVariance
+    .map(v =>
+      Random
+        .nextInt(v.toSeconds.toInt))
+    .getOrElse(0)
+    .seconds
+
+  if (prewarmConfig.exists(_.reactive.nonEmpty)) {
+    context.system.scheduler.schedule(
+      poolConfig.prewarmExpirationCheckInitDelay,
+      interval,
+      self,
+      AdjustPrewarmedContainer)
+  }
+
+  val resourceSubmitter = context.system.scheduler.schedule(0.seconds, poolConfig.memorySyncInterval) {
+    syncMemoryInfo
+  }
+
+  private def logContainerStart(c: ContainerCreationMessage, action: WhiskAction, containerState: String): Unit = {
+    val FQN = c.action
+    if (FQN.namespace.name == "whisk.system" && FQN.fullPath.segments > 2) {
+      MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_SHAREDPACKAGE(FQN.fullPath.asString))
+    }
+
+    MetricEmitter.emitCounterMetric(
+      LoggingMarkers.INVOKER_CONTAINER_START(
+        containerState,
+        c.invocationNamespace,
+        c.action.namespace.toString,
+        c.action.name.toString))
+  }
+
+  def receive: Receive = {
+    case PrewarmContainer(maxConcurrent) =>
+      if (prewarmConfigQueue.isEmpty) {
+        preWarmScheduler.map(_.cancel())
+        preWarmScheduler = None
+      } else {
+        for (_ <- 1 to maxConcurrent if !prewarmConfigQueue.isEmpty) {
+          val ((codeExec, byteSize, ttl), newQueue) = prewarmConfigQueue.dequeue
+          prewarmConfigQueue = newQueue
+          prewarmContainer(codeExec, byteSize, ttl)
+        }
+      }
+
+    case CreationContainer(create: ContainerCreationMessage, action: WhiskAction) =>
+      if (shuttingDown) {
+        val message =
+          s"creationId: ${create.creationId}, invoker is shutting down, reschedule ${action.fullyQualifiedName(false)}"
+        val ack = ContainerCreationAckMessage(
+          create.transid,
+          create.creationId,
+          create.invocationNamespace,
+          create.action,
+          create.revision,
+          create.whiskActionMetaData,
+          instance,
+          create.schedulerHost,
+          create.rpcPort,
+          create.retryCount,
+          Some(ShuttingDownError),
+          Some(message))
+        logging.warn(this, message)
+        sendAckToScheduler(create.rootSchedulerIndex, ack)
+      } else {
+        logging.info(this, s"received a container creation message: ${create.creationId}")
+        action.toExecutableWhiskAction match {
+          case Some(executable) =>
+            val createdContainer =
+              takeWarmedContainer(executable, create.invocationNamespace, create.revision)
+                .map(container => (container, "warmed"))
+                .orElse {
+                  takeContainer(executable)
+                }
+            handleChosenContainer(create, executable, createdContainer)
+          case None =>
+            val message =
+              s"creationId: ${create.creationId}, non-executable action reached the container pool ${action.fullyQualifiedName(false)}"
+            logging.error(this, message)
+            val ack = ContainerCreationAckMessage(
+              create.transid,
+              create.creationId,
+              create.invocationNamespace,
+              create.action,
+              create.revision,
+              create.whiskActionMetaData,
+              instance,
+              create.schedulerHost,
+              create.rpcPort,
+              create.retryCount,
+              Some(NonExecutableActionError),
+              Some(message))
+            sendAckToScheduler(create.rootSchedulerIndex, ack)
+        }
+      }
+
+    case DeletionContainer(deletionMessage: ContainerDeletionMessage) =>
+      val oldRevision = deletionMessage.revision
+      val invocationNamespace = deletionMessage.invocationNamespace
+      val fqn = deletionMessage.action.copy(version = None)
+
+      warmedPool.foreach(warmed => {
+        val proxy = warmed._1
+        val data = warmed._2
+
+        if (data.invocationNamespace == invocationNamespace
+            && data.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None)
+            && data.revision <= oldRevision) {
+          proxy ! GracefulShutdown
+        }
+      })
+
+      busyPool.foreach(f = busy => {
+        val proxy = busy._1
+        busy._2 match {
+          case warmData: WarmData
+              if warmData.invocationNamespace == invocationNamespace
+                && warmData.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None)
+                && warmData.revision <= oldRevision =>
+            proxy ! GracefulShutdown
+          case initializedData: InitializedData
+              if initializedData.invocationNamespace == invocationNamespace
+                && initializedData.action.fullyQualifiedName(withVersion = false) == fqn.copy(version = None) =>
+            proxy ! GracefulShutdown
+          case _ => // Other actions are ignored.
+        }
+      })
+
+    case ReadyToWork(data) =>
+      prewarmStartingPool = prewarmStartingPool - sender()
+      prewarmedPool = prewarmedPool + (sender() -> data)
+      // after a prewarm container is created successfully, reset the failure counter to 0
+      if (prewarmCreateFailedCount.get() > 0) {
+        prewarmCreateFailedCount.set(0)
+      }
+
+    // Container is initialized
+    case Initialized(data) =>
+      busyPool = busyPool + (sender() -> data)
+      inProgressPool = inProgressPool - sender()
+      // container init completed, send creationAck(success) to scheduler
+      creationMessages.remove(sender()).foreach { msg =>
+        val ack = ContainerCreationAckMessage(
+          msg.transid,
+          msg.creationId,
+          msg.invocationNamespace,
+          msg.action,
+          msg.revision,
+          msg.whiskActionMetaData,
+          instance,
+          msg.schedulerHost,
+          msg.rpcPort,
+          msg.retryCount)
+        sendAckToScheduler(msg.rootSchedulerIndex, ack)
+      }
+
+    case Resumed(data) =>
+      busyPool = busyPool + (sender() -> data)
+      inProgressPool = inProgressPool - sender()
+      // container init completed, send creationAck(success) to scheduler
+      creationMessages.remove(sender()).foreach { msg =>
+        val ack = ContainerCreationAckMessage(
+          msg.transid,
+          msg.creationId,
+          msg.invocationNamespace,
+          msg.action,
+          msg.revision,
+          msg.whiskActionMetaData,
+          instance,
+          msg.schedulerHost,
+          msg.rpcPort,
+          msg.retryCount)
+        sendAckToScheduler(msg.rootSchedulerIndex, ack)
+      }
+
+    // if a warmed container fails to resume, try to use another container or create a new one
+    case ResumeFailed(data) =>
+      inProgressPool = inProgressPool - sender()
+      creationMessages.remove(sender()).foreach { msg =>
+        val container = takeWarmedContainer(data.action, data.invocationNamespace, data.revision)
+          .map(container => (container, "warmed"))
+          .orElse {
+            takeContainer(data.action)
+          }
+        handleChosenContainer(msg, data.action, container)
+      }
+
+    case ContainerCreationFailed(t) =>
+      val (error, message) = t match {
+        case WhiskContainerStartupError(msg) => (WhiskError, msg)
+        case BlackboxStartupError(msg)       => (BlackBoxError, msg)
+        case _                               => (WhiskError, Messages.resourceProvisionError)
+      }
+      creationMessages.remove(sender()).foreach { msg =>
+        val ack = ContainerCreationAckMessage(
+          msg.transid,
+          msg.creationId,
+          msg.invocationNamespace,
+          msg.action,
+          msg.revision,
+          msg.whiskActionMetaData,
+          instance,
+          msg.schedulerHost,
+          msg.rpcPort,
+          msg.retryCount,
+          Some(error),
+          Some(message))
+        sendAckToScheduler(msg.rootSchedulerIndex, ack)
+      }
+
+    case ContainerIsPaused(data) =>
+      warmedPool = warmedPool + (sender() -> data)
+      busyPool = busyPool - sender() // remove container from busy pool
+
+    // Container got removed
+    case ContainerRemoved(replacePrewarm) =>
+      inProgressPool.get(sender()).foreach { _ =>
+        inProgressPool = inProgressPool - sender()
+      }
+
+      warmedPool.get(sender()).foreach { _ =>
+        warmedPool = warmedPool - sender()
+      }
+
+      // the container was in the busy pool (i.e. running at full capacity); removing it frees capacity for another job
+      busyPool.get(sender()).foreach { _ =>
+        busyPool = busyPool - sender()
+      }
+
+      //in case this was a prewarm
+      prewarmedPool.get(sender()).foreach { data =>
+        prewarmedPool = prewarmedPool - sender()
+        logging.info(
+          this,
+          s"${if (replacePrewarm) "failed" else "expired"} prewarm [kind: ${data.kind}, memory: ${data.memoryLimit.toString}] removed")
+      }
+
+      //in case this was a starting prewarm
+      prewarmStartingPool.get(sender()).foreach { data =>
+        logging.info(this, s"failed starting prewarm [kind: ${data._1}, memory: ${data._2.toString}] removed")
+        prewarmStartingPool = prewarmStartingPool - sender()
+        prewarmCreateFailedCount.incrementAndGet()
+      }
+
+      //backfill prewarms on every ContainerRemoved, just in case
+      if (replacePrewarm) {
+        adjustPrewarmedContainer(false, false) //in case a prewarm is removed due to health failure or crash
+      }
+
+      // there may be a chance that container creation failed or gRPC client initialization failed,
+      // send creationAck(reschedule) to scheduler
+      creationMessages.remove(sender()).foreach { msg =>
+        val ack = ContainerCreationAckMessage(
+          msg.transid,
+          msg.creationId,
+          msg.invocationNamespace,
+          msg.action,
+          msg.revision,
+          msg.whiskActionMetaData,
+          instance,
+          msg.schedulerHost,
+          msg.rpcPort,
+          msg.retryCount,
+          Some(UnknownError),
+          Some("ContainerProxy init failed."))
+        sendAckToScheduler(msg.rootSchedulerIndex, ack)
+      }
+
+    case GracefulShutdown =>
+      shuttingDown = true
+      waitForPoolToClear()
+
+    case Enable =>
+      shuttingDown = false
+
+    case AdjustPrewarmedContainer =>
+      // Reset prewarmCreateFailedCount when the scheduled expiration check runs, and backfill prewarm containers if possible
+      prewarmCreateFailedCount.set(0)
+      adjustPrewarmedContainer(false, true)
+  }
+
+  /** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */
+  private def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
+    if (!shuttingDown) {
+      if (scheduled) {
+        //on scheduled time, remove expired prewarms
+        ContainerPoolV2.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
+          prewarmedPool = prewarmedPool - p
+          p ! Remove
+        }
+        //on scheduled time, emit cold start counter metric with memory + kind
+        coldStartCount foreach { coldStart =>
+          val coldStartKey = coldStart._1
+          MetricEmitter.emitCounterMetric(
+            LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
+        }
+      }
+
+      ContainerPoolV2
+        .increasePrewarms(
+          init,
+          scheduled,
+          coldStartCount,
+          prewarmConfig,
+          prewarmedPool,
+          prewarmStartingPool,
+          prewarmConfigQueue)
+        .foreach { c =>
+          val config = c._1
+          val currentCount = c._2._1
+          val desiredCount = c._2._2
+          if (prewarmCreateFailedCount.get() > poolConfig.prewarmMaxRetryLimit) {
+            logging.warn(
+              this,
+              s"[kind: ${config.exec.kind}, memory: ${config.memoryLimit.toString}] prewarm create failed count exceeds max retry limit: ${poolConfig.prewarmMaxRetryLimit}, currentCount: ${currentCount}, desiredCount: ${desiredCount}")
+          } else {
+            if (currentCount < desiredCount) {
+              (currentCount until desiredCount).foreach { _ =>
+                poolConfig.prewarmContainerCreationConfig match {
+                  case Some(_) =>
+                    prewarmConfigQueue =
+                      prewarmConfigQueue.enqueue((config.exec, config.memoryLimit, config.reactive.map(_.ttl)))
+                  case None =>
+                    prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
+                }
+              }
+            }
+          }
+        }
+
+      // run queue consumer
+      poolConfig.prewarmContainerCreationConfig.foreach(config => {
+        logging.info(
+          this,
+          s"prewarm container creation is starting with creation delay configuration [maxConcurrent: ${config.maxConcurrent}, creationDelay: ${config.creationDelay.toMillis} millisecond]")
+        if (preWarmScheduler.isEmpty) {
+          preWarmScheduler = Some(
+            context.system.scheduler
+              .schedule(0.seconds, config.creationDelay, self, PrewarmContainer(config.maxConcurrent)))
+        }
+      })
+
+      if (scheduled) {
+        // lastly, clear coldStartCount each time a scheduled event is processed to reset the counts
+        coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+      }
+    }
+  }
+
+  private def syncMemoryInfo: Unit = {
+    val busyMemory = memoryConsumptionOf(busyPool)
+    val inProgressMemory = memoryConsumptionOf(inProgressPool)
+    invokerHealthService ! MemoryInfo(
+      poolConfig.userMemory.toMB - busyMemory - inProgressMemory,
+      busyMemory,
+      inProgressMemory)
+  }
+
+  /** Creates a new container and updates state accordingly. */
+  private def createContainer(memoryLimit: ByteSize): (ActorRef, Data) = {
+    val ref = childFactory(context)
+    val data = MemoryData(memoryLimit)
+    ref -> data
+  }
+
+  /** Creates a new prewarmed container */
+  private def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration]): Unit = {
+    val newContainer = childFactory(context)
+    prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
+    newContainer ! Start(exec, memoryLimit, ttl)
+  }
+
+  /** This is only for cold start statistics of prewarm configs, i.e. not blackbox or other configs. */
+  def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
+    prewarmConfig
+      .filter { config =>
+        kind == config.exec.kind && memoryLimit == config.memoryLimit
+      }
+      .foreach { _ =>
+        val coldStartKey = ColdStartKey(kind, memoryLimit)
+        coldStartCount.get(coldStartKey) match {
+          case Some(value) => coldStartCount = coldStartCount + (coldStartKey -> (value + 1))
+          case None        => coldStartCount = coldStartCount + (coldStartKey -> 1)
+        }
+      }
+  }
+
+  /**
+   * Takes a warmed container out of the warmed pool
+   * iff a container with a matching revision is found
+   *
+   * @param action the action.
+   * @param invocationNamespace the invocation namespace for shared package.
+   * @param revision the DocRevision.
+   * @return the container iff found
+   */
+  private def takeWarmedContainer(action: ExecutableWhiskAction,
+                                  invocationNamespace: String,
+                                  revision: DocRevision): Option[(ActorRef, Data)] = {
+    warmedPool
+      .find {
+        case (_, WarmData(_, `invocationNamespace`, `action`, `revision`, _, _)) => true
+        case _                                                                   => false
+      }
+      .map {
+        case (ref, data) =>
+          warmedPool = warmedPool - ref
+          logging.info(this, s"Choose warmed container ${data.container.containerId}")
+          (ref, data)
+      }
+  }
+
+  /**
+   * Takes a prewarm container out of the prewarmed pool
+   * iff a container with a matching kind and suitable memory is found.
+   *
+   * @param action the action that holds the kind and the required memory.
+   * @param maximumMemory the maximum memory container can have
+   * @return the container iff found
+   */
+  private def takePrewarmContainer(action: ExecutableWhiskAction, maximumMemory: ByteSize): Option[(ActorRef, Data)] = {
+    val kind = action.exec.kind
+    val memory = action.limits.memory.megabytes.MB
+
+    // find a container with same kind and smallest memory
+    prewarmedPool.filter {
+      case (_, PreWarmData(_, `kind`, preMemory, _)) if preMemory >= memory && preMemory <= maximumMemory => true
+      case _                                                                                              => false
+    }.toList match {
+      case Nil =>
+        None
+      case res =>
+        val (ref, data) = res.minBy(_._2.memoryLimit)
+        prewarmedPool = prewarmedPool - ref
+
+        //get the appropriate ttl from prewarm configs
+        val ttl =
+          prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
+        prewarmContainer(action.exec, data.memoryLimit, ttl)
+        Some((ref, data))
+    }
+  }
+
+  /** Removes a container and updates state accordingly. */
+  def removeContainer(toDelete: ActorRef) = {
+    toDelete ! Remove
+    warmedPool = warmedPool - toDelete
+  }
+
+  /**
+   * Calculate if there is enough free memory within a given pool.
+   *
+   * @param pool The pool, that has to be checked, if there is enough free memory.
+   * @param memory The amount of memory to check.
+   * @return true, if there is enough space for the given amount of memory.
+   */
+  private def hasPoolSpaceFor[A](pool: Map[A, Data], memory: ByteSize): Boolean = {
+    memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+  }
+
+  /**
+   * Gracefully shut down all container proxies in the busy and warmed pools, retrying until the in-progress pool drains
+   */
+  private def waitForPoolToClear(): Unit = {
+    busyPool.keys.foreach(_ ! GracefulShutdown)
+    warmedPool.keys.foreach(_ ! GracefulShutdown)
+    if (inProgressPool.nonEmpty) {
+      context.system.scheduler.scheduleOnce(5.seconds) {
+        waitForPoolToClear()
+      }
+    }
+  }
+
+  /**
+   * Takes a prewarmed container or creates a new one.
+   *
+   * @param executable The executable whisk action
+   * @return the chosen container and its state ("prewarmed" or "cold"), or None if there is not enough memory
+   */
+  private def takeContainer(executable: ExecutableWhiskAction) = {
+    val freeMemory = poolConfig.userMemory.toMB - memoryConsumptionOf(busyPool ++ warmedPool ++ inProgressPool)
+    val deletableMemory = memoryConsumptionOf(warmedPool)
+    val requiredMemory = executable.limits.memory.megabytes
+    if (requiredMemory > freeMemory + deletableMemory) {
+      None
+    } else {
+      // try to take a preWarmed container whose memory doesn't exceed the max `usable` memory
+      takePrewarmContainer(
+        executable,
+        if (poolConfig.prewarmPromotion) (freeMemory + deletableMemory).MB else requiredMemory.MB) match {
+        // there is a suitable preWarmed container but not enough free memory for it, delete some warmed container first
+        case Some(container) if container._2.memoryLimit > freeMemory.MB =>
+          ContainerPoolV2
+            .remove(warmedPool, container._2.memoryLimit - freeMemory.MB)
+            .map(removeContainer)
+            .headOption
+            .map { _ =>
+              (container, "prewarmed")
+            }
+        // there is a suitable preWarmed container and enough free memory for it
+        case Some(container) =>
+          Some((container, "prewarmed"))
+        // there is no suitable preWarmed container and not enough free memory for the action
+        case None if executable.limits.memory.megabytes > freeMemory =>
+          ContainerPoolV2
+            .remove(warmedPool, executable.limits.memory.megabytes.MB - freeMemory.MB)
+            .map(removeContainer)
+            .headOption
+            .map { _ =>
+              incrementColdStartCount(executable.exec.kind, executable.limits.memory.megabytes.MB)
+              (createContainer(executable.limits.memory.megabytes.MB), "cold")
+            }
+        // there is no suitable preWarmed container and enough free memory
+        case None =>
+          incrementColdStartCount(executable.exec.kind, executable.limits.memory.megabytes.MB)
+          Some((createContainer(executable.limits.memory.megabytes.MB), "cold"))
+
+        // this should not happen, but just for safety
+        case _ =>
+          None
+      }
+    }
+  }
+
+  private def handleChosenContainer(create: ContainerCreationMessage,
+                                    executable: ExecutableWhiskAction,
+                                    container: Option[((ActorRef, Data), String)]) = {
+    container match {
+      case Some(((proxy, data), containerState)) =>
+        // record the creationMessage so that if container creation fails, a failure message can be sent to the scheduler
+        creationMessages.getOrElseUpdate(proxy, create)
+        proxy ! Initialize(create.invocationNamespace, executable, create.schedulerHost, create.rpcPort, create.transid)
+        inProgressPool = inProgressPool + (proxy -> data)
+        logContainerStart(create, executable.toWhiskAction, containerState)
+
+      case None =>
+        val message =
+          s"creationId: ${create.creationId}, invoker[$instance] doesn't have enough resource for container: ${create.action}"
+        logging.info(this, message)
+        syncMemoryInfo
+        val ack = ContainerCreationAckMessage(
+          create.transid,
+          create.creationId,
+          create.invocationNamespace,
+          create.action,
+          create.revision,
+          create.whiskActionMetaData,
+          instance,
+          create.schedulerHost,
+          create.rpcPort,
+          create.retryCount,
+          Some(ResourceNotEnoughError),
+          Some(message))
+        sendAckToScheduler(create.rootSchedulerIndex, ack)
+    }
+  }
+}
+
+object ContainerPoolV2 {
+
+  /**
+   * Calculate the memory of a given pool.
+   *
+   * @param pool The pool with the containers.
+   * @return The memory consumption of all containers in the pool in Megabytes.
+   */
+  protected[containerpool] def memoryConsumptionOf[A](pool: Map[A, Data]): Long = {
+    pool.map(_._2.memoryLimit.toMB).sum
+  }
+
+  /**
+   * Finds the oldest previously used container to remove to make space for the job passed to run.
+   * Depending on the space that has to be allocated, several containers might be removed.
+   *
+   * NOTE: This method is never called to remove an action that is in the pool already,
+   * since this would be picked up earlier in the scheduler and the container reused.
+   *
+   * @param pool a map of all free containers in the pool
+   * @param memory the amount of memory that has to be freed up
+   * @return a list of containers to be removed iff found
+   */
+  @tailrec
+  protected[containerpool] def remove[A](pool: Map[A, WarmData],
+                                         memory: ByteSize,
+                                         toRemove: List[A] = List.empty): List[A] = {
+    if (memory > 0.B && pool.nonEmpty && memoryConsumptionOf(pool) >= memory.toMB) {
+      // Remove the oldest container if:
+      // - there is more memory required
+      // - there are still containers that can be removed
+      // - there are enough free containers that can be removed
+      val (ref, data) = pool.minBy(_._2.lastUsed)
+      // Catch exception if remaining memory will be negative
+      val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+      remove(pool - ref, remainingMemory, toRemove ++ List(ref))
+    } else {
+      // If this is the first call: All containers are in use currently, or there is more memory needed than
+      // containers can be removed.
+      // Or, if this is one of the recursions: Enough containers are found to get the memory, that is
+      // necessary. -> Abort recursion
+      toRemove
+    }
+  }
+
+  /**
+   * Find the expired actors in prewarmedPool
+   *
+   * @param poolConfig
+   * @param prewarmConfig
+   * @param prewarmedPool
+   * @param logging
+   * @return a list of expired actors
+   */
+  def removeExpired[A](poolConfig: ContainerPoolConfig,
+                       prewarmConfig: List[PrewarmingConfig],
+                       prewarmedPool: Map[A, PreWarmData])(implicit logging: Logging): List[A] = {
+    val now = Deadline.now
+    val expireds = prewarmConfig
+      .flatMap { config =>
+        val kind = config.exec.kind
+        val memory = config.memoryLimit
+        config.reactive
+          .map { c =>
+            val expiredPrewarmedContainer = prewarmedPool.toSeq
+              .filter { warmInfo =>
+                warmInfo match {
+                  case (_, p @ PreWarmData(_, `kind`, `memory`, _)) if p.isExpired() => true
+                  case _                                                             => false
+                }
+              }
+              .sortBy(_._2.expires.getOrElse(now))
+
+            // emit expired container counter metric with memory + kind
+            MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
+            if (expiredPrewarmedContainer.nonEmpty) {
+              logging.info(
+                this,
+                s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
+            }
+            expiredPrewarmedContainer.map(e => (e._1, e._2.expires.getOrElse(now)))
+          }
+          .getOrElse(List.empty)
+      }
+      .sortBy(_._2) //need to sort these so that if the results are limited, we take the oldest
+      .map(_._1)
+    if (expireds.nonEmpty) {
+      logging.info(this, s"removing up to ${poolConfig.prewarmExpirationLimit} of ${expireds.size} expired containers")
+      expireds.take(poolConfig.prewarmExpirationLimit).foreach { e =>
+        prewarmedPool.get(e).map { d =>
+          logging.info(this, s"removing expired prewarm of kind ${d.kind} with container ${d.container} ")
+        }
+      }
+    }
+    expireds.take(poolConfig.prewarmExpirationLimit)
+  }
+
+  /**
+   * Determine how many additional prewarmed containers are needed for each prewarm configuration
+   *
+   * @param init
+   * @param scheduled
+   * @param coldStartCount
+   * @param prewarmConfig
+   * @param prewarmedPool
+   * @param prewarmStartingPool
+   * @param logging
+   * @return a Map from each PrewarmingConfig to its (current count, desired count)
+   */
+  def increasePrewarms(init: Boolean,
+                       scheduled: Boolean,
+                       coldStartCount: Map[ColdStartKey, Int],
+                       prewarmConfig: List[PrewarmingConfig],
+                       prewarmedPool: Map[ActorRef, PreWarmData],
+                       prewarmStartingPool: Map[ActorRef, (String, ByteSize)],
+                       prewarmQueue: Queue[(CodeExec[_], ByteSize, Option[FiniteDuration])])(
+    implicit logging: Logging): Map[PrewarmingConfig, (Int, Int)] = {
+    prewarmConfig.map { config =>
+      val kind = config.exec.kind
+      val memory = config.memoryLimit
+
+      val runningCount = prewarmedPool.count {
+        // done starting (include expired, since they may not have been removed yet)
+        case (_, p @ PreWarmData(_, `kind`, `memory`, _)) => true
+        // started but not finished starting (or expired)
+        case _ => false
+      }
+      val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
+      val queuingCount = prewarmQueue.count(p => p._1.kind == kind && p._2 == memory)
+      val currentCount = runningCount + startingCount + queuingCount
+
+      // determine how many are needed
+      val desiredCount: Int =
+        if (init) config.initialCount
+        else {
+          if (scheduled) {
+            // scheduled/reactive config backfill
+            config.reactive
+              .map(c => ContainerPool.getReactiveCold(coldStartCount, c, kind, memory).getOrElse(c.minCount)) //reactive -> desired is either cold start driven, or minCount
+              .getOrElse(config.initialCount) //not reactive -> desired is always initial count
+          } else {
+            // normal backfill after removal - make sure at least minCount or initialCount is started
+            config.reactive.map(_.minCount).getOrElse(config.initialCount)
+          }
+        }
+
+      if (currentCount < desiredCount) {
+        logging.info(
+          this,
+          s"found ${currentCount} started and ${startingCount} starting and ${queuingCount} queuing; ${if (init) "initing"
+          else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+          TransactionId.invokerWarmup)
+      }
+      (config, (currentCount, desiredCount))
+    }.toMap
+  }
+
+  def props(factory: ActorRefFactory => ActorRef,
+            invokerHealthService: ActorRef,
+            poolConfig: ContainerPoolConfig,
+            instance: InvokerInstanceId,
+            prewarmConfig: List[PrewarmingConfig] = List.empty,
+            sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+    implicit logging: Logging): Props = {
+    Props(
+      new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService,
+        poolConfig,
+        instance,
+        prewarmConfig,
+        sendAckToScheduler))
+  }
+}
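
A usage sketch for creating the pool actor through the companion's props defined above; the wrapper itself (startPool, the actor name) is hypothetical, while the parameter types and the props signature come from this change.

    import scala.concurrent.Future
    import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
    import org.apache.kafka.clients.producer.RecordMetadata
    import org.apache.openwhisk.common.Logging
    import org.apache.openwhisk.core.connector.ContainerCreationAckMessage
    import org.apache.openwhisk.core.containerpool.{ContainerPoolConfig, PrewarmingConfig}
    import org.apache.openwhisk.core.containerpool.v2.ContainerPoolV2
    import org.apache.openwhisk.core.entity.{InvokerInstanceId, SchedulerInstanceId}

    def startPool(system: ActorSystem,
                  childFactory: ActorRefFactory => ActorRef,
                  invokerHealthService: ActorRef,
                  poolConfig: ContainerPoolConfig,
                  instance: InvokerInstanceId,
                  prewarmConfig: List[PrewarmingConfig],
                  sendAck: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
      implicit logging: Logging): ActorRef =
      system.actorOf(
        ContainerPoolV2.props(childFactory, invokerHealthService, poolConfig, instance, prewarmConfig, sendAck),
        name = "containerPoolV2")
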
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 2a33a0d..6f943d8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -25,13 +25,15 @@ import org.apache.openwhisk.core.containerpool.Container
 import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
 import org.apache.openwhisk.core.entity.size._
 
+import scala.concurrent.duration.{Deadline, FiniteDuration}
+
 // Events received by the actor
 case class Initialize(invocationNamespace: String,
                       action: ExecutableWhiskAction,
                       schedulerHost: String,
                       rpcPort: Int,
                       transId: TransactionId)
-case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration] = None)
 
 // Event sent by the actor
 case class ContainerCreationFailed(throwable: Throwable)
@@ -40,12 +42,11 @@ case class ClientCreationFailed(throwable: Throwable,
                                 container: Container,
                                 invocationNamespace: String,
                                 action: ExecutableWhiskAction)
-case class ReadyToWork(data: Data)
+case class ReadyToWork(data: PreWarmData)
 case class Initialized(data: InitializedData)
 case class Resumed(data: WarmData)
 case class ResumeFailed(data: WarmData)
 case class RecreateClient(action: ExecutableWhiskAction)
-case object ContainerRemoved // when container is destroyed
 
 // States
 sealed trait ProxyState
@@ -72,9 +73,13 @@ case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLim
   override def getContainer = None
 }
 trait WithClient { val clientProxy: ActorRef }
-case class PreWarmData(container: Container, kind: String, override val memoryLimit: ByteSize)
+case class PreWarmData(container: Container,
+                       kind: String,
+                       override val memoryLimit: ByteSize,
+                       expires: Option[Deadline] = None)
     extends Data(memoryLimit) {
   override def getContainer = Some(container)
+  def isExpired(): Boolean = expires.exists(_.isOverdue())
 }
 
 case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 8372fc1..71ab9a0 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -22,6 +22,7 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, St
 import akka.util.Timeout
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.ContainerRemoved
 import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
 import org.apache.openwhisk.core.entitlement.Privilege
 import org.apache.openwhisk.core.entity.size._
@@ -66,7 +67,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
   }
 
   when(Unhealthy) {
-    case Event(ContainerRemoved, _) =>
+    case Event(ContainerRemoved(_), _) =>
       healthActionProxy = None
       startTestAction(self)
       stay
@@ -90,7 +91,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
       // Initialized messages sent by ContainerProxy for HealthManger
       stay()
 
-    case Event(ContainerRemoved, _) =>
+    case Event(ContainerRemoved(_), _) =>
       // Drop messages sent by ContainerProxy for HealthManger
       healthActionProxy = None
       stay()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
index 451faff..1e0950f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
@@ -72,6 +72,8 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
     producer.close()
   }
 
+  def getProducer(): MessageProducer = producer
+
   private val producer = new MessageProducer {
     def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
       queue.synchronized {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index e6cd99f..4496742 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -84,7 +84,7 @@ class MesosContainerFactoryTest
   }
 
   // 80 slots, each 265MB
-  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 1.minute, None, 100)
+  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.seconds)
   val actionMemory = 265.MB
   val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index bc10350..eff4b92 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,7 +134,7 @@ class ContainerPoolTests
   }
 
   def poolConfig(userMemory: ByteSize) =
-    ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
+    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
 
   behavior of "ContainerPool"
 
@@ -805,9 +805,20 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     stream.reset()
+    val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
     val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
     val poolConfig =
-      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
+      ContainerPoolConfig(
+        MemoryLimit.STD_MEMORY * 4,
+        0.5,
+        false,
+        prewarmExpirationCheckInitDelay,
+        prewarmExpirationCheckIntervel,
+        None,
+        100,
+        3,
+        false,
+        1.second)
     val initialCount = 2
     val pool =
       system.actorOf(
@@ -840,9 +851,20 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     stream.reset()
+    val prewarmExpirationCheckInitDelay = 2.seconds
     val prewarmExpirationCheckIntervel = 2.seconds
     val poolConfig =
-      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 12, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
+      ContainerPoolConfig(
+        MemoryLimit.STD_MEMORY * 12,
+        0.5,
+        false,
+        prewarmExpirationCheckInitDelay,
+        prewarmExpirationCheckIntervel,
+        None,
+        100,
+        3,
+        false,
+        1.second)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -1215,7 +1237,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
   }
 
   it should "remove expired in order of expiration" in {
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     //use a second kind so that we know sorting is not isolated to the expired of each kind
     val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1239,7 +1261,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
   it should "remove only the prewarmExpirationLimit of expired prewarms" in {
     //limit prewarm removal to 2
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     val prewarmConfig =
@@ -1265,7 +1287,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
   it should "remove only the expired prewarms regardless of minCount" in {
     //limit prewarm removal to 100
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     //minCount is 2 - should leave at least 2 prewarms when removing expired
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index dc8a33f..87aeaf1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -276,7 +276,7 @@ class ContainerProxyTests
     (transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
       Future.successful(())
   }
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 1.minute, None, 100)
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
   def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
new file mode 100644
index 0000000..e6ed5ed
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -0,0 +1,1152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
+import common.StreamLogging
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.ContainerCreationError._
+import org.apache.openwhisk.core.connector.test.TestConnector
+import org.apache.openwhisk.core.connector.{
+  ContainerCreationAckMessage,
+  ContainerCreationError,
+  ContainerCreationMessage,
+  MessageProducer
+}
+import org.apache.openwhisk.core.containerpool.docker.DockerContainer
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.containerpool.{
+  Container,
+  ContainerAddress,
+  ContainerPoolConfig,
+  ContainerRemoved,
+  PrewarmContainerCreationConfig,
+  PrewarmingConfig
+}
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, ReactivePrewarmingConfig, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.utils.{retry => utilRetry}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.Eventually
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Behavior tests for the ContainerPool
+ *
+ * These tests test the runtime behavior of a ContainerPool actor.
+ */
+@RunWith(classOf[JUnitRunner])
+class FunctionPullingContainerPoolTests
+    extends TestKit(ActorSystem("FunctionPullingContainerPool"))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockFactory
+    with StreamLogging
+    with Eventually
+    with DbUtils {
+
+  override def afterAll = {
+    TestKit.shutdownActorSystem(system)
+    super.afterAll()
+  }
+  override def afterEach = {
+    cleanup()
+    super.afterEach()
+  }
+
+  private val config = new WhiskConfig(ExecManifest.requiredProperties)
+  ExecManifest.initialize(config) should be a 'success
+
+  val timeout = 5.seconds
+
+  private implicit val mt = ActorMaterializer()
+  private implicit val transId = TransactionId.testing
+  private implicit val creationId = CreationId.generate()
+
+  // Common entities to pass to the tests. We don't really care what's inside
+  // those for the behavior testing here, as none of the contents will really
+  // reach a container anyway. We merely assert that passing and extraction of
+  // the values is done properly.
+  private val actionKind = "nodejs:8"
+  private val exec = CodeExecAsString(RuntimeManifest(actionKind, ImageName("testImage")), "testCode", None)
+  private val memoryLimit = MemoryLimit.STD_MEMORY.toMB.MB
+  private val whiskAction = WhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
+  private val invocationNamespace = EntityName("invocationSpace")
+  private val schedulerHost = "127.17.0.1"
+  private val rpcPort = 13001
+  private val isBlockboxInvocation = false
+  private val bigWhiskAction = WhiskAction(
+    EntityPath("actionSpace"),
+    EntityName("bigActionName"),
+    exec,
+    limits = ActionLimits(memory = MemoryLimit(memoryLimit * 2)))
+  private val execMetadata = CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
+  private val actionMetaData =
+    WhiskActionMetaData(
+      whiskAction.namespace,
+      whiskAction.name,
+      execMetadata,
+      whiskAction.parameters,
+      whiskAction.limits,
+      whiskAction.version,
+      whiskAction.publish,
+      whiskAction.annotations)
+  private val bigActionMetaData =
+    WhiskActionMetaData(
+      bigWhiskAction.namespace,
+      bigWhiskAction.name,
+      execMetadata,
+      bigWhiskAction.parameters,
+      bigWhiskAction.limits,
+      bigWhiskAction.version,
+      bigWhiskAction.publish,
+      bigWhiskAction.annotations)
+  private val invokerHealthService = TestProbe()
+  private val schedulerInstanceId = SchedulerInstanceId("0")
+  private val producer = stub[MessageProducer]
+  private val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit)
+  private val initializedData =
+    InitializedData(
+      mock[MockableV2Container],
+      invocationNamespace.asString,
+      whiskAction.toExecutableWhiskAction.get,
+      TestProbe().ref)
+
+  private val entityStore = WhiskEntityStore.datastore()
+  private val invokerInstance = InvokerInstanceId(0, userMemory = 0 B)
+  private val creationMessage =
+    ContainerCreationMessage(
+      transId,
+      invocationNamespace.asString,
+      whiskAction.fullyQualifiedName(true),
+      DocRevision.empty,
+      actionMetaData,
+      schedulerInstanceId,
+      schedulerHost,
+      rpcPort,
+      creationId = creationId)
+  private val creationMessageLarge =
+    ContainerCreationMessage(
+      transId,
+      invocationNamespace.asString,
+      bigWhiskAction.fullyQualifiedName(true),
+      DocRevision.empty,
+      bigActionMetaData,
+      schedulerInstanceId,
+      schedulerHost,
+      rpcPort,
+      creationId = creationId)
+
+  /** Creates a sequence of containers and a factory returning this sequence. */
+  def testContainers(n: Int) = {
+    val containers = (0 to n).map(_ => TestProbe())
+    val queue = mutable.Queue(containers: _*)
+    val factory = (fac: ActorRefFactory) => queue.dequeue().ref
+    (containers, factory)
+  }
+
+  def poolConfig(userMemory: ByteSize,
+                 memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
+                 prewarmMaxRetryLimit: Int = 3,
+                 prewarmPromotion: Boolean = false,
+                 prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
+    ContainerPoolConfig(
+      userMemory,
+      0.5,
+      false,
+      FiniteDuration(2, TimeUnit.SECONDS),
+      FiniteDuration(1, TimeUnit.MINUTES),
+      None,
+      100,
+      prewarmMaxRetryLimit,
+      prewarmPromotion,
+      memorySyncInterval,
+      prewarmContainerCreationConfig)
+
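+  /** Test helper that publishes the creation ack to the per-scheduler "creationAck<id>" topic via the given producer. */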
+  def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
+                                                    ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+    val topic = s"creationAck${schedulerInstanceId.asString}"
+    producer.send(topic, ackMessage)
+  }
+
+  behavior of "ContainerPool"
+
+  /*
+   * CONTAINER SCHEDULING
+   *
+   * These tests only test the simplest approaches. Look below for full coverage tests
+   * of the respective scheduling methods.
+   */
+
+  it should "create a container if it cannot find a matching prewarmed container" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+    // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY * 4),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(producer))))
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(1).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "not start a new container if there is not enough space in the pool" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+    val bigDoc = put(entityStore, bigWhiskAction)
+    // use a fake producer here so sendAckToScheduler won't fail
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY * 2),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(producer))))
+
+    // Start first action
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // Send second action to the pool
+    pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
+    containers(1).expectNoMessage(100.milliseconds)
+
+    // First container is removed
+    containers(0).send(pool, ContainerRemoved(true)) // pool is empty again.
+
+    pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
+    // Second container should run now
+    containers(1).expectMsgPF() {
+      case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "not start a new container if it is shut down" in within(timeout) {
+    val (containers, factory) = testContainers(1)
+    val doc = put(entityStore, bigWhiskAction)
+    val topic = s"creationAck${schedulerInstanceId.asString}"
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(consumer.getProducer()))))
+
+    pool ! GracefulShutdown
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+
+    containers(0).expectNoMessage()
+
+    val error =
+      s"creationId: ${creationMessage.creationId}, invoker is shutting down, reschedule ${creationMessage.action.copy(version = None)}"
+    val ackMessage =
+      createAckMsg(creationMessage.copy(revision = doc.rev), Some(ShuttingDownError), Some(error))
+
+    utilRetry({
+      val buffer = consumer.peek(50.millisecond)
+      buffer.size shouldBe 1
+      buffer.head._1 shouldBe topic
+      buffer.head._4 shouldBe ackMessage.serialize.getBytes
+    }, 10, Some(500.millisecond))
+
+    // pool should be back to work after enabled again
+    pool ! Enable
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "create prewarmed containers on startup" in within(timeout) {
+    stream.reset()
+    val (containers, factory) = testContainers(1)
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 2),
+        invokerInstance,
+        List(PrewarmingConfig(1, exec, memoryLimit)),
+        sendAckToScheduler(producer))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    stream.toString should include("initing 1 pre-warms to desired count: 1")
+    stream.toString should not include ("prewarm container creation is starting with creation delay configuration")
+  }
+
+  it should "create prewarmed containers on startup with creation delay configuration" in within(7.seconds) {
+    stream.reset()
+    val (containers, factory) = testContainers(3)
+    val prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] =
+      Some(PrewarmContainerCreationConfig(1, 3.seconds))
+
+    val poolConfig = ContainerPoolConfig(
+      MemoryLimit.STD_MEMORY * 3,
+      0.5,
+      false,
+      FiniteDuration(10, TimeUnit.SECONDS),
+      FiniteDuration(10, TimeUnit.SECONDS),
+      None,
+      100,
+      3,
+      false,
+      FiniteDuration(10, TimeUnit.SECONDS),
+      prewarmContainerCreationConfig)
+
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig,
+          invokerInstance,
+          List(PrewarmingConfig(3, exec, memoryLimit)),
+          sendAckToScheduler(producer))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectNoMessage(2.seconds)
+    containers(1).expectMsg(4.seconds, Start(exec, memoryLimit))
+    containers(2).expectNoMessage(2.seconds)
+    containers(2).expectMsg(4.seconds, Start(exec, memoryLimit))
+    stream.toString should include("prewarm container creation is starting with creation delay configuration")
+  }
+
+  it should "backfill prewarms when prewarm containers are removed" in within(timeout) {
+    val (containers, factory) = testContainers(6)
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 2),
+        invokerInstance,
+        List(PrewarmingConfig(2, exec, memoryLimit)),
+        sendAckToScheduler(producer))))
+
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, memoryLimit))
+
+    //removing 2 prewarm containers will start 2 containers via backfill
+    containers(0).send(pool, ContainerRemoved(true))
+    containers(1).send(pool, ContainerRemoved(true))
+    containers(2).expectMsg(Start(exec, memoryLimit))
+    containers(3).expectMsg(Start(exec, memoryLimit))
+    //make sure extra prewarms are not started
+    containers(4).expectNoMessage(100.milliseconds)
+    containers(5).expectNoMessage(100.milliseconds)
+  }
+
+  it should "use a prewarmed container when kind and memory are both match and create a new one to fill its place when prewarmPromotion is false" in within(
+    timeout) {
+    val (containers, factory) = testContainers(4)
+    val doc = put(entityStore, whiskAction)
+    val biggerMemory = memoryLimit * 2
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 6, prewarmPromotion = false),
+        invokerInstance,
+        List(PrewarmingConfig(1, exec, memoryLimit), PrewarmingConfig(1, exec, biggerMemory)),
+        sendAckToScheduler(producer))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, biggerMemory))
+
+    // prewarm container is started
+    containers(0).send(pool, ReadyToWork(prewarmedData))
+    containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggerMemory)))
+
+    // the prewarm container with matching memory should be chosen
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // prewarm a new container
+    containers(2).expectMsgPF() {
+      case Start(exec, memoryLimit, _) => true
+    }
+
+    // the prewarm container with bigger memory should not be chosen
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(3).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "use a prewarmed container when kind is matched and create a new one to fill its place when prewarmPromotion is true" in within(
+    timeout) {
+    val (containers, factory) = testContainers(6)
+    val doc = put(entityStore, whiskAction)
+    val biggerMemory = memoryLimit * 2
+    val biggestMemory = memoryLimit * 3
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 6, prewarmPromotion = true),
+        invokerInstance,
+        List(PrewarmingConfig(1, exec, memoryLimit), PrewarmingConfig(1, exec, biggestMemory)),
+        sendAckToScheduler(producer))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, biggestMemory))
+
+    // two prewarm containers are started
+    containers(0).send(pool, ReadyToWork(prewarmedData))
+    containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggestMemory)))
+
+    // the prewarm container with the smallest memory should be chosen
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // prewarm a new container
+    containers(2).expectMsgPF() {
+      case Start(exec, memoryLimit, _) => true
+    }
+
+    // the prewarm container with bigger memory should be chosen
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(1).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // prewarm a new container
+    containers(3).expectMsgPF() {
+      case Start(exec, biggestMemory, _) => true
+    }
+
+    // now free memory is (6 - 3 - 1) * stdMemory and 2 * stdMemory is required, so neither prewarmed container is suitable
+    // a new container should be created
+    pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
+    containers(4).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // no new prewarmed container should be created
+    containers(6).expectNoMessage(500.milliseconds)
+  }
+
+  it should "not use a prewarmed container if it doesn't fit the kind" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+
+    val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None)
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY),
+        invokerInstance,
+        List(PrewarmingConfig(1, alternativeExec, memoryLimit)),
+        sendAckToScheduler(producer))))
+
+    containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
+    containers(0).send(pool, ReadyToWork(prewarmedData.copy(kind = alternativeExec.kind))) // container0 was started
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(1).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "not use a prewarmed container if it doesn't fit memory wise" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+
+    val alternativeLimit = 128.MB
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY),
+        invokerInstance,
+        List(PrewarmingConfig(1, exec, alternativeLimit)),
+        sendAckToScheduler(producer))))
+
+    containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
+    containers(0).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = alternativeLimit))) // container0 was started
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(1).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "use a warmed container when invocationNamespace, action and revision matched" in within(timeout) {
+    val (containers, factory) = testContainers(3)
+    val doc = put(entityStore, whiskAction)
+
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY * 4),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(producer))))
+
+    // register a fake warmed container
+    val container = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container.ref)
+
+    // the revision doesn't match, create 1 container
+    pool ! CreationContainer(creationMessage, whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // the invocation namespace doesn't match, create 1 container
+    pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
+    containers(1).expectMsgPF() {
+      case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    container.expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // warmed container is occupied, create 1 more container
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(2).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "retry when chosen warmed container is failed to resume" in within(timeout) {
+
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY * 2),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(producer))))
+
+    // register a fake warmed container
+    val container = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container.ref)
+
+    // choose the warmed container
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    container.expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // the warmed container failed to resume
+    pool.tell(
+      ResumeFailed(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container.ref)
+
+    // then a new container will be created
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
+  it should "remove oldest previously used container to make space for the job passed to run" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val doc = put(entityStore, whiskAction)
+
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY * 3),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(producer))))
+
+    // register three fake warmed containers, so the pool now has no space for a new container
+    val container1 = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container1.ref)
+
+    val container2 = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container2.ref)
+
+    val container3 = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container3.ref)
+
+    // now the pool has no free memory, and the new job needs 2 * stdMemory, so two warmed containers need to be removed
+    pool ! CreationContainer(creationMessage, bigWhiskAction)
+    container1.expectMsg(Remove)
+    container2.expectMsg(Remove)
+    container3.expectNoMessage()
+
+    // a new container will be created
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+  }
+
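+  /** Builds the ContainerCreationAckMessage the pool is expected to send back for the given creation message. */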
+  private def createAckMsg(creationMessage: ContainerCreationMessage,
+                           error: Option[ContainerCreationError],
+                           reason: Option[String]) = {
+    ContainerCreationAckMessage(
+      creationMessage.transid,
+      creationMessage.creationId,
+      invocationNamespace.asString,
+      creationMessage.action,
+      creationMessage.revision,
+      creationMessage.whiskActionMetaData,
+      invokerInstance,
+      creationMessage.schedulerHost,
+      creationMessage.rpcPort,
+      creationMessage.retryCount,
+      error,
+      reason)
+  }
+
+  it should "send ack(success) to scheduler when container creation is finished" in within(timeout) {
+    val (containers, factory) = testContainers(1)
+    val doc = put(entityStore, whiskAction)
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory).
+    val topic = s"creationAck${creationMessage.rootSchedulerIndex.asString}"
+
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(consumer.getProducer()))))
+
+    val actualCreationMessage = creationMessage.copy(revision = doc.rev)
+    val ackMessage = createAckMsg(actualCreationMessage, None, None)
+
+    pool ! CreationContainer(actualCreationMessage, whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    containers(0).send(pool, Initialized(initializedData)) // container is initialized
+
+    utilRetry({
+      val buffer = consumer.peek(50.millisecond)
+      buffer.size shouldBe 1
+      buffer.head._1 shouldBe topic
+      buffer.head._4 shouldBe ackMessage.serialize.getBytes
+    }, 10, Some(500.millisecond))
+  }
+
+  it should "send ack(success) to scheduler when chosen warmed container is resumed" in within(timeout) {
+    val (containers, factory) = testContainers(1)
+    val doc = put(entityStore, whiskAction)
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory).
+    val topic = s"creationAck${creationMessage.rootSchedulerIndex.asString}"
+
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(consumer.getProducer()))))
+
+    val actualCreationMessage = creationMessage.copy(revision = doc.rev)
+    val ackMessage = createAckMsg(actualCreationMessage, None, None)
+
+    // register a fake warmed container
+    val container = TestProbe()
+    pool.tell(
+      ContainerIsPaused(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container.ref)
+
+    // choose the warmed container
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    container.expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool.tell(
+      Resumed(
+        WarmData(
+          stub[DockerContainer],
+          invocationNamespace.asString,
+          whiskAction.toExecutableWhiskAction.get,
+          doc.rev,
+          Instant.now,
+          TestProbe().ref)),
+      container.ref)
+
+    utilRetry({
+      val buffer = consumer.peek(50.millisecond)
+      buffer.size shouldBe 1
+      buffer.head._1 shouldBe topic
+      buffer.head._4 shouldBe ackMessage.serialize.getBytes
+    }, 10, Some(500.millisecond))
+  }
+
+  it should "send ack(reschedule) to scheduler when container creation is failed or resource is not enough" in within(
+    timeout) {
+    val (containers, factory) = testContainers(1)
+    val doc = put(entityStore, bigWhiskAction)
+    val doc2 = put(entityStore, whiskAction)
+    val topic = s"creationAck${schedulerInstanceId.asString}"
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig(MemoryLimit.STD_MEMORY),
+          invokerInstance,
+          List.empty,
+          sendAckToScheduler(consumer.getProducer()))))
+
+    val actualCreationMessageLarge = creationMessageLarge.copy(revision = doc.rev)
+    val error =
+      s"creationId: ${creationMessageLarge.creationId}, invoker[$invokerInstance] doesn't have enough resource for container: ${creationMessageLarge.action}"
+    val ackMessage =
+      createAckMsg(actualCreationMessageLarge, Some(ResourceNotEnoughError), Some(error))
+
+    pool ! CreationContainer(actualCreationMessageLarge, bigWhiskAction)
+
+    utilRetry({
+      val buffer = consumer.peek(50.millisecond)
+      buffer.size shouldBe 1
+      buffer.head._1 shouldBe topic
+      buffer.head._4 shouldBe ackMessage.serialize.getBytes
+    }, 10, Some(500.millisecond))
+
+    val actualCreationMessage = creationMessage.copy(revision = doc2.rev)
+    val rescheduleAckMsg = createAckMsg(actualCreationMessage, Some(UnknownError), Some("ContainerProxy init failed."))
+
+    pool ! CreationContainer(actualCreationMessage, whiskAction)
+    containers(0).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    containers(0).send(pool, ContainerRemoved(true)) // the container0 init failed or create container failed
+
+    utilRetry({
+      val buffer2 = consumer.peek(50.millisecond)
+      buffer2.size shouldBe 1
+      buffer2.head._1 shouldBe topic
+      buffer2.head._4 shouldBe rescheduleAckMsg.serialize.getBytes
+    }, 10, Some(500.millisecond))
+  }
+
+  it should "send memory info to invokerHealthManager immediately when doesn't have enough resource happens" in within(
+    timeout) {
+    val (containers, factory) = testContainers(1)
+    val doc = put(entityStore, whiskAction)
+
+    val invokerHealthService = TestProbe()
+    var count = 0
+    invokerHealthService.setAutoPilot((_: ActorRef, msg: Any) =>
+      msg match {
+        case _: MemoryInfo =>
+          count += 1
+          TestActor.KeepRunning
+
+        case _ =>
+          TestActor.KeepRunning
+    })
+
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 1, memorySyncInterval = 1.minute),
+        invokerInstance,
+        List(PrewarmingConfig(1, exec, memoryLimit)),
+        sendAckToScheduler(producer))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+
+    awaitAssert {
+      count shouldBe 3
+    }
+  }
+
+  it should "adjust prewarm container run well without reactive config" in {
+    stream.reset()
+    val (containers, factory) = testContainers(4)
+
+    val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
+    val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+    val poolConfig =
+      ContainerPoolConfig(
+        MemoryLimit.STD_MEMORY * 4,
+        0.5,
+        false,
+        prewarmExpirationCheckInitDelay,
+        prewarmExpirationCheckIntervel,
+        None,
+        100,
+        3,
+        false,
+        1.second)
+    val initialCount = 2
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig,
+          invokerInstance,
+          List(PrewarmingConfig(initialCount, exec, memoryLimit)),
+          sendAckToScheduler(producer))))
+
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, memoryLimit))
+    containers(0).send(pool, ReadyToWork(prewarmedData))
+    containers(1).send(pool, ReadyToWork(prewarmedData))
+
+    // when the invoker starts, there are 0 prewarm containers at the very beginning
+    stream.toString should include(s"found 0 started")
+
+    // the desiredCount should equal initialCount when the invoker starts
+    stream.toString should include(s"desired count: ${initialCount}")
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    // Because the prewarmed containers have already been replenished, currentCount should equal initialCount
+    eventually {
+      stream.toString should not include ("started")
+    }
+  }
+
+  it should "adjust prewarm container run well with reactive config" in {
+    stream.reset()
+    val (containers, factory) = testContainers(15)
+    val doc = put(entityStore, whiskAction)
+
+    val prewarmExpirationCheckInitDelay = FiniteDuration(2, TimeUnit.SECONDS)
+    val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+    val poolConfig =
+      ContainerPoolConfig(
+        MemoryLimit.STD_MEMORY * 8,
+        0.5,
+        false,
+        prewarmExpirationCheckInitDelay,
+        prewarmExpirationCheckIntervel,
+        None,
+        100,
+        3,
+        false,
+        1.second)
+    val minCount = 0
+    val initialCount = 2
+    val maxCount = 4
+    val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
+    val threshold = 1
+    val increment = 1
+    val deadline: Option[Deadline] = Some(ttl.fromNow)
+    val reactive: Option[ReactivePrewarmingConfig] =
+      Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment))
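+    // give the prewarm data an explicit deadline so the expiration check can evict it once the ttl elapses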
+    val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit, deadline)
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig,
+        invokerInstance,
+        List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive)),
+        sendAckToScheduler(producer))))
+
+    containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(0).send(pool, ReadyToWork(prewarmedData))
+    containers(1).send(pool, ReadyToWork(prewarmedData))
+
+    // when the invoker starts, there are 0 prewarm containers at the very beginning
+    stream.toString should include(s"found 0 started")
+
+    // the desiredCount should equal initialCount when the invoker starts
+    stream.toString should include(s"desired count: ${initialCount}")
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+    //expire 2 prewarms
+    containers(0).expectMsg(Remove)
+    containers(1).expectMsg(Remove)
+    containers(0).send(pool, ContainerRemoved(true))
+    containers(1).send(pool, ContainerRemoved(true))
+
+    // currentCount should equal 0 because these 2 prewarmed containers have expired
+    stream.toString should not include (s"found 0 started")
+
+    // the desiredCount should equal minCount because no cold start happened
+    stream.toString should not include (s"desired count: ${minCount}")
+    // Previously created prewarmed containers should be removed
+    stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
+
+    // 2 cold starts happened
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(2).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(3).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    eventually {
+      // Because the expired prewarmed containers have already been removed, currentCount should equal 0
+      stream.toString should include(s"found 0 started")
+      // the desiredCount should equal 2 because cold starts happened
+      stream.toString should include(s"desired count: 2")
+    }
+    containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(4).send(pool, ReadyToWork(prewarmedData))
+    containers(5).send(pool, ReadyToWork(prewarmedData))
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    containers(4).expectMsg(Remove)
+    containers(5).expectMsg(Remove)
+    containers(4).send(pool, ContainerRemoved(true))
+    containers(5).send(pool, ContainerRemoved(true))
+
+    // the previous 2 prewarmed containers were removed because they expired
+    stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
+
+    stream.reset()
+
+    // 5 cold starts happened (5 > maxCount)
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(6).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(7).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(8).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(9).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+    pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
+    containers(10).expectMsgPF() {
+      case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+    }
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    eventually {
+      // Because the expired prewarmed containers have already been removed, currentCount should equal 0
+      stream.toString should include(s"found 0 started")
+      // even though the cold start count > maxCount, the desiredCount cannot exceed maxCount
+      stream.toString should include(s"desired count: ${maxCount}")
+    }
+
+    containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(11).send(pool, ReadyToWork(prewarmedData))
+    containers(12).send(pool, ReadyToWork(prewarmedData))
+    containers(13).send(pool, ReadyToWork(prewarmedData))
+    containers(14).send(pool, ReadyToWork(prewarmedData))
+  }
+
+  it should "prewarm failed creation count cannot exceed max retry limit" in {
+    stream.reset()
+    val (containers, factory) = testContainers(4)
+
+    val prewarmExpirationCheckInitDelay = FiniteDuration(1, TimeUnit.MINUTES)
+    val prewarmExpirationCheckIntervel = FiniteDuration(1, TimeUnit.MINUTES)
+    val maxRetryLimit = 3
+    val poolConfig =
+      ContainerPoolConfig(
+        MemoryLimit.STD_MEMORY * 4,
+        0.5,
+        false,
+        prewarmExpirationCheckInitDelay,
+        prewarmExpirationCheckIntervel,
+        None,
+        100,
+        maxRetryLimit,
+        false,
+        1.second)
+    val initialCount = 1
+    val pool = system.actorOf(
+      Props(
+        new FunctionPullingContainerPool(
+          factory,
+          invokerHealthService.ref,
+          poolConfig,
+          invokerInstance,
+          List(PrewarmingConfig(initialCount, exec, memoryLimit)),
+          sendAckToScheduler(producer))))
+
+    // create the prewarm initially
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(0).send(pool, ContainerRemoved(true))
+    stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+    // the first retry
+    containers(1).expectMsg(Start(exec, memoryLimit))
+    containers(1).send(pool, ContainerRemoved(true))
+    stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+    // the second retry
+    containers(2).expectMsg(Start(exec, memoryLimit))
+    containers(2).send(pool, ContainerRemoved(true))
+    stream.toString should not include (s"prewarm create failed count exceeds max retry limit")
+
+    // the third retry
+    containers(3).expectMsg(Start(exec, memoryLimit))
+    containers(3).send(pool, ContainerRemoved(true))
+
+    // the fourth retry: the failed count now exceeds the max retry limit
+    eventually {
+      stream.toString should include(s"prewarm create failed count exceeds max retry limit")
+    }
+  }
+
+}
+
+abstract class MockableV2Container extends Container {
+  protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
+}