You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/22 05:08:10 UTC
[openwhisk] branch master updated: Exclude warmed containers in disabled invokers. (#5313)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e36b2d8c0 Exclude warmed containers in disabled invokers. (#5313)
e36b2d8c0 is described below
commit e36b2d8c0cd3ef362ea0f6220d9af0371acd10c2
Author: Dominic Kim <st...@apache.org>
AuthorDate: Mon Aug 22 14:08:03 2022 +0900
Exclude warmed containers in disabled invokers. (#5313)
* Exclude warmed containers in disabled invokers.
* Exclude warmed containers in disabled invokers.
* Find the first warmed container.
* Remove the code added by mistake.
* Add more logs for error cases.
---
.../apache/openwhisk/core/connector/Message.scala | 17 +
.../core/invoker/FPCInvokerReactive.scala | 16 +-
.../scheduler/container/ContainerManager.scala | 383 ++++++++++-------
.../container/test/ContainerManagerTests.scala | 464 +++++++++++++++++----
4 files changed, 627 insertions(+), 253 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index d3c550eb2..b65b11496 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -636,6 +636,23 @@ object ContainerMessage extends DefaultJsonProtocol {
sealed trait ContainerCreationError
object ContainerCreationError extends Enumeration {
+ import scala.language.implicitConversions
+ implicit def containerCreationErrorToString(x: ContainerCreationError): String = {
+ x match {
+ case NoAvailableInvokersError => "no available invoker is found"
+ case NoAvailableResourceInvokersError => "no available invoker with the resources is found: "
+ case ResourceNotEnoughError => "invoker(s) have not enough resources"
+ case WhiskError => "whisk error(recoverable) happens"
+ case UnknownError => "a unknown error happens"
+ case TimeoutError => "a timeout error happens"
+ case ShuttingDownError => "shutting down error happens"
+ case NonExecutableActionError => "no executable found for the action"
+ case DBFetchError => "an error happens while fetching data from DB"
+ case BlackBoxError => "a blackbox error happens"
+ case ZeroNamespaceLimit => "the namespace has 0 limit configured"
+ case TooManyConcurrentRequests => "too many concurrent requests are in flight."
+ }
+ }
case object NoAvailableInvokersError extends ContainerCreationError
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index e0393c056..e38c39701 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -376,18 +376,6 @@ class FPCInvokerReactive(config: WhiskConfig,
override def enable(): Route = {
invokerHealthManager ! Enable
pool ! Enable
- // re-enable consumer
- if (consumer.isEmpty)
- consumer = Some(
- new ContainerMessageConsumer(
- instance,
- pool,
- entityStore,
- cfg,
- msgProvider,
- longPollDuration = 1.second,
- maxPeek,
- sendAckToScheduler))
warmUp()
complete("Success enable invoker")
}
@@ -395,15 +383,13 @@ class FPCInvokerReactive(config: WhiskConfig,
override def disable(): Route = {
invokerHealthManager ! GracefulShutdown
pool ! GracefulShutdown
- consumer.foreach(_.close())
- consumer = None
warmUpWatcher.foreach(_.close())
warmUpWatcher = None
complete("Successfully disabled invoker")
}
override def isEnabled(): Route = {
- complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
+ complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize())
}
override def backfillPrewarm(): Route = {
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index 837045ed2..038dbe09c 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -16,62 +16,44 @@
*/
package org.apache.openwhisk.core.scheduler.container
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
-import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector.ContainerCreationError.{
+ containerCreationErrorToString,
NoAvailableInvokersError,
NoAvailableResourceInvokersError
}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.core.entity.{
- Annotations,
- ByteSize,
- DocRevision,
- FullyQualifiedEntityName,
- InvokerInstanceId,
- MemoryLimit,
- SchedulerInstanceId
-}
+import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.scheduler.Scheduler
-import org.apache.openwhisk.core.scheduler.message.{
- ContainerCreation,
- ContainerDeletion,
- ContainerKeyMeta,
- CreationJobState,
- FailedCreationJob,
- RegisterCreationJob,
- ReschedulingCreationJob,
- SuccessfulCreationJob
-}
+import org.apache.openwhisk.core.scheduler.container.ContainerManager.{sendState, updateInvokerMemory}
+import org.apache.openwhisk.core.scheduler.message._
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
-import org.apache.openwhisk.core.service.{
- DeleteEvent,
- PutEvent,
- UnwatchEndpoint,
- WatchEndpoint,
- WatchEndpointInserted,
- WatchEndpointRemoved
-}
+import org.apache.openwhisk.core.service._
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
-import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol._
+import pureconfig.generic.auto._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.collection.concurrent.TrieMap
+import scala.collection.immutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
-case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId)
+case class ScheduledPair(msg: ContainerCreationMessage,
+ invokerId: Option[InvokerInstanceId],
+ err: Option[ContainerCreationError] = None)
case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double)
@@ -157,43 +139,64 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
case _ =>
}
- private def createContainer(msgs: List[ContainerCreationMessage],
- memory: ByteSize,
- invocationNamespace: String): Unit = {
+ private def createContainer(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)(
+ implicit logging: Logging): Unit = {
logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]")
- val coldCreations = filterWarmedCreations(msgs)
- if (coldCreations.nonEmpty)
- ContainerManager
- .getAvailableInvokers(etcdClient, memory, invocationNamespace)
- .flatMap { invokers =>
- if (invokers.isEmpty) {
- coldCreations.foreach { msg =>
- ContainerManager.sendState(
- FailedCreationJob(
- msg.creationId,
- msg.invocationNamespace,
- msg.action,
- msg.revision,
- NoAvailableInvokersError,
- s"No available invokers."))
- }
- Future.failed(NoCapacityException("No available invokers."))
- } else {
- coldCreations.foreach { msg =>
- creationJobManager ! RegisterCreationJob(msg)
- }
+ ContainerManager
+ .getAvailableInvokers(etcdClient, memory, invocationNamespace)
+ .foreach { invokers =>
+ if (invokers.isEmpty) {
+ logging.error(this, "there is no available invoker to schedule.")
+ msgs.foreach(ContainerManager.sendState(_, NoAvailableInvokersError, NoAvailableInvokersError))
+ } else {
+ val (coldCreations, warmedCreations) =
+ ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+ // handle warmed creation
+ val chosenInvokers: immutable.Seq[Option[(Int, ContainerCreationMessage)]] = warmedCreations.map {
+ warmedCreation =>
+ // update the in-progress map for warmed containers.
+ // even if it is done in the filterWarmedCreations method, it is still necessary to apply the change to the original map.
+ warmedCreation._3.foreach(inProgressWarmedContainers.update(warmedCreation._1.creationId.asString, _))
+
+ // send creation message to the target invoker.
+ warmedCreation._2 map { chosenInvoker =>
+ val msg = warmedCreation._1
+ creationJobManager ! RegisterCreationJob(msg)
+ sendCreationContainerToInvoker(messagingProducer, chosenInvoker, msg)
+ (chosenInvoker, msg)
+ }
+ }
- Future {
- ContainerManager
- .schedule(invokers, coldCreations, memory)
- .map { pair =>
- sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg)
- }
+ // update the resource usage of invokers to apply changes from warmed creations.
+ val updatedInvokers = chosenInvokers.foldLeft(invokers) { (invokers, chosenInvoker) =>
+ chosenInvoker match {
+ case Some((chosenInvoker, msg)) =>
+ updateInvokerMemory(chosenInvoker, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+ case err =>
+ // this is not supposed to happen.
+ logging.error(this, s"warmed creation is scheduled but no invoker is chosen: $err")
+ invokers
}
- }.andThen {
- case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t")
}
+
+ // handle cold creations
+ ContainerManager
+ .schedule(updatedInvokers, coldCreations.map(_._1), memory)
+ .map { pair =>
+ pair.invokerId match {
+ // an invoker is assigned for the msg
+ case Some(instanceId) =>
+ creationJobManager ! RegisterCreationJob(pair.msg)
+ sendCreationContainerToInvoker(messagingProducer, instanceId.instance, pair.msg)
+
+ // if a chosen invoker does not exist, it means it failed to find a matching invoker for the msg.
+ case _ =>
+ pair.err.foreach(error => sendState(pair.msg, error, error))
+ }
+ }
}
+ }
}
private def getInvokersWithOldContainer(invocationNamespace: String,
@@ -248,31 +251,6 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
ContainerKeyMeta(revision, invokerId, containerId)
}
- // Filter out messages which can use warmed container
- private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
- msgs.filter { msg =>
- val warmedPrefix =
- containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
- val chosenInvoker = warmedContainers
- .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
- .find { container =>
- if (container.startsWith(warmedPrefix)) {
- logging.info(this, s"Choose a warmed container $container")
- inProgressWarmedContainers.update(msg.creationId.asString, container)
- true
- } else
- false
- }
- .map(_.split("/").takeRight(3).apply(0))
- if (chosenInvoker.nonEmpty) {
- creationJobManager ! RegisterCreationJob(msg)
- sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg)
- false
- } else
- true
- }
- }
-
private def sendCreationContainerToInvoker(producer: MessageProducer,
invoker: Int,
msg: ContainerCreationMessage): Future[ResultMetadata] = {
@@ -360,6 +338,87 @@ object ContainerManager {
*/
def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
+ // Partition messages that can use warmed containers.
+ // return: (list of messages that cannot use warmed containers, list of messages that can take advantage of warmed containers)
+ protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+ inProgressWarmedContainers: TrieMap[String, String],
+ invokers: List[InvokerHealth],
+ msgs: List[ContainerCreationMessage])(
+ implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], Option[String])],
+ List[(ContainerCreationMessage, Option[Int], Option[String])]) = {
+ val warmedApplied = msgs.map { msg =>
+ val warmedPrefix =
+ containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
+ val container = warmedContainers
+ .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+ .find { container =>
+ if (container.startsWith(warmedPrefix)) {
+ logging.info(this, s"Choose a warmed container $container")
+
+ // this is required to exclude already chosen invokers
+ inProgressWarmedContainers.update(msg.creationId.asString, container)
+ true
+ } else
+ false
+ }
+
+ // chosenInvoker is supposed to have only one item
+ val chosenInvoker = container
+ .map(_.split("/").takeRight(3).apply(0))
+ // filter warmed containers in disabled invokers
+ .filter(
+ invoker =>
+ invokers
+ // filterWarmedCreations method is supposed to receive healthy invokers only but this will make sure again only healthy invokers are used.
+ .filter(invoker => invoker.status.isUsable)
+ .map(_.id.instance)
+ .contains(invoker.toInt))
+
+ if (chosenInvoker.nonEmpty && container.nonEmpty) {
+ (msg, Some(chosenInvoker.get.toInt), Some(container.get))
+ } else
+ (msg, None, None)
+ }
+
+ warmedApplied.partition { item =>
+ if (item._2.nonEmpty) false
+ else true
+ }
+ }
+
+ protected[container] def updateInvokerMemory(invokerId: Int,
+ requiredMemory: Long,
+ invokers: List[InvokerHealth]): List[InvokerHealth] = {
+ // it must be compared to the instance unique id
+ val index = invokers.indexOf(invokers.filter(p => p.id.instance == invokerId).head)
+ val invoker = invokers(index)
+
+ // if the invoker has less than minimum memory, drop it from the list.
+ if (invoker.id.userMemory.toMB - requiredMemory < MemoryLimit.MIN_MEMORY.toMB) {
+ // drop the nth element
+ val split = invokers.splitAt(index)
+ val _ :: t1 = split._2
+ split._1 ::: t1
+ } else {
+ invokers.updated(
+ index,
+ invoker.copy(id = invoker.id.copy(userMemory = invoker.id.userMemory - requiredMemory.MB)))
+ }
+ }
+
+ protected[container] def updateInvokerMemory(invokerId: Option[InvokerInstanceId],
+ requiredMemory: Long,
+ invokers: List[InvokerHealth]): List[InvokerHealth] = {
+ invokerId match {
+ case Some(instanceId) =>
+ updateInvokerMemory(instanceId.instance, requiredMemory, invokers)
+
+ case None =>
+ // do nothing
+ invokers
+ }
+ }
+
/**
* Assign an invoker to a message
*
@@ -395,64 +454,92 @@ object ContainerManager {
val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
.getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
.getOrElse(true)
- val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+ val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull)
if (requiredResources.isEmpty) {
// only choose managed invokers or blackbox invokers
val wantedInvokers = if (isBlackboxInvocation) {
- candidates.filter(c => blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet
+ logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for blackbox invokers to schedule.")
+ candidates
+ .filter(
+ c =>
+ blackboxInvokers
+ .map(b => b.id.instance)
+ .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes)
+ .toSet
} else {
- candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet
+ logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for managed invokers to schedule.")
+ candidates
+ .filter(
+ c =>
+ managedInvokers
+ .map(m => m.id.instance)
+ .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes)
+ .toSet
}
val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
if (wantedInvokers.nonEmpty) {
- chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg)
+ val scheduledPair = chooseInvokerFromCandidates(wantedInvokers.toList, msg)
+ val updatedInvokers =
+ updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+ (scheduledPair :: pairs, updatedInvokers)
} else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
- chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+ logging.info(
+ this,
+ s"[${msg.invocationNamespace}/${msg.action}] since there is no available non-tagged invoker, choose one among tagged invokers.")
+ val scheduledPair = chooseInvokerFromCandidates(taggedInvokers, msg)
+ val updatedInvokers =
+ updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+ (scheduledPair :: pairs, updatedInvokers)
} else {
- sendState(
- FailedCreationJob(
- msg.creationId,
- msg.invocationNamespace,
- msg.action,
- msg.revision,
- NoAvailableInvokersError,
- s"No available invokers."))
- (pairs, candidates)
+ logging.error(
+ this,
+ s"[${msg.invocationNamespace}/${msg.action}] there is no invoker available to schedule to schedule.")
+ val scheduledPair =
+ ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+ (scheduledPair :: pairs, invokers)
}
} else {
+ logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for tagged invokers to schedule.")
val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
if (wantedInvokers.nonEmpty) {
- chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+ val scheduledPair = chooseInvokerFromCandidates(wantedInvokers, msg)
+ val updatedInvokers =
+ updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+ (scheduledPair :: pairs, updatedInvokers)
} else if (resourcesStrictPolicy) {
- sendState(
- FailedCreationJob(
- msg.creationId,
- msg.invocationNamespace,
- msg.action,
- msg.revision,
- NoAvailableResourceInvokersError,
- s"No available invokers with resources $requiredResources."))
- (pairs, candidates)
+ logging.error(
+ this,
+ s"[${msg.invocationNamespace}/${msg.action}] there is no available invoker with the resource: ${requiredResources}")
+ val scheduledPair =
+ ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError))
+ (scheduledPair :: pairs, invokers)
} else {
+ logging.info(
+ this,
+ s"[${msg.invocationNamespace}/${msg.action}] since there is no available invoker with the resource, choose any invokers without the resource.")
val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty)
if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
- chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg)
+ val scheduledPair = chooseInvokerFromCandidates(noTaggedInvokers, msg)
+ val updatedInvokers =
+ updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+ (scheduledPair :: pairs, updatedInvokers)
} else {
val leftInvokers =
taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
- if (leftInvokers.nonEmpty)
- chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
- else {
- sendState(
- FailedCreationJob(
- msg.creationId,
- msg.invocationNamespace,
- msg.action,
- msg.revision,
- NoAvailableInvokersError,
- s"No available invokers."))
- (pairs, candidates)
+ if (leftInvokers.nonEmpty) {
+ val scheduledPair = chooseInvokerFromCandidates(leftInvokers, msg)
+ val updatedInvokers =
+ updateInvokerMemory(
+ scheduledPair.invokerId,
+ msg.whiskActionMetaData.limits.memory.megabytes,
+ invokers)
+ (scheduledPair :: pairs, updatedInvokers)
+ } else {
+ logging.error(this, s"[${msg.invocationNamespace}/${msg.action}] no available invoker is found")
+ val scheduledPair =
+ ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+ (scheduledPair :: pairs, invokers)
}
}
}
@@ -462,32 +549,30 @@ object ContainerManager {
list
}
- private def chooseInvokerFromCandidates(
- candidates: List[InvokerHealth],
- wholeInvokers: List[InvokerHealth],
- pairs: List[ScheduledPair],
- msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = {
- val idx = rng(mod = candidates.size)
- val instance = candidates(idx)
- // it must be compared to the instance unique id
- val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head)
- val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
- val updated =
- if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares.
- wholeInvokers.updated(
- idxInWhole,
- instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB)))
+ @tailrec
+ protected[container] def chooseInvokerFromCandidates(candidates: List[InvokerHealth], msg: ContainerCreationMessage)(
+ implicit logging: Logging): ScheduledPair = {
+ val requiredMemory = msg.whiskActionMetaData.limits.memory
+ if (candidates.isEmpty) {
+ ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
+ } else if (candidates.forall(p => p.id.userMemory.toMB < requiredMemory.megabytes)) {
+ ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError))
+ } else {
+ val idx = rng(mod = candidates.size)
+ val instance = candidates(idx)
+ if (instance.id.userMemory.toMB < requiredMemory.megabytes) {
+ val split = candidates.splitAt(idx)
+ val _ :: t1 = split._2
+ chooseInvokerFromCandidates(split._1 ::: t1, msg)
} else {
- // drop the nth element
- val split = wholeInvokers.splitAt(idxInWhole)
- val _ :: t = split._2
- split._1 ::: t
+ ScheduledPair(msg, invokerId = Some(instance.id))
}
-
- (ScheduledPair(msg, instance.id) :: pairs, updated)
+ }
}
- private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = {
+ private def sendState(msg: ContainerCreationMessage, err: ContainerCreationError, reason: String)(
+ implicit logging: Logging): Unit = {
+ val state = FailedCreationJob(msg.creationId, msg.invocationNamespace, msg.action, msg.revision, err, reason)
QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
memoryQueueValue.queue ! state
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index bf7f14d58..d1f43270b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -39,15 +39,16 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
-import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.container.{ScheduledPair, _}
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
RegisterCreationJob,
- ReschedulingCreationJob,
- SuccessfulCreationJob
+ ReschedulingCreationJob
}
+
+import scala.language.postfixOps
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
@@ -60,6 +61,7 @@ import pureconfig.loadConfigOrThrow
import spray.json.{JsArray, JsBoolean, JsString}
import pureconfig.generic.auto._
+import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{FiniteDuration, _}
@@ -182,8 +184,8 @@ class ContainerManagerTests
val msg = InvokerResourceMessage(
invoker.status.asString,
invoker.id.userMemory.toMB,
- invoker.id.userMemory.toMB,
- invoker.id.userMemory.toMB,
+ invoker.id.busyMemory.getOrElse(0.MB).toMB,
+ 0,
invoker.id.tags,
invoker.id.dedicatedNamespaces)
@@ -271,16 +273,22 @@ class ContainerManagerTests
it should "try warmed containers first" in {
val mockEtcd = mock[EtcdClient]
- // for test, only invoker2 is healthy, so that no-warmed creations can be only sent to invoker2
+ // at first, invoker states look like this.
val invokers: List[InvokerHealth] = List(
- InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
- InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
- InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB, tags = Seq.empty[String]), Healthy),
)
- expectGetInvokers(mockEtcd, invokers)
- expectGetInvokers(mockEtcd, invokers)
- expectGetInvokers(mockEtcd, invokers)
- expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` for 3 times, and another one for warmup
+
+ // after then, invoker states changes like this.
+ val updatedInvokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+ )
+ expectGetInvokers(mockEtcd, invokers) // for warm up
+ expectGetInvokers(mockEtcd, invokers) // for first creation
+ expectGetInvokers(mockEtcd, updatedInvokers) // for second creation
val mockJobManager = TestProbe()
val mockWatcher = TestProbe()
@@ -294,7 +302,7 @@ class ContainerManagerTests
system.actorOf(ContainerManager
.props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
- // there are 1 warmed container for `test-namespace/test-action` and 1 for `test-namespace/test-action-2`
+ // Add warmed containers for action1 and action2 in invoker0 and invoker1 respectively
manager ! WatchEndpointInserted(
ContainerKeys.warmedPrefix,
ContainerKeys.warmedContainers(
@@ -349,9 +357,9 @@ class ContainerManagerTests
val msgs = List(msg1, msg2, msg3)
// it should reuse 2 warmed containers
- manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
+ manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace)
- // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
+ // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the remainder
receiver.expectMsg(s"invoker0-$msg1")
receiver.expectMsg(s"invoker1-$msg2")
receiver.expectMsg(s"invoker2-$msg3")
@@ -362,28 +370,7 @@ class ContainerManagerTests
case RegisterCreationJob(`msg3`) => true
}
- // now warmed container for action2 become warmed again
- manager ! SuccessfulCreationJob(msg2.creationId, msg2.invocationNamespace, msg2.action, msg2.revision)
- manager ! SuccessfulCreationJob(msg3.creationId, msg3.invocationNamespace, msg3.action, msg3.revision)
- // it still need to use invoker2
- manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
- receiver.expectMsg(s"invoker2-$msg1")
- // it will use warmed container on invoker1
- manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
- receiver.expectMsg(s"invoker1-$msg2")
-
- // warmed container for action1 become warmed when received FailedCreationJob
- manager ! FailedCreationJob(
- msg1.creationId,
- msg1.invocationNamespace,
- msg1.action,
- msg1.revision,
- NoAvailableResourceInvokersError,
- "")
- manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
- receiver.expectMsg(s"invoker0-$msg1")
-
- // warmed container for action1 become unwarmed
+ // remove a warmed container from invoker0
manager ! WatchEndpointRemoved(
ContainerKeys.warmedPrefix,
ContainerKeys.warmedContainers(
@@ -394,8 +381,56 @@ class ContainerManagerTests
ContainerId("fake")),
"",
true)
- manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
- receiver.expectMsg(s"invoker2-$msg1")
+
+ // remove a warmed container from invoker1
+ manager ! WatchEndpointRemoved(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ InvokerInstanceId(1, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+
+ // create a warmed container for action1 in from invoker1
+ manager ! WatchEndpointInserted(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ InvokerInstanceId(1, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+
+ // create a warmed container for action2 in from invoker2
+ manager ! WatchEndpointInserted(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ InvokerInstanceId(2, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+
+ // it should reuse 2 warmed containers
+ manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace)
+
+ // msg1 will use warmed container on invoker1, msg2 use warmed container on invoker2, msg3 use the remainder
+ receiver.expectMsg(s"invoker1-$msg1")
+ receiver.expectMsg(s"invoker2-$msg2")
+ receiver.expectMsg(s"invoker0-$msg3")
+
+ mockJobManager.expectMsgPF() {
+ case RegisterCreationJob(`msg1`) => true
+ case RegisterCreationJob(`msg2`) => true
+ case RegisterCreationJob(`msg3`) => true
+ }
}
it should "not try warmed containers if revision is unmatched" in {
@@ -521,7 +556,7 @@ class ContainerManagerTests
})
}
- it should "choice invokers" in {
+ it should "choose invokers" in {
val healthyInvokers: List[InvokerHealth] = List(
InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB), Healthy),
InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB), Healthy),
@@ -563,8 +598,8 @@ class ContainerManagerTests
val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
pairs.map(_.msg) should contain theSameElementsAs msgs
- pairs.map(_.invokerId).foreach {
- healthyInvokers.map(_.id) should contain(_)
+ pairs.flatMap(_.invokerId).foreach { invokerId =>
+ healthyInvokers.map(_.id) should contain(invokerId)
}
}
@@ -607,7 +642,7 @@ class ContainerManagerTests
val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
pairs.map(_.msg) should contain theSameElementsAs msgs
- pairs.map(_.invokerId.instance).foreach {
+ pairs.map(_.invokerId.get.instance).foreach {
healthyInvokers.map(_.id.instance) should contain(_)
}
}
@@ -692,21 +727,14 @@ class ContainerManagerTests
List(msg1, msg2, msg3, msg4, msg5),
msg1.whiskActionMetaData.limits.memory.megabytes.MB) // the memory is same for all msgs
pairs should contain theSameElementsAs List(
- ScheduledPair(msg1, healthyInvokers(0).id),
- ScheduledPair(msg2, healthyInvokers(1).id),
- ScheduledPair(msg3, healthyInvokers(2).id),
- ScheduledPair(msg4, healthyInvokers(3).id))
- probe.expectMsg(
- FailedCreationJob(
- msg5.creationId,
- testInvocationNamespace,
- msg5.action,
- testRevision,
- NoAvailableResourceInvokersError,
- "No available invokers with resources List(fake)."))
+ ScheduledPair(msg1, Some(healthyInvokers(0).id), None),
+ ScheduledPair(msg2, Some(healthyInvokers(1).id), None),
+ ScheduledPair(msg3, Some(healthyInvokers(2).id), None),
+ ScheduledPair(msg4, Some(healthyInvokers(3).id), None),
+ ScheduledPair(msg5, None, Some(NoAvailableResourceInvokersError)))
}
- it should "choose tagged invokers when no invokers available which has no tags first" in {
+ it should "choose tagged invokers when no untagged invoker is available" in {
val msg =
ContainerCreationMessage(
TransactionId.testing,
@@ -740,16 +768,9 @@ class ContainerManagerTests
// and for msg2, it should return no available invokers
val pairs =
ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB)
- pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id))
-
- probe.expectMsg(
- FailedCreationJob(
- msg2.creationId,
- testInvocationNamespace,
- msg2.action,
- testRevision,
- NoAvailableInvokersError,
- "No available invokers."))
+ pairs should contain theSameElementsAs List(
+ ScheduledPair(msg, Some(healthyInvokers(0).id), None),
+ ScheduledPair(msg2, None, Some(NoAvailableInvokersError)))
}
it should "respect the resource policy while use resource filter" in {
@@ -788,7 +809,7 @@ class ContainerManagerTests
testfqn.resolve(EntityName("ns3")),
testRevision,
actionMetadata.copy(
- limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+ limits = action.limits.copy(memory = MemoryLimit(256.MB)),
annotations =
Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
Annotations.InvokerResourcesStrictPolicyAnnotationName,
@@ -803,7 +824,7 @@ class ContainerManagerTests
testfqn.resolve(EntityName("ns3")),
testRevision,
actionMetadata.copy(
- limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+ limits = action.limits.copy(memory = MemoryLimit(256.MB)),
annotations =
Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
Annotations.InvokerResourcesStrictPolicyAnnotationName,
@@ -824,21 +845,13 @@ class ContainerManagerTests
// while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
val pairs =
ContainerManager.schedule(healthyInvokers, List(msg1), msg1.whiskActionMetaData.limits.memory.megabytes.MB)
- pairs.size shouldBe 0
- probe.expectMsg(
- FailedCreationJob(
- msg1.creationId,
- testInvocationNamespace,
- msg1.action,
- testRevision,
- NoAvailableResourceInvokersError,
- "No available invokers with resources List(non-exist)."))
+ pairs should contain theSameElementsAs List(ScheduledPair(msg1, None, Some(NoAvailableResourceInvokersError)))
// while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
// here is the invoker0
val pairs2 =
ContainerManager.schedule(healthyInvokers, List(msg2), msg2.whiskActionMetaData.limits.memory.megabytes.MB)
- pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
+ pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, Some(healthyInvokers(0).id), None))
// while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
// if there is none, then choose invokers with other tags, if there is still none, return no available invokers
@@ -846,15 +859,9 @@ class ContainerManagerTests
healthyInvokers.takeRight(1),
List(msg3, msg4),
msg3.whiskActionMetaData.limits.memory.megabytes.MB)
- pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
- probe.expectMsg(
- FailedCreationJob(
- msg4.creationId,
- testInvocationNamespace,
- msg4.action,
- testRevision,
- NoAvailableInvokersError,
- "No available invokers."))
+ pairs3 should contain theSameElementsAs List(
+ ScheduledPair(msg3, Some(healthyInvokers(1).id)),
+ ScheduledPair(msg4, None, Some(NoAvailableInvokersError)))
}
it should "send FailedCreationJob to queue manager when no invokers are available" in {
@@ -900,7 +907,7 @@ class ContainerManagerTests
msg.action,
testRevision,
NoAvailableInvokersError,
- "No available invokers."))
+ NoAvailableInvokersError))
}
it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in {
@@ -1155,6 +1162,285 @@ class ContainerManagerTests
case _ => false
}
}
+
+ it should "choose an invoker from candidates" in {
+ val candidates = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy),
+ )
+ val msg = ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ // no matter how many time we schedule the msg, it should always choose invoker2.
+ (1 to 10).foreach { _ =>
+ val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+ newPairs.invokerId shouldBe Some(InvokerInstanceId(2, userMemory = 256 MB))
+ }
+ }
+
+ it should "not choose an invoker when there is no candidate with enough memory" in {
+ val candidates = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 128 MB), Healthy),
+ )
+ val msg = ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ // no matter how many time we schedule the msg, no invoker should be assigned.
+ (1 to 10).foreach { _ =>
+ val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+ newPairs.invokerId shouldBe None
+ }
+ }
+
+ it should "not choose an invoker when there is no candidate" in {
+ val candidates = List()
+ val msg = ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
+ newPairs.invokerId shouldBe None
+ }
+
+ it should "update invoker memory" in {
+ val invokers = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+ )
+ val expected = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 768 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+ )
+ val requiredMemory = 256.MB.toMB
+ val invokerId = Some(InvokerInstanceId(1, userMemory = 1024 MB))
+
+ val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers)
+
+ updatedInvokers shouldBe expected
+ }
+
+ it should "not update invoker memory when no invoker is assigned" in {
+ val invokers = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+ )
+ val requiredMemory = 256.MB.toMB
+
+ val updatedInvokers = ContainerManager.updateInvokerMemory(None, requiredMemory, invokers)
+
+ updatedInvokers shouldBe invokers
+ }
+
+ it should "drop an invoker with less memory than MIN_MEMORY" in {
+ val invokers = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 320 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+ )
+ val expected = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+ )
+ val requiredMemory = 256.MB.toMB
+ val invokerId = Some(InvokerInstanceId(1, userMemory = 320 MB))
+
+ val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers)
+
+ updatedInvokers shouldBe expected
+ }
+
+ it should "filter warmed creations when there is no warmed container" in {
+
+ val warmedContainers = Set.empty[String]
+ val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val msgs = List(msg1, msg2, msg3)
+
+ val (coldCreations, warmedCreations) =
+ ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+ warmedCreations.isEmpty shouldBe true
+ coldCreations.size shouldBe 3
+ }
+
+ it should "filter warmed creations when there are warmed containers" in {
+ val warmedContainers = Set(
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ InvokerInstanceId(0, userMemory = 0.bytes),
+ ContainerId("fake")),
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ InvokerInstanceId(1, userMemory = 0.bytes),
+ ContainerId("fake")))
+ val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val msgs = List(msg1, msg2, msg3)
+
+ val (coldCreations, warmedCreations) =
+ ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+ warmedCreations.size shouldBe 2
+ coldCreations.size shouldBe 1
+
+ warmedCreations.map(_._1).contains(msg1) shouldBe true
+ warmedCreations.map(_._1).contains(msg2) shouldBe true
+ coldCreations.map(_._1).contains(msg3) shouldBe true
+ }
+
+ it should "choose cold creation when warmed containers are in disabled invokers" in {
+ val warmedContainers = Set(
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ InvokerInstanceId(0, userMemory = 0.bytes),
+ ContainerId("fake")),
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ InvokerInstanceId(1, userMemory = 0.bytes),
+ ContainerId("fake")))
+ val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val msgs = List(msg1, msg2, msg3)
+
+ // unhealthy invokers should not be chosen even if they have warmed containers
+ val invokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 1024.MB, tags = Seq.empty[String]), Healthy))
+
+ val (coldCreations, _) =
+ ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
+
+ coldCreations.size shouldBe 3
+ coldCreations.map(_._1).containsSlice(List(msg1, msg2, msg3)) shouldBe true
+ }
}
@RunWith(classOf[JUnitRunner])