You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/09 16:20:10 UTC
[openwhisk] branch master updated: Prewarm eviction variance (#4916)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 470eaf5 Prewarm eviction variance (#4916)
470eaf5 is described below
commit 470eaf5c56c8fa2a00486d09040e91ab1f7a064d
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Sun Aug 9 09:19:52 2020 -0700
Prewarm eviction variance (#4916)
* allow limiting number of prewarms to expire in one shot
* fixing the order of prewarm removal/expiration tracking
---
.../core/containerpool/ContainerFactory.scala | 4 +-
core/invoker/src/main/resources/application.conf | 4 +-
.../core/containerpool/ContainerPool.scala | 117 +++++++++++++--------
.../mesos/test/MesosContainerFactoryTest.scala | 4 +-
.../containerpool/test/ContainerPoolTests.scala | 114 +++++++++++++++++---
.../containerpool/test/ContainerProxyTests.scala | 3 +-
6 files changed, 182 insertions(+), 64 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index f2e5ab7..1603ba2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -47,7 +47,9 @@ case class ContainerArgsConfig(network: String,
case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
- prewarmExpirationCheckInterval: FiniteDuration) {
+ prewarmExpirationCheckInterval: FiniteDuration,
+ prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
+ prewarmExpirationLimit: Int) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index f5afc0c..a11f454 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,7 +60,9 @@ whisk {
user-memory: 1024 m
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
- prewarm-expiration-check-interval: 1 minute
+ prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
+ prewarm-expiration-check-interval-variance: 10 seconds # upper bound of a random delay (in whole seconds) added to the check interval, so that invokers do not all expire prewarms at the same time
+ prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
}
kubernetes {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 58cad8c..7ebaec5 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity.size._
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
-import scala.util.Try
+import scala.util.{Random, Try}
sealed trait WorkerState
case object Busy extends WorkerState
@@ -90,11 +90,14 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
adjustPrewarmedContainer(true, false)
// check periodically, adjust prewarmed container(delete if unused for some time and create some increment containers)
- context.system.scheduler.schedule(
- 2.seconds,
- poolConfig.prewarmExpirationCheckInterval,
- self,
- AdjustPrewarmedContainer)
+ // add some random amount to this schedule to avoid a herd of container removal + creation
+ val interval = poolConfig.prewarmExpirationCheckInterval + poolConfig.prewarmExpirationCheckIntervalVariance
+ .map(v =>
+ Random
+ .nextInt(v.toSeconds.toInt))
+ .getOrElse(0)
+ .seconds
+ context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
@@ -330,7 +333,20 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
/** adjust prewarm containers up to the configured requirements for each kind/memory combination. */
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
- //fill in missing prewarms
+ if (scheduled) {
+ //on scheduled time, remove expired prewarms
+ ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
+ prewarmedPool = prewarmedPool - p
+ p ! Remove
+ }
+ //on scheduled time, emit cold start counter metric with memory + kind
+ coldStartCount foreach { coldStart =>
+ val coldStartKey = coldStart._1
+ MetricEmitter.emitCounterMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
+ }
+ }
+ //fill in missing prewarms (replaces any deletes)
ContainerPool
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
.foreach { c =>
@@ -344,15 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}
if (scheduled) {
- //on scheduled time, remove expired prewarms
- ContainerPool.removeExpired(prewarmConfig, prewarmedPool).foreach(_ ! Remove)
- //on scheduled time, emit cold start counter metric with memory + kind
- coldStartCount foreach { coldStart =>
- val coldStartKey = coldStart._1
- MetricEmitter.emitCounterMetric(
- LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
- }
- // then clear coldStartCounts each time scheduled event is processed to reset counts
+ // lastly, clear coldStartCounts each time scheduled event is processed to reset counts
coldStartCount = immutable.Map.empty[ColdStartKey, Int]
}
}
@@ -452,10 +460,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
containersInUse.map(_._2.memoryLimit.toMB).sum)
- MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT,
+ prewarmedPool.size + prewarmStartingPool.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
- prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+ prewarmedPool.map(_._2.memoryLimit.toMB).sum + prewarmStartingPool.map(_._2._2.toMB).sum)
val unused = freePool.filter(_._2.activeActivationCount == 0)
val unusedMB = unused.map(_._2.memoryLimit.toMB).sum
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_COUNT, unused.size)
@@ -555,34 +565,53 @@ object ContainerPool {
/**
* Find the expired actor in prewarmedPool
*
+ * @param poolConfig pool configuration; prewarmExpirationLimit caps how many expired prewarms are returned per call
* @param prewarmConfig
* @param prewarmedPool
* @param logging
* @return a list of expired actor
*/
- def removeExpired(prewarmConfig: List[PrewarmingConfig], prewarmedPool: Map[ActorRef, PreWarmedData])(
- implicit logging: Logging): List[ActorRef] = {
- prewarmConfig.flatMap { config =>
- val kind = config.exec.kind
- val memory = config.memoryLimit
- config.reactive
- .map { _ =>
- val expiredPrewarmedContainer = prewarmedPool
- .filter { warmInfo =>
- warmInfo match {
- case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
- case _ => false
+ def removeExpired[A](poolConfig: ContainerPoolConfig,
+ prewarmConfig: List[PrewarmingConfig],
+ prewarmedPool: Map[A, PreWarmedData])(implicit logging: Logging): List[A] = {
+ val now = Deadline.now
+ val expireds = prewarmConfig
+ .flatMap { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+ config.reactive
+ .map { c =>
+ val expiredPrewarmedContainer = prewarmedPool.toSeq
+ .filter { warmInfo =>
+ warmInfo match {
+ case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
+ case _ => false
+ }
}
+ .sortBy(_._2.expires.getOrElse(now))
+
+ // emit expired container counter metric with memory + kind
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
+ if (expiredPrewarmedContainer.nonEmpty) {
+ logging.info(
+ this,
+ s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
}
- // emit expired container counter metric with memory + kind
- MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
- logging.info(
- this,
- s"[kind: ${kind} memory: ${memory.toString}] removed ${expiredPrewarmedContainer.size} expired prewarmed container")
- expiredPrewarmedContainer.keys
+ expiredPrewarmedContainer.map(e => (e._1, e._2.expires.getOrElse(now)))
+ }
+ .getOrElse(List.empty)
+ }
+ .sortBy(_._2) //need to sort these so that if the results are limited, we take the oldest
+ .map(_._1)
+ if (expireds.nonEmpty) {
+ logging.info(this, s"removing up to ${poolConfig.prewarmExpirationLimit} of ${expireds.size} expired containers")
+ expireds.take(poolConfig.prewarmExpirationLimit).foreach { e =>
+ prewarmedPool.get(e).map { d =>
+ logging.info(this, s"removing expired prewarm of kind ${d.kind} with container ${d.container} ")
}
- .getOrElse(List.empty)
+ }
}
+ expireds.take(poolConfig.prewarmExpirationLimit)
}
/**
@@ -609,8 +638,8 @@ object ContainerPool {
val memory = config.memoryLimit
val runningCount = prewarmedPool.count {
- // done starting, and not expired
- case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if !p.isExpired() => true
+ // done starting (include expired, since they may not have been removed yet)
+ case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) => true
// started but not finished starting (or expired)
case _ => false
}
@@ -632,10 +661,12 @@ object ContainerPool {
}
}
- logging.info(
- this,
- s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
- TransactionId.invokerWarmup)
+ if (currentCount < desiredCount) {
+ logging.info(
+ this,
+ s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+ TransactionId.invokerWarmup)
+ }
(config, (currentCount, desiredCount))
}.toMap
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 656c89c..e6cd99f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -17,8 +17,6 @@
package org.apache.openwhisk.core.containerpool.mesos.test
-import java.util.concurrent.TimeUnit
-
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
@@ -86,7 +84,7 @@ class MesosContainerFactoryTest
}
// 80 slots, each 265MB
- val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+ val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 1.minute, None, 100)
val actionMemory = 265.MB
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index d684342..8c2fea5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -17,6 +17,7 @@
package org.apache.openwhisk.core.containerpool.test
+import java.io.{ByteArrayOutputStream, PrintStream}
import java.time.Instant
import java.util.concurrent.TimeUnit
@@ -35,7 +36,7 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity._
@@ -132,7 +133,7 @@ class ContainerPoolTests
}
def poolConfig(userMemory: ByteSize) =
- ContainerPoolConfig(userMemory, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+ ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
behavior of "ContainerPool"
@@ -792,7 +793,8 @@ class ContainerPoolTests
stream.reset()
val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
- val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel)
+ val poolConfig =
+ ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
val initialCount = 2
val pool =
system.actorOf(
@@ -816,7 +818,7 @@ class ContainerPoolTests
// Because already supplemented the prewarmed container, so currentCount should equal with initialCount
eventually {
- stream.toString should include(s"found ${initialCount} started")
+ stream.toString should not include ("started")
}
}
@@ -825,8 +827,9 @@ class ContainerPoolTests
val feed = TestProbe()
stream.reset()
- val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
- val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel)
+ val prewarmExpirationCheckIntervel = 2.seconds
+ val poolConfig =
+ ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -837,6 +840,7 @@ class ContainerPoolTests
system.actorOf(
ContainerPool
.props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
+ //start 2 prewarms
containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
@@ -852,19 +856,19 @@ class ContainerPoolTests
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
-
+ //expire 2 prewarms
containers(0).expectMsg(Remove)
containers(1).expectMsg(Remove)
containers(0).send(pool, ContainerRemoved(false))
containers(1).send(pool, ContainerRemoved(false))
// currentCount should equal with 0 due to these 2 prewarmed containers are expired
- stream.toString should include(s"found 0 started")
+ stream.toString should not include (s"found 0 started")
// the desiredCount should equal with minCount because cold start didn't happen
- stream.toString should include(s"desired count: ${minCount}")
+ stream.toString should not include (s"desired count: ${minCount}")
// Previously created prewarmed containers should be removed
- stream.toString should include(s"removed ${initialCount} expired prewarmed container")
+ stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
stream.reset()
val action = ExecutableWhiskAction(
@@ -873,7 +877,7 @@ class ContainerPoolTests
exec,
limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
val run = createRunMessage(action, invocationNamespace)
- // 2 code start happened
+ // 2 cold start happened
pool ! run
pool ! run
containers(2).expectMsg(run)
@@ -888,7 +892,7 @@ class ContainerPoolTests
// the desiredCount should equal with 2 due to cold start happened
stream.toString should include(s"desired count: 2")
}
-
+ //add 2 prewarms due to increments
containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
@@ -905,7 +909,7 @@ class ContainerPoolTests
containers(5).send(pool, ContainerRemoved(false))
// removed previous 2 prewarmed container due to expired
- stream.toString should include(s"removed 2 expired prewarmed container")
+ stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
stream.reset()
// 5 code start happened(5 > maxCount)
@@ -984,7 +988,8 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
WarmingColdData(EntityName(namespace), action, lastUsed, active)
/** Helper to create PreWarmedData with sensible defaults */
- def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[MockableContainer], kind, 256.MB)
+ def preWarmedData(kind: String = "anyKind", expires: Option[Deadline] = None) =
+ PreWarmedData(stub[MockableContainer], kind, 256.MB, expires = expires)
/** Helper to create NoData */
def noData() = NoData()
@@ -1195,4 +1200,85 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
pool = pool - 'first
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('second)
}
+
+ it should "remove expired in order of expiration" in {
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
+ val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+ //use a second kind so that we know sorting is not isolated to the expired of each kind
+ val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
+ val memoryLimit = 256.MB
+ val prewarmConfig =
+ List(
+ PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))),
+ PrewarmingConfig(1, exec2, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
+ val oldestDeadline = Deadline.now - 1.seconds
+ val newerDeadline = Deadline.now
+ val newestDeadline = Deadline.now + 1.seconds
+ val prewarmedPool = Map(
+ 'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+ 'oldest -> preWarmedData("actionKind2", Some(oldestDeadline)),
+ 'newer -> preWarmedData("actionKind", Some(newerDeadline)))
+ lazy val stream = new ByteArrayOutputStream
+ lazy val printstream = new PrintStream(stream)
+ lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+ ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest))
+ }
+
+ it should "remove only the prewarmExpirationLimit of expired prewarms" in {
+ //limit prewarm removal to 2
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
+ val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+ val memoryLimit = 256.MB
+ val prewarmConfig =
+ List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
+ //all are overdue, with different expiration times
+ val oldestDeadline = Deadline.now - 5.seconds
+ val newerDeadline = Deadline.now - 4.seconds
+ //the newest* ones are expired too, but they are not among the 2 oldest, so they exceed the limit of 2 and won't be removed
+ val newestDeadline = Deadline.now - 3.seconds
+ val newestDeadline2 = Deadline.now - 2.seconds
+ val newestDeadline3 = Deadline.now - 1.seconds
+ val prewarmedPool = Map(
+ 'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+ 'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
+ 'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
+ 'newer -> preWarmedData("actionKind", Some(newerDeadline)),
+ 'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
+ lazy val stream = new ByteArrayOutputStream
+ lazy val printstream = new PrintStream(stream)
+ lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+ ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest, 'newer))
+ }
+
+ it should "remove only the expired prewarms regardless of minCount" in {
+ //limit prewarm removal to 100
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
+ val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+ val memoryLimit = 256.MB
+ //minCount is 2, but removeExpired does not take minCount into account - every expired prewarm is returned for removal
+ val prewarmConfig =
+ List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(2, 10, 10.seconds, 1, 1))))
+ //all are overdue, with different expiration times
+ val oldestDeadline = Deadline.now - 5.seconds
+ val newerDeadline = Deadline.now - 4.seconds
+ //the newest* ones are expired too (just more recently); with a limit of 100 all 5 are within the limit, so all are removed
+ val newestDeadline = Deadline.now - 3.seconds
+ val newestDeadline2 = Deadline.now - 2.seconds
+ val newestDeadline3 = Deadline.now - 1.seconds
+ val prewarmedPool = Map(
+ 'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+ 'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
+ 'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
+ 'newer -> preWarmedData("actionKind", Some(newerDeadline)),
+ 'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
+ lazy val stream = new ByteArrayOutputStream
+ lazy val printstream = new PrintStream(stream)
+ lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+ ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List(
+ 'oldest,
+ 'newer,
+ 'newest,
+ 'newest2,
+ 'newest3))
+ }
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 46e334c..3316dc5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -27,7 +27,6 @@ import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe
import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
import java.time.temporal.ChronoUnit
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
@@ -275,7 +274,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
- val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+ val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 1.minute, None, 100)
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))