You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/22 05:08:10 UTC

[openwhisk] branch master updated: Exclude warmed containers in disabled invokers. (#5313)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e36b2d8c0 Exclude warmed containers in disabled invokers. (#5313)
e36b2d8c0 is described below

commit e36b2d8c0cd3ef362ea0f6220d9af0371acd10c2
Author: Dominic Kim <st...@apache.org>
AuthorDate: Mon Aug 22 14:08:03 2022 +0900

    Exclude warmed containers in disabled invokers. (#5313)
    
    * Exclude warmed containers in disabled invokers.
    
    * Exclude warmed containers in disabled invokers.
    
    * Find the first warmed container.
    
    * Remove the code added by mistake.
    
    * Add more logs for error cases.
---
 .../apache/openwhisk/core/connector/Message.scala  |  17 +
 .../core/invoker/FPCInvokerReactive.scala          |  16 +-
 .../scheduler/container/ContainerManager.scala     | 383 ++++++++++-------
 .../container/test/ContainerManagerTests.scala     | 464 +++++++++++++++++----
 4 files changed, 627 insertions(+), 253 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index d3c550eb2..b65b11496 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -636,6 +636,23 @@ object ContainerMessage extends DefaultJsonProtocol {
 sealed trait ContainerCreationError
 
 object ContainerCreationError extends Enumeration {
+  import scala.language.implicitConversions
+  implicit def containerCreationErrorToString(x: ContainerCreationError): String = {
+    x match {
+      case NoAvailableInvokersError         => "no available invoker is found"
+      case NoAvailableResourceInvokersError => "no available invoker with the resources is found: "
+      case ResourceNotEnoughError           => "invoker(s) have not enough resources"
+      case WhiskError                       => "whisk error(recoverable) happens"
+      case UnknownError                     => "a unknown error happens"
+      case TimeoutError                     => "a timeout error happens"
+      case ShuttingDownError                => "shutting down error happens"
+      case NonExecutableActionError         => "no executable found for the action"
+      case DBFetchError                     => "an error happens while fetching data from DB"
+      case BlackBoxError                    => "a blackbox error happens"
+      case ZeroNamespaceLimit               => "the namespace has 0 limit configured"
+      case TooManyConcurrentRequests        => "too many concurrent requests are in flight."
+    }
+  }
 
   case object NoAvailableInvokersError extends ContainerCreationError
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index e0393c056..e38c39701 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -376,18 +376,6 @@ class FPCInvokerReactive(config: WhiskConfig,
   override def enable(): Route = {
     invokerHealthManager ! Enable
     pool ! Enable
-    // re-enable consumer
-    if (consumer.isEmpty)
-      consumer = Some(
-        new ContainerMessageConsumer(
-          instance,
-          pool,
-          entityStore,
-          cfg,
-          msgProvider,
-          longPollDuration = 1.second,
-          maxPeek,
-          sendAckToScheduler))
     warmUp()
     complete("Success enable invoker")
   }
@@ -395,15 +383,13 @@ class FPCInvokerReactive(config: WhiskConfig,
   override def disable(): Route = {
     invokerHealthManager ! GracefulShutdown
     pool ! GracefulShutdown
-    consumer.foreach(_.close())
-    consumer = None
     warmUpWatcher.foreach(_.close())
     warmUpWatcher = None
     complete("Successfully disabled invoker")
   }
 
   override def isEnabled(): Route = {
-    complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
+    complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize())
   }
 
   override def backfillPrewarm(): Route = {
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index 837045ed2..038dbe09c 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -16,62 +16,44 @@
  */
 
 package org.apache.openwhisk.core.scheduler.container
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.event.Logging.InfoLevel
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
-import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector.ContainerCreationError.{
+  containerCreationErrorToString,
   NoAvailableInvokersError,
   NoAvailableResourceInvokersError
 }
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.core.entity.{
-  Annotations,
-  ByteSize,
-  DocRevision,
-  FullyQualifiedEntityName,
-  InvokerInstanceId,
-  MemoryLimit,
-  SchedulerInstanceId
-}
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType._
 import org.apache.openwhisk.core.scheduler.Scheduler
-import org.apache.openwhisk.core.scheduler.message.{
-  ContainerCreation,
-  ContainerDeletion,
-  ContainerKeyMeta,
-  CreationJobState,
-  FailedCreationJob,
-  RegisterCreationJob,
-  ReschedulingCreationJob,
-  SuccessfulCreationJob
-}
+import org.apache.openwhisk.core.scheduler.container.ContainerManager.{sendState, updateInvokerMemory}
+import org.apache.openwhisk.core.scheduler.message._
 import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
-import org.apache.openwhisk.core.service.{
-  DeleteEvent,
-  PutEvent,
-  UnwatchEndpoint,
-  WatchEndpoint,
-  WatchEndpointInserted,
-  WatchEndpointRemoved
-}
+import org.apache.openwhisk.core.service._
 import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
-import pureconfig.generic.auto._
 import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol._
+import pureconfig.generic.auto._
 
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.collection.concurrent.TrieMap
+import scala.collection.immutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.util.{Failure, Success}
 
-case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId)
+case class ScheduledPair(msg: ContainerCreationMessage,
+                         invokerId: Option[InvokerInstanceId],
+                         err: Option[ContainerCreationError] = None)
 
 case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double)
 
@@ -157,43 +139,64 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
     case _ =>
   }
 
-  private def createContainer(msgs: List[ContainerCreationMessage],
-                              memory: ByteSize,
-                              invocationNamespace: String): Unit = {
+  private def createContainer(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)(
+    implicit logging: Logging): Unit = {
     logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]")
-    val coldCreations = filterWarmedCreations(msgs)
-    if (coldCreations.nonEmpty)
-      ContainerManager
-        .getAvailableInvokers(etcdClient, memory, invocationNamespace)
-        .flatMap { invokers =>
-          if (invokers.isEmpty) {
-            coldCreations.foreach { msg =>
-              ContainerManager.sendState(
-                FailedCreationJob(
-                  msg.creationId,
-                  msg.invocationNamespace,
-                  msg.action,
-                  msg.revision,
-                  NoAvailableInvokersError,
-                  s"No available invokers."))
-            }
-            Future.failed(NoCapacityException("No available invokers."))
-          } else {
-            coldCreations.foreach { msg =>
-              creationJobManager ! RegisterCreationJob(msg)
-            }
+    ContainerManager
+      .getAvailableInvokers(etcdClient, memory, invocationNamespace)
+      .foreach { invokers =>
+        if (invokers.isEmpty) {
+          logging.error(this, "there is no available invoker to schedule.")
+          msgs.foreach(ContainerManager.sendState(_, NoAvailableInvokersError, NoAvailableInvokersError))
+        } else {
+          val (coldCreations, warmedCreations) =
+            ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+          // handle warmed creation
+          val chosenInvokers: immutable.Seq[Option[(Int, ContainerCreationMessage)]] = warmedCreations.map {
+            warmedCreation =>
+              // update the in-progress map for warmed containers.
+              // even if it is done in the filterWarmedCreations method, it is still necessary to apply the change to the original map.
+              warmedCreation._3.foreach(inProgressWarmedContainers.update(warmedCreation._1.creationId.asString, _))
+
+              // send creation message to the target invoker.
+              warmedCreation._2 map { chosenInvoker =>
+                val msg = warmedCreation._1
+                creationJobManager ! RegisterCreationJob(msg)
+                sendCreationContainerToInvoker(messagingProducer, chosenInvoker, msg)
+                (chosenInvoker, msg)
+              }
+          }
 
-            Future {
-              ContainerManager
-                .schedule(invokers, coldCreations, memory)
-                .map { pair =>
-                  sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg)
-                }
+          // update the resource usage of invokers to apply changes from warmed creations.
+          val updatedInvokers = chosenInvokers.foldLeft(invokers) { (invokers, chosenInvoker) =>
+            chosenInvoker match {
+              case Some((chosenInvoker, msg)) =>
+                updateInvokerMemory(chosenInvoker, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+              case err =>
+                // this is not supposed to happen.
+                logging.error(this, s"warmed creation is scheduled but no invoker is chosen: $err")
+                invokers
             }
-          }.andThen {
-            case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t")
           }
+
+          // handle cold creations
+          ContainerManager
+            .schedule(updatedInvokers, coldCreations.map(_._1), memory)
+            .map { pair =>
+              pair.invokerId match {
+                // an invoker is assigned for the msg
+                case Some(instanceId) =>
+                  creationJobManager ! RegisterCreationJob(pair.msg)
+                  sendCreationContainerToInvoker(messagingProducer, instanceId.instance, pair.msg)
+
+                // if a chosen invoker does not exist, it means it failed to find a matching invoker for the msg.
+                case _ =>
+                  pair.err.foreach(error => sendState(pair.msg, error, error))
+              }
+            }
         }
+      }
   }
 
   private def getInvokersWithOldContainer(invocationNamespace: String,
@@ -248,31 +251,6 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
     ContainerKeyMeta(revision, invokerId, containerId)
   }
 
-  // Filter out messages which can use warmed container
-  private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
-    msgs.filter { msg =>
-      val warmedPrefix =
-        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
-      val chosenInvoker = warmedContainers
-        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
-        .find { container =>
-          if (container.startsWith(warmedPrefix)) {
-            logging.info(this, s"Choose a warmed container $container")
-            inProgressWarmedContainers.update(msg.creationId.asString, container)
-            true
-          } else
-            false
-        }
-        .map(_.split("/").takeRight(3).apply(0))
-      if (chosenInvoker.nonEmpty) {
-        creationJobManager ! RegisterCreationJob(msg)
-        sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg)
-        false
-      } else
-        true
-    }
-  }
-
   private def sendCreationContainerToInvoker(producer: MessageProducer,
                                              invoker: Int,
                                              msg: ContainerCreationMessage): Future[ResultMetadata] = {
@@ -360,6 +338,87 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+                                                 inProgressWarmedContainers: TrieMap[String, String],
+                                                 invokers: List[InvokerHealth],
+                                                 msgs: List[ContainerCreationMessage])(
+    implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], Option[String])],
+                                 List[(ContainerCreationMessage, Option[Int], Option[String])]) = {
+    val warmedApplied = msgs.map { msg =>
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
+      val container = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .find { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+
+            // record the choice so that later messages do not pick the same warmed container
+            inProgressWarmedContainers.update(msg.creationId.asString, container)
+            true
+          } else
+            false
+        }
+
+      // chosenInvoker is supposed to have only one item
+      val chosenInvoker = container
+        .map(_.split("/").takeRight(3).apply(0))
+        // filter warmed containers in disabled invokers
+        .filter(
+          invoker =>
+            invokers
+            // filterWarmedCreations method is supposed to receive healthy invokers only but this will make sure again only healthy invokers are used.
+              .filter(invoker => invoker.status.isUsable)
+              .map(_.id.instance)
+              .contains(invoker.toInt))
+
+      if (chosenInvoker.nonEmpty && container.nonEmpty) {
+        (msg, Some(chosenInvoker.get.toInt), Some(container.get))
+      } else
+        (msg, None, None)
+    }
+
+    warmedApplied.partition { item =>
+      if (item._2.nonEmpty) false
+      else true
+    }
+  }
+
+  protected[container] def updateInvokerMemory(invokerId: Int,
+                                               requiredMemory: Long,
+                                               invokers: List[InvokerHealth]): List[InvokerHealth] = {
+    // it must be compared to the instance unique id
+    val index = invokers.indexOf(invokers.filter(p => p.id.instance == invokerId).head)
+    val invoker = invokers(index)
+
+    // if the invoker would be left with less than the minimum memory after this allocation, drop it from the list.
+    if (invoker.id.userMemory.toMB - requiredMemory < MemoryLimit.MIN_MEMORY.toMB) {
+      // drop the nth element
+      val split = invokers.splitAt(index)
+      val _ :: t1 = split._2
+      split._1 ::: t1
+    } else {
+      invokers.updated(
+        index,
+        invoker.copy(id = invoker.id.copy(userMemory = invoker.id.userMemory - requiredMemory.MB)))
+    }
+  }
+
+  protected[container] def updateInvokerMemory(invokerId: Option[InvokerInstanceId],
+                                               requiredMemory: Long,
+                                               invokers: List[InvokerHealth]): List[InvokerHealth] = {
+    invokerId match {
+      case Some(instanceId) =>
+        updateInvokerMemory(instanceId.instance, requiredMemory, invokers)
+
+      case None =>
+        // do nothing
+        invokers
+    }
+  }
+
   /**
    * Assign an invoker to a message
    *
@@ -395,64 +454,92 @@ object ContainerManager {
         val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
           .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
           .getOrElse(true)
-        val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+        val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull)
         if (requiredResources.isEmpty) {
           // only choose managed invokers or blackbox invokers
           val wantedInvokers = if (isBlackboxInvocation) {
-            candidates.filter(c => blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet
+            logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for blackbox invokers to schedule.")
+            candidates
+              .filter(
+                c =>
+                  blackboxInvokers
+                    .map(b => b.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           } else {
-            candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet
+            logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for managed invokers to schedule.")
+            candidates
+              .filter(
+                c =>
+                  managedInvokers
+                    .map(m => m.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           }
           val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
 
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(wantedInvokers.toList, msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
-            chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+            logging.info(
+              this,
+              s"[${msg.invocationNamespace}/${msg.action}] since there is no available non-tagged invoker, choose one among tagged invokers.")
+            val scheduledPair = chooseInvokerFromCandidates(taggedInvokers, msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableInvokersError,
-                s"No available invokers."))
-            (pairs, candidates)
+            logging.error(
+              this,
+              s"[${msg.invocationNamespace}/${msg.action}] there is no invoker available to schedule to schedule.")
+            val scheduledPair =
+              ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+            (scheduledPair :: pairs, invokers)
           }
         } else {
+          logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for tagged invokers to schedule.")
           val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(wantedInvokers, msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (resourcesStrictPolicy) {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableResourceInvokersError,
-                s"No available invokers with resources $requiredResources."))
-            (pairs, candidates)
+            logging.error(
+              this,
+              s"[${msg.invocationNamespace}/${msg.action}] there is no available invoker with the resource: ${requiredResources}")
+            val scheduledPair =
+              ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError))
+            (scheduledPair :: pairs, invokers)
           } else {
+            logging.info(
+              this,
+              s"[${msg.invocationNamespace}/${msg.action}] since there is no available invoker with the resource, choose any invokers without the resource.")
             val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty)
             if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
-              chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg)
+              val scheduledPair = chooseInvokerFromCandidates(noTaggedInvokers, msg)
+              val updatedInvokers =
+                updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+              (scheduledPair :: pairs, updatedInvokers)
             } else {
               val leftInvokers =
                 taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
-              if (leftInvokers.nonEmpty)
-                chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
-              else {
-                sendState(
-                  FailedCreationJob(
-                    msg.creationId,
-                    msg.invocationNamespace,
-                    msg.action,
-                    msg.revision,
-                    NoAvailableInvokersError,
-                    s"No available invokers."))
-                (pairs, candidates)
+              if (leftInvokers.nonEmpty) {
+                val scheduledPair = chooseInvokerFromCandidates(leftInvokers, msg)
+                val updatedInvokers =
+                  updateInvokerMemory(
+                    scheduledPair.invokerId,
+                    msg.whiskActionMetaData.limits.memory.megabytes,
+                    invokers)
+                (scheduledPair :: pairs, updatedInvokers)
+              } else {
+                logging.error(this, s"[${msg.invocationNamespace}/${msg.action}] no available invoker is found")
+                val scheduledPair =
+                  ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+                (scheduledPair :: pairs, invokers)
               }
             }
           }
@@ -462,32 +549,30 @@ object ContainerManager {
     list
   }
 
-  private def chooseInvokerFromCandidates(
-    candidates: List[InvokerHealth],
-    wholeInvokers: List[InvokerHealth],
-    pairs: List[ScheduledPair],
-    msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = {
-    val idx = rng(mod = candidates.size)
-    val instance = candidates(idx)
-    // it must be compared to the instance unique id
-    val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head)
-    val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
-    val updated =
-      if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares.
-        wholeInvokers.updated(
-          idxInWhole,
-          instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB)))
+  @tailrec
+  protected[container] def chooseInvokerFromCandidates(candidates: List[InvokerHealth], msg: ContainerCreationMessage)(
+    implicit logging: Logging): ScheduledPair = {
+    val requiredMemory = msg.whiskActionMetaData.limits.memory
+    if (candidates.isEmpty) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+    } else if (candidates.forall(p => p.id.userMemory.toMB < requiredMemory.megabytes)) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError))
+    } else {
+      val idx = rng(mod = candidates.size)
+      val instance = candidates(idx)
+      if (instance.id.userMemory.toMB < requiredMemory.megabytes) {
+        val split = candidates.splitAt(idx)
+        val _ :: t1 = split._2
+        chooseInvokerFromCandidates(split._1 ::: t1, msg)
       } else {
-        // drop the nth element
-        val split = wholeInvokers.splitAt(idxInWhole)
-        val _ :: t = split._2
-        split._1 ::: t
+        ScheduledPair(msg, invokerId = Some(instance.id))
       }
-
-    (ScheduledPair(msg, instance.id) :: pairs, updated)
+    }
   }
 
-  private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = {
+  private def sendState(msg: ContainerCreationMessage, err: ContainerCreationError, reason: String)(
+    implicit logging: Logging): Unit = {
+    val state = FailedCreationJob(msg.creationId, msg.invocationNamespace, msg.action, msg.revision, err, reason)
     QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
       case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
         memoryQueueValue.queue ! state
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index bf7f14d58..d1f43270b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -39,15 +39,16 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType._
 import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
-import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.container.{ScheduledPair, _}
 import org.apache.openwhisk.core.scheduler.message.{
   ContainerCreation,
   ContainerDeletion,
   FailedCreationJob,
   RegisterCreationJob,
-  ReschedulingCreationJob,
-  SuccessfulCreationJob
+  ReschedulingCreationJob
 }
+
+import scala.language.postfixOps
 import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
 import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
@@ -60,6 +61,7 @@ import pureconfig.loadConfigOrThrow
 import spray.json.{JsArray, JsBoolean, JsString}
 import pureconfig.generic.auto._
 
+import scala.collection.concurrent.TrieMap
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration.{FiniteDuration, _}
@@ -182,8 +184,8 @@ class ContainerManagerTests
             val msg = InvokerResourceMessage(
               invoker.status.asString,
               invoker.id.userMemory.toMB,
-              invoker.id.userMemory.toMB,
-              invoker.id.userMemory.toMB,
+              invoker.id.busyMemory.getOrElse(0.MB).toMB,
+              0,
               invoker.id.tags,
               invoker.id.dedicatedNamespaces)
 
@@ -271,16 +273,22 @@ class ContainerManagerTests
   it should "try warmed containers first" in {
     val mockEtcd = mock[EtcdClient]
 
-    // for test, only invoker2 is healthy, so that no-warmed creations can be only sent to invoker2
+    // at first, invoker states look like this.
     val invokers: List[InvokerHealth] = List(
-      InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
-      InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
-      InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB, tags = Seq.empty[String]), Healthy),
     )
-    expectGetInvokers(mockEtcd, invokers)
-    expectGetInvokers(mockEtcd, invokers)
-    expectGetInvokers(mockEtcd, invokers)
-    expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` for 3 times, and another one for warmup
+
+    // after that, the invoker states change like this.
+    val updatedInvokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+    )
+    expectGetInvokers(mockEtcd, invokers) // for warm up
+    expectGetInvokers(mockEtcd, invokers) // for first creation
+    expectGetInvokers(mockEtcd, updatedInvokers) // for second creation
 
     val mockJobManager = TestProbe()
     val mockWatcher = TestProbe()
@@ -294,7 +302,7 @@ class ContainerManagerTests
       system.actorOf(ContainerManager
         .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
 
-    // there are 1 warmed container for `test-namespace/test-action` and 1 for `test-namespace/test-action-2`
+    // Add warmed containers for action1 and action2 in invoker0 and invoker1 respectively
     manager ! WatchEndpointInserted(
       ContainerKeys.warmedPrefix,
       ContainerKeys.warmedContainers(
@@ -349,9 +357,9 @@ class ContainerManagerTests
     val msgs = List(msg1, msg2, msg3)
 
     // it should reuse 2 warmed containers
-    manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
+    manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace)
 
-    // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
+    // msg1 uses the warmed container on invoker0, msg2 the one on invoker1, and msg3 the remaining invoker
     receiver.expectMsg(s"invoker0-$msg1")
     receiver.expectMsg(s"invoker1-$msg2")
     receiver.expectMsg(s"invoker2-$msg3")
@@ -362,28 +370,7 @@ class ContainerManagerTests
       case RegisterCreationJob(`msg3`) => true
     }
 
-    // now warmed container for action2 become warmed again
-    manager ! SuccessfulCreationJob(msg2.creationId, msg2.invocationNamespace, msg2.action, msg2.revision)
-    manager ! SuccessfulCreationJob(msg3.creationId, msg3.invocationNamespace, msg3.action, msg3.revision)
-    // it still need to use invoker2
-    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
-    receiver.expectMsg(s"invoker2-$msg1")
-    // it will use warmed container on invoker1
-    manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
-    receiver.expectMsg(s"invoker1-$msg2")
-
-    // warmed container for action1 become warmed when received FailedCreationJob
-    manager ! FailedCreationJob(
-      msg1.creationId,
-      msg1.invocationNamespace,
-      msg1.action,
-      msg1.revision,
-      NoAvailableResourceInvokersError,
-      "")
-    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
-    receiver.expectMsg(s"invoker0-$msg1")
-
-    // warmed container for action1 become unwarmed
+    // remove a warmed container from invoker0
     manager ! WatchEndpointRemoved(
       ContainerKeys.warmedPrefix,
       ContainerKeys.warmedContainers(
@@ -394,8 +381,56 @@ class ContainerManagerTests
         ContainerId("fake")),
       "",
       true)
-    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
-    receiver.expectMsg(s"invoker2-$msg1")
+
+    // remove a warmed container from invoker1
+    manager ! WatchEndpointRemoved(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+
+    // create a warmed container for action1 in invoker1
+    manager ! WatchEndpointInserted(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+
+    // create a warmed container for action2 in invoker2
+    manager ! WatchEndpointInserted(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(2, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+
+    // it should reuse 2 warmed containers
+    manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace)
+
+    // msg1 uses the warmed container on invoker1, msg2 the one on invoker2, and msg3 the remaining invoker
+    receiver.expectMsg(s"invoker1-$msg1")
+    receiver.expectMsg(s"invoker2-$msg2")
+    receiver.expectMsg(s"invoker0-$msg3")
+
+    mockJobManager.expectMsgPF() {
+      case RegisterCreationJob(`msg1`) => true
+      case RegisterCreationJob(`msg2`) => true
+      case RegisterCreationJob(`msg3`) => true
+    }
   }
 
   it should "not try warmed containers if revision is unmatched" in {
@@ -521,7 +556,7 @@ class ContainerManagerTests
     })
   }
 
-  it should "choice invokers" in {
+  it should "choose invokers" in {
     val healthyInvokers: List[InvokerHealth] = List(
       InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB), Healthy),
       InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB), Healthy),
@@ -563,8 +598,8 @@ class ContainerManagerTests
     val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
 
     pairs.map(_.msg) should contain theSameElementsAs msgs
-    pairs.map(_.invokerId).foreach {
-      healthyInvokers.map(_.id) should contain(_)
+    pairs.flatMap(_.invokerId).foreach { invokerId =>
+      healthyInvokers.map(_.id) should contain(invokerId)
     }
   }
 
@@ -607,7 +642,7 @@ class ContainerManagerTests
     val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
 
     pairs.map(_.msg) should contain theSameElementsAs msgs
-    pairs.map(_.invokerId.instance).foreach {
+    pairs.map(_.invokerId.get.instance).foreach {
       healthyInvokers.map(_.id.instance) should contain(_)
     }
   }
@@ -692,21 +727,14 @@ class ContainerManagerTests
       List(msg1, msg2, msg3, msg4, msg5),
       msg1.whiskActionMetaData.limits.memory.megabytes.MB) // the memory is same for all msgs
     pairs should contain theSameElementsAs List(
-      ScheduledPair(msg1, healthyInvokers(0).id),
-      ScheduledPair(msg2, healthyInvokers(1).id),
-      ScheduledPair(msg3, healthyInvokers(2).id),
-      ScheduledPair(msg4, healthyInvokers(3).id))
-    probe.expectMsg(
-      FailedCreationJob(
-        msg5.creationId,
-        testInvocationNamespace,
-        msg5.action,
-        testRevision,
-        NoAvailableResourceInvokersError,
-        "No available invokers with resources List(fake)."))
+      ScheduledPair(msg1, Some(healthyInvokers(0).id), None),
+      ScheduledPair(msg2, Some(healthyInvokers(1).id), None),
+      ScheduledPair(msg3, Some(healthyInvokers(2).id), None),
+      ScheduledPair(msg4, Some(healthyInvokers(3).id), None),
+      ScheduledPair(msg5, None, Some(NoAvailableResourceInvokersError)))
   }
 
-  it should "choose tagged invokers when no invokers available which has no tags first" in {
+  it should "choose tagged invokers when no untagged invoker is available" in {
     val msg =
       ContainerCreationMessage(
         TransactionId.testing,
@@ -740,16 +768,9 @@ class ContainerManagerTests
     // and for msg2, it should return no available invokers
     val pairs =
       ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB)
-    pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id))
-
-    probe.expectMsg(
-      FailedCreationJob(
-        msg2.creationId,
-        testInvocationNamespace,
-        msg2.action,
-        testRevision,
-        NoAvailableInvokersError,
-        "No available invokers."))
+    pairs should contain theSameElementsAs List(
+      ScheduledPair(msg, Some(healthyInvokers(0).id), None),
+      ScheduledPair(msg2, None, Some(NoAvailableInvokersError)))
   }
 
   it should "respect the resource policy while use resource filter" in {
@@ -788,7 +809,7 @@ class ContainerManagerTests
         testfqn.resolve(EntityName("ns3")),
         testRevision,
         actionMetadata.copy(
-          limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+          limits = action.limits.copy(memory = MemoryLimit(256.MB)),
           annotations =
             Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
               Annotations.InvokerResourcesStrictPolicyAnnotationName,
@@ -803,7 +824,7 @@ class ContainerManagerTests
         testfqn.resolve(EntityName("ns3")),
         testRevision,
         actionMetadata.copy(
-          limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+          limits = action.limits.copy(memory = MemoryLimit(256.MB)),
           annotations =
             Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
               Annotations.InvokerResourcesStrictPolicyAnnotationName,
@@ -824,21 +845,13 @@ class ContainerManagerTests
     // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
     val pairs =
       ContainerManager.schedule(healthyInvokers, List(msg1), msg1.whiskActionMetaData.limits.memory.megabytes.MB)
-    pairs.size shouldBe 0
-    probe.expectMsg(
-      FailedCreationJob(
-        msg1.creationId,
-        testInvocationNamespace,
-        msg1.action,
-        testRevision,
-        NoAvailableResourceInvokersError,
-        "No available invokers with resources List(non-exist)."))
+    pairs should contain theSameElementsAs List(ScheduledPair(msg1, None, Some(NoAvailableResourceInvokersError)))
 
     // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
     // here is the invoker0
     val pairs2 =
       ContainerManager.schedule(healthyInvokers, List(msg2), msg2.whiskActionMetaData.limits.memory.megabytes.MB)
-    pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
+    pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, Some(healthyInvokers(0).id), None))
 
     // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
     // if there is none, then choose invokers with other tags, if there is still none, return no available invokers
@@ -846,15 +859,9 @@ class ContainerManagerTests
       healthyInvokers.takeRight(1),
       List(msg3, msg4),
       msg3.whiskActionMetaData.limits.memory.megabytes.MB)
-    pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
-    probe.expectMsg(
-      FailedCreationJob(
-        msg4.creationId,
-        testInvocationNamespace,
-        msg4.action,
-        testRevision,
-        NoAvailableInvokersError,
-        "No available invokers."))
+    pairs3 should contain theSameElementsAs List(
+      ScheduledPair(msg3, Some(healthyInvokers(1).id)),
+      ScheduledPair(msg4, None, Some(NoAvailableInvokersError)))
   }
 
   it should "send FailedCreationJob to queue manager when no invokers are available" in {
@@ -900,7 +907,7 @@ class ContainerManagerTests
         msg.action,
         testRevision,
         NoAvailableInvokersError,
-        "No available invokers."))
+        NoAvailableInvokersError))
   }
 
   it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in {
@@ -1155,6 +1162,285 @@ class ContainerManagerTests
       case _                                                                  => false
     }
   }
+
+  it should "choose an invoker from candidates" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many times we schedule the msg, it should always choose invoker2.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+      newPairs.invokerId shouldBe Some(InvokerInstanceId(2, userMemory = 256 MB))
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate with enough memory" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 128 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many times we schedule the msg, no invoker should be assigned.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+      newPairs.invokerId shouldBe None
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate" in {
+    val candidates = List()
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+    newPairs.invokerId shouldBe None
+  }
+
+  it should "update invoker memory" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 768 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 1024 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "not update invoker memory when no invoker is assigned" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(None, requiredMemory, invokers)
+
+    updatedInvokers shouldBe invokers
+  }
+
+  it should "drop an invoker with less memory than MIN_MEMORY" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 320 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 320 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "filter warmed creations when there is no warmed container" in {
+
+    val warmedContainers = Set.empty[String]
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+    warmedCreations.isEmpty shouldBe true
+    coldCreations.size shouldBe 3
+  }
+
+  it should "filter warmed creations when there are warmed containers" in {
+    val warmedContainers = Set(
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")))
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+    warmedCreations.size shouldBe 2
+    coldCreations.size shouldBe 1
+
+    warmedCreations.map(_._1).contains(msg1) shouldBe true
+    warmedCreations.map(_._1).contains(msg2) shouldBe true
+    coldCreations.map(_._1).contains(msg3) shouldBe true
+  }
+
+  it should "choose cold creation when warmed containers are in disabled invokers" in {
+    val warmedContainers = Set(
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")))
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    // unhealthy invokers should not be chosen even if they have warmed containers
+    val invokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024.MB, tags = Seq.empty[String]), Healthy))
+
+    val (coldCreations, _) =
+      ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+    coldCreations.size shouldBe 3
+    coldCreations.map(_._1).containsSlice(List(msg1, msg2, msg3)) shouldBe true
+  }
 }
 
 @RunWith(classOf[JUnitRunner])