You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/09 16:20:10 UTC

[openwhisk] branch master updated: Prewarm eviction variance (#4916)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 470eaf5  Prewarm eviction variance (#4916)
470eaf5 is described below

commit 470eaf5c56c8fa2a00486d09040e91ab1f7a064d
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Sun Aug 9 09:19:52 2020 -0700

    Prewarm eviction variance (#4916)
    
    * allow limiting number of prewarms to expire in one shot
    
    * fixing the order of prewarm removal/expiration tracking
---
 .../core/containerpool/ContainerFactory.scala      |   4 +-
 core/invoker/src/main/resources/application.conf   |   4 +-
 .../core/containerpool/ContainerPool.scala         | 117 +++++++++++++--------
 .../mesos/test/MesosContainerFactoryTest.scala     |   4 +-
 .../containerpool/test/ContainerPoolTests.scala    | 114 +++++++++++++++++---
 .../containerpool/test/ContainerProxyTests.scala   |   3 +-
 6 files changed, 182 insertions(+), 64 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index f2e5ab7..1603ba2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -47,7 +47,9 @@ case class ContainerArgsConfig(network: String,
 case class ContainerPoolConfig(userMemory: ByteSize,
                                concurrentPeekFactor: Double,
                                akkaClient: Boolean,
-                               prewarmExpirationCheckInterval: FiniteDuration) {
+                               prewarmExpirationCheckInterval: FiniteDuration,
+                               prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
+                               prewarmExpirationLimit: Int) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index f5afc0c..a11f454 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,7 +60,9 @@ whisk {
     user-memory: 1024 m
     concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
     akka-client:  false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
-    prewarm-expiration-check-interval: 1 minute
+    prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
+    prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
+    prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
   }
 
   kubernetes {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 58cad8c..7ebaec5 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity.size._
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.duration._
-import scala.util.Try
+import scala.util.{Random, Try}
 
 sealed trait WorkerState
 case object Busy extends WorkerState
@@ -90,11 +90,14 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   adjustPrewarmedContainer(true, false)
 
   // check periodically, adjust prewarmed container(delete if unused for some time and create some increment containers)
-  context.system.scheduler.schedule(
-    2.seconds,
-    poolConfig.prewarmExpirationCheckInterval,
-    self,
-    AdjustPrewarmedContainer)
+  // add some random amount to this schedule to avoid a herd of container removal + creation
+  val interval = poolConfig.prewarmExpirationCheckInterval + poolConfig.prewarmExpirationCheckIntervalVariance
+    .map(v =>
+      Random
+        .nextInt(v.toSeconds.toInt))
+    .getOrElse(0)
+    .seconds
+  context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
 
   def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name
@@ -330,7 +333,20 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
   /** adjust prewarm containers up to the configured requirements for each kind/memory combination. */
   def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
-    //fill in missing prewarms
+    if (scheduled) {
+      //on scheduled time, remove expired prewarms
+      ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
+        prewarmedPool = prewarmedPool - p
+        p ! Remove
+      }
+      //on scheduled time, emit cold start counter metric with memory + kind
+      coldStartCount foreach { coldStart =>
+        val coldStartKey = coldStart._1
+        MetricEmitter.emitCounterMetric(
+          LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
+      }
+    }
+    //fill in missing prewarms (replaces any deletes)
     ContainerPool
       .increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
       .foreach { c =>
@@ -344,15 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         }
       }
     if (scheduled) {
-      //on scheduled time, remove expired prewarms
-      ContainerPool.removeExpired(prewarmConfig, prewarmedPool).foreach(_ ! Remove)
-      //on scheduled time, emit cold start counter metric with memory + kind
-      coldStartCount foreach { coldStart =>
-        val coldStartKey = coldStart._1
-        MetricEmitter.emitCounterMetric(
-          LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
-      }
-      //   then clear coldStartCounts each time scheduled event is processed to reset counts
+      //   lastly, clear coldStartCounts each time scheduled event is processed to reset counts
       coldStartCount = immutable.Map.empty[ColdStartKey, Int]
     }
   }
@@ -452,10 +460,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     MetricEmitter.emitGaugeMetric(
       LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
       containersInUse.map(_._2.memoryLimit.toMB).sum)
-    MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT,
+      prewarmedPool.size + prewarmStartingPool.size)
     MetricEmitter.emitGaugeMetric(
       LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
-      prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+      prewarmedPool.map(_._2.memoryLimit.toMB).sum + prewarmStartingPool.map(_._2._2.toMB).sum)
     val unused = freePool.filter(_._2.activeActivationCount == 0)
     val unusedMB = unused.map(_._2.memoryLimit.toMB).sum
     MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_COUNT, unused.size)
@@ -555,34 +565,53 @@ object ContainerPool {
   /**
    * Find the expired actor in prewarmedPool
    *
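+   * @param poolConfig pool configuration; its prewarmExpirationLimit caps how many expired prewarms are returned per call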
    * @param prewarmConfig
    * @param prewarmedPool
    * @param logging
    * @return a list of expired actor
    */
-  def removeExpired(prewarmConfig: List[PrewarmingConfig], prewarmedPool: Map[ActorRef, PreWarmedData])(
-    implicit logging: Logging): List[ActorRef] = {
-    prewarmConfig.flatMap { config =>
-      val kind = config.exec.kind
-      val memory = config.memoryLimit
-      config.reactive
-        .map { _ =>
-          val expiredPrewarmedContainer = prewarmedPool
-            .filter { warmInfo =>
-              warmInfo match {
-                case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
-                case _                                                                  => false
+  def removeExpired[A](poolConfig: ContainerPoolConfig,
+                       prewarmConfig: List[PrewarmingConfig],
+                       prewarmedPool: Map[A, PreWarmedData])(implicit logging: Logging): List[A] = {
+    val now = Deadline.now
+    val expireds = prewarmConfig
+      .flatMap { config =>
+        val kind = config.exec.kind
+        val memory = config.memoryLimit
+        config.reactive
+          .map { c =>
+            val expiredPrewarmedContainer = prewarmedPool.toSeq
+              .filter { warmInfo =>
+                warmInfo match {
+                  case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
+                  case _                                                                  => false
+                }
               }
+              .sortBy(_._2.expires.getOrElse(now))
+
+            // emit expired container counter metric with memory + kind
+            MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
+            if (expiredPrewarmedContainer.nonEmpty) {
+              logging.info(
+                this,
+                s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
             }
-          // emit expired container counter metric with memory + kind
-          MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
-          logging.info(
-            this,
-            s"[kind: ${kind} memory: ${memory.toString}] removed ${expiredPrewarmedContainer.size} expired prewarmed container")
-          expiredPrewarmedContainer.keys
+            expiredPrewarmedContainer.map(e => (e._1, e._2.expires.getOrElse(now)))
+          }
+          .getOrElse(List.empty)
+      }
+      .sortBy(_._2) //need to sort these so that if the results are limited, we take the oldest
+      .map(_._1)
+    if (expireds.nonEmpty) {
+      logging.info(this, s"removing up to ${poolConfig.prewarmExpirationLimit} of ${expireds.size} expired containers")
+      expireds.take(poolConfig.prewarmExpirationLimit).foreach { e =>
+        prewarmedPool.get(e).map { d =>
+          logging.info(this, s"removing expired prewarm of kind ${d.kind} with container ${d.container} ")
         }
-        .getOrElse(List.empty)
+      }
     }
+    expireds.take(poolConfig.prewarmExpirationLimit)
   }
 
   /**
@@ -609,8 +638,8 @@ object ContainerPool {
       val memory = config.memoryLimit
 
       val runningCount = prewarmedPool.count {
-        // done starting, and not expired
-        case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if !p.isExpired() => true
+        // done starting (include expired, since they may not have been removed yet)
+        case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) => true
         // started but not finished starting (or expired)
         case _ => false
       }
@@ -632,10 +661,12 @@ object ContainerPool {
           }
         }
 
-      logging.info(
-        this,
-        s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
-        TransactionId.invokerWarmup)
+      if (currentCount < desiredCount) {
+        logging.info(
+          this,
+          s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+          TransactionId.invokerWarmup)
+      }
       (config, (currentCount, desiredCount))
     }.toMap
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 656c89c..e6cd99f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.openwhisk.core.containerpool.mesos.test
 
-import java.util.concurrent.TimeUnit
-
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Sink
@@ -86,7 +84,7 @@ class MesosContainerFactoryTest
   }
 
   // 80 slots, each 265MB
-  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 1.minute, None, 100)
   val actionMemory = 265.MB
   val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index d684342..8c2fea5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -17,6 +17,7 @@
 
 package org.apache.openwhisk.core.containerpool.test
 
+import java.io.{ByteArrayOutputStream, PrintStream}
 import java.time.Instant
 import java.util.concurrent.TimeUnit
 
@@ -35,7 +36,7 @@ import akka.testkit.ImplicitSender
 import akka.testkit.TestKit
 import akka.testkit.TestProbe
 import common.{StreamLogging, WhiskProperties}
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId}
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.entity._
@@ -132,7 +133,7 @@ class ContainerPoolTests
   }
 
   def poolConfig(userMemory: ByteSize) =
-    ContainerPoolConfig(userMemory, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+    ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
 
   behavior of "ContainerPool"
 
@@ -792,7 +793,8 @@ class ContainerPoolTests
 
     stream.reset()
     val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
-    val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel)
+    val poolConfig =
+      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
     val initialCount = 2
     val pool =
       system.actorOf(
@@ -816,7 +818,7 @@ class ContainerPoolTests
 
     // Because already supplemented the prewarmed container, so currentCount should equal with initialCount
     eventually {
-      stream.toString should include(s"found ${initialCount} started")
+      stream.toString should not include ("started")
     }
   }
 
@@ -825,8 +827,9 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     stream.reset()
-    val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
-    val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel)
+    val prewarmExpirationCheckIntervel = 2.seconds
+    val poolConfig =
+      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -837,6 +840,7 @@ class ContainerPoolTests
       system.actorOf(
         ContainerPool
           .props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
+    //start 2 prewarms
     containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
     containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
@@ -852,19 +856,19 @@ class ContainerPoolTests
 
     // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
     Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
-
+    //expire 2 prewarms
     containers(0).expectMsg(Remove)
     containers(1).expectMsg(Remove)
     containers(0).send(pool, ContainerRemoved(false))
     containers(1).send(pool, ContainerRemoved(false))
 
     // currentCount should equal with 0 due to these 2 prewarmed containers are expired
-    stream.toString should include(s"found 0 started")
+    stream.toString should not include (s"found 0 started")
 
     // the desiredCount should equal with minCount because cold start didn't happen
-    stream.toString should include(s"desired count: ${minCount}")
+    stream.toString should not include (s"desired count: ${minCount}")
     // Previously created prewarmed containers should be removed
-    stream.toString should include(s"removed ${initialCount} expired prewarmed container")
+    stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
 
     stream.reset()
     val action = ExecutableWhiskAction(
@@ -873,7 +877,7 @@ class ContainerPoolTests
       exec,
       limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
     val run = createRunMessage(action, invocationNamespace)
-    // 2 code start happened
+    // 2 cold start happened
     pool ! run
     pool ! run
     containers(2).expectMsg(run)
@@ -888,7 +892,7 @@ class ContainerPoolTests
       // the desiredCount should equal with 2 due to cold start happened
       stream.toString should include(s"desired count: 2")
     }
-
+    //add 2 prewarms due to increments
     containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
     containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
     containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
@@ -905,7 +909,7 @@ class ContainerPoolTests
     containers(5).send(pool, ContainerRemoved(false))
 
     // removed previous 2 prewarmed container due to expired
-    stream.toString should include(s"removed 2 expired prewarmed container")
+    stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
 
     stream.reset()
     // 5 code start happened(5 > maxCount)
@@ -984,7 +988,8 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     WarmingColdData(EntityName(namespace), action, lastUsed, active)
 
   /** Helper to create PreWarmedData with sensible defaults */
-  def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[MockableContainer], kind, 256.MB)
+  def preWarmedData(kind: String = "anyKind", expires: Option[Deadline] = None) =
+    PreWarmedData(stub[MockableContainer], kind, 256.MB, expires = expires)
 
   /** Helper to create NoData */
   def noData() = NoData()
@@ -1195,4 +1200,85 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     pool = pool - 'first
     ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('second)
   }
+
+  it should "remove expired in order of expiration" in {
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+    //use a second kind so that we know sorting is not isolated to the expired of each kind
+    val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
+    val memoryLimit = 256.MB
+    val prewarmConfig =
+      List(
+        PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))),
+        PrewarmingConfig(1, exec2, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
+    val oldestDeadline = Deadline.now - 1.seconds
+    val newerDeadline = Deadline.now
+    val newestDeadline = Deadline.now + 1.seconds
+    val prewarmedPool = Map(
+      'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+      'oldest -> preWarmedData("actionKind2", Some(oldestDeadline)),
+      'newer -> preWarmedData("actionKind", Some(newerDeadline)))
+    lazy val stream = new ByteArrayOutputStream
+    lazy val printstream = new PrintStream(stream)
+    lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+    ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest))
+  }
+
+  it should "remove only the prewarmExpirationLimit of expired prewarms" in {
+    //limit prewarm removal to 2
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+    val memoryLimit = 256.MB
+    val prewarmConfig =
+      List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
+    //all are overdue, with different expiration times
+    val oldestDeadline = Deadline.now - 5.seconds
+    val newerDeadline = Deadline.now - 4.seconds
+    //the newest* ones are expired too, but they are newer than the 2 oldest, so the limit of 2 leaves them for a later cycle
+    val newestDeadline = Deadline.now - 3.seconds
+    val newestDeadline2 = Deadline.now - 2.seconds
+    val newestDeadline3 = Deadline.now - 1.seconds
+    val prewarmedPool = Map(
+      'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+      'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
+      'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
+      'newer -> preWarmedData("actionKind", Some(newerDeadline)),
+      'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
+    lazy val stream = new ByteArrayOutputStream
+    lazy val printstream = new PrintStream(stream)
+    lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+    ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest, 'newer))
+  }
+
+  it should "remove only the expired prewarms regardless of minCount" in {
+    //limit prewarm removal to 100
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+    val memoryLimit = 256.MB
+    //minCount is 2, but removeExpired does not consult minCount - every expired prewarm is returned for removal
+    val prewarmConfig =
+      List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(2, 10, 10.seconds, 1, 1))))
+    //all are overdue, with different expiration times
+    val oldestDeadline = Deadline.now - 5.seconds
+    val newerDeadline = Deadline.now - 4.seconds
+    //the newest* ones are expired too; with a limit of 100 all five are removed, ordered oldest first
+    val newestDeadline = Deadline.now - 3.seconds
+    val newestDeadline2 = Deadline.now - 2.seconds
+    val newestDeadline3 = Deadline.now - 1.seconds
+    val prewarmedPool = Map(
+      'newest -> preWarmedData("actionKind", Some(newestDeadline)),
+      'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
+      'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
+      'newer -> preWarmedData("actionKind", Some(newerDeadline)),
+      'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
+    lazy val stream = new ByteArrayOutputStream
+    lazy val printstream = new PrintStream(stream)
+    lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
+    ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List(
+      'oldest,
+      'newer,
+      'newest,
+      'newest2,
+      'newest3))
+  }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 46e334c..3316dc5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -27,7 +27,6 @@ import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe
 import akka.util.ByteString
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
 import java.time.temporal.ChronoUnit
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
@@ -275,7 +274,7 @@ class ContainerProxyTests
     (transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
       Future.successful(())
   }
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 1.minute, None, 100)
   def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))