You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/31 23:09:00 UTC

[openwhisk] branch master updated: Introduce scheduling configurations. (#5232)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ef94d727 Introduce scheduling configurations. (#5232)
4ef94d727 is described below

commit 4ef94d727cdc90a772946330de3837649936a5db
Author: Dominic Kim <st...@apache.org>
AuthorDate: Wed Jun 1 08:08:54 2022 +0900

    Introduce scheduling configurations. (#5232)
    
    * Introduce scheduling configurations.
    
    * Apply SchedulingConfig to MemoryQueue.
    
    * Apply SchedulingConfig to SchedulingDecisionMaker.
    
    * Apply ScalaFmt
    
    * Remove unused import
    
    * Change configs.
    
    * Fix test cases.
    
    * Apply scalaFmt
    
    * Remove Java8-compat dependency.
---
 ansible/group_vars/all                             |  4 ++
 ansible/roles/schedulers/tasks/deploy.yml          |  3 ++
 .../org/apache/openwhisk/core/WhiskConfig.scala    |  2 +
 .../core/database/memory/NoopActivationStore.scala |  7 ++-
 .../core/loadBalancer/FPCPoolBalancer.scala        | 19 ++++---
 core/scheduler/src/main/resources/application.conf |  5 ++
 .../openwhisk/core/scheduler/Scheduler.scala       |  8 ++-
 .../scheduler/container/ContainerManager.scala     |  3 +-
 .../core/scheduler/queue/MemoryQueue.scala         | 27 +++-------
 .../scheduler/queue/SchedulingDecisionMaker.scala  | 15 +++---
 .../mongodb/MongoDBStoreBehaviorBase.scala         |  2 +-
 .../invoker/test/DefaultInvokerServerTests.scala   |  8 +--
 .../core/invoker/test/FPCInvokerServerTests.scala  |  8 +--
 .../queue/test/MemoryQueueFlowTests.scala          | 54 +++++++++++---------
 .../scheduler/queue/test/MemoryQueueTests.scala    | 51 ++++++++++---------
 .../queue/test/MemoryQueueTestsFixture.scala       |  7 ++-
 .../queue/test/SchedulingDecisionMakerTests.scala  | 59 ++++++++++++----------
 17 files changed, 159 insertions(+), 123 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index b94385cc0..5e45a3844 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -515,6 +515,10 @@ scheduler:
   inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
   managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
   blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
+  scheduling:
+    staleThreshold: "{{ scheduler_scheduling_staleThreshold | default('100 milliseconds') }}"
+    checkInterval: "{{ scheduler_scheduling_checkInterval | default('100 milliseconds') }}"
+    dropInterval: "{{ scheduler_scheduling_dropInterval | default('10 seconds') }}"
   queueManager:
     maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
     maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index 9bf00403a..507f4813f 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -113,6 +113,9 @@
       "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
       "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
       "CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
+      "CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ scheduler.scheduling.staleThreshold }}"
+      "CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ scheduler.scheduling.checkInterval }}"
+      "CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ scheduler.scheduling.dropInterval }}"
       "CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
       "CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
       "CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 7ddbc1f4d..95520f242 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -301,9 +301,11 @@ object ConfigKeys {
 
   val schedulerGrpcService = "whisk.scheduler.grpc"
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
+  val schedulerScheduling = "whisk.scheduler.scheduling"
   val schedulerQueue = "whisk.scheduler.queue"
   val schedulerQueueManager = "whisk.scheduler.queue-manager"
   val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
+  val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"
 
   val whiskClusterName = "whisk.cluster.name"
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
index 8d45ff02f..fb881fff3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
@@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.memory
 import java.time.Instant
 import akka.actor.ActorSystem
 import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants}
-import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext}
+import org.apache.openwhisk.core.database.{
+  ActivationStore,
+  ActivationStoreProvider,
+  CacheChangeNotification,
+  UserContext
+}
 import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation}
 import spray.json.{JsNumber, JsObject}
 
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 576d3b3b1..478d5cbb9 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -639,17 +639,16 @@ class FPCPoolBalancer(config: WhiskConfig,
       MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
       MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
       // Add both user memory and busy memory because user memory represents free memory in this case
-      MetricEmitter.emitGaugeMetric(
-        INVOKER_TOTALMEM,
-        invokers.foldLeft(0L) { (total, curr) =>
-          if (curr.status.isUsable) {
-            curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
-          } else {
-            total
-          }
-        })
+      MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, invokers.foldLeft(0L) { (total, curr) =>
+        if (curr.status.isUsable) {
+          curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
+        } else {
+          total
+        }
+      })
       MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
-      MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
+      MetricEmitter
+        .emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
     })
   }
 
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index 860b6e0ff..211ae5f0d 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -64,6 +64,11 @@ whisk {
     grpc {
       tls = "false"
     }
+    scheduling {
+      stale-threshold = "100 milliseconds"
+      check-interval = "100 milliseconds"
+      drop-interval = "10 seconds"
+    }
     queue {
       idle-grace = "20 seconds"
       stop-grace = "20 seconds"
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index f93a766b1..84643c270 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -70,6 +70,8 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
   val leaseService =
     actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))
 
+  val schedulingConfig = loadConfigOrThrow[SchedulingConfig](ConfigKeys.schedulerScheduling)
+
   implicit val entityStore = WhiskEntityStore.datastore()
   private val activationStore =
     SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
@@ -191,7 +193,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
     : (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
     (factory, invocationNamespace, fqn, revision, actionMetaData) => {
       // Todo: Change this to SPI
-      val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))
+      val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn, schedulingConfig))
 
       factory.actorOf(
         MemoryQueue.props(
@@ -199,7 +201,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
           durationChecker,
           fqn,
           producer,
-          config,
+          schedulingConfig,
           invocationNamespace,
           revision,
           schedulerEndpoints,
@@ -395,3 +397,5 @@ object SchedulerStates extends DefaultJsonProtocol {
 
   def parse(states: String) = Try(serdes.read(states.parseJson))
 }
+
+case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index eaeb70be4..837045ed2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -251,7 +251,8 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
   // Filter out messages which can use warmed container
   private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
     msgs.filter { msg =>
-      val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
       val chosenInvoker = warmedContainers
         .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
         .find { container =>
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 3215be99a..d54e697e2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -24,6 +24,7 @@ import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
 import akka.util.Timeout
 import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
 import org.apache.openwhisk.core.connector._
@@ -33,13 +34,6 @@ import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
-import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
-import org.apache.openwhisk.core.scheduler.message.{
-  ContainerCreation,
-  ContainerDeletion,
-  FailedCreationJob,
-  SuccessfulCreationJob
-}
 import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
 import org.apache.openwhisk.core.scheduler.message.{
   ContainerCreation,
@@ -47,8 +41,8 @@ import org.apache.openwhisk.core.scheduler.message.{
   FailedCreationJob,
   SuccessfulCreationJob
 }
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
 import org.apache.openwhisk.core.service._
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
 import pureconfig.generic.auto._
 import pureconfig.loadConfigOrThrow
@@ -59,7 +53,6 @@ import scala.collection.immutable.Queue
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
-import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
 // States
@@ -116,7 +109,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
                   private val durationChecker: DurationChecker,
                   private val action: FullyQualifiedEntityName,
                   messagingProducer: MessageProducer,
-                  config: WhiskConfig,
+                  schedulingConfig: SchedulingConfig,
                   invocationNamespace: String,
                   revision: DocRevision,
                   endpoints: SchedulerEndpoints,
@@ -144,11 +137,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   private implicit val timeout = Timeout(5.seconds)
   private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)
 
+  private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis)
   private val unversionedAction = action.copy(version = None)
-  private val checkInterval: FiniteDuration = 100 milliseconds
-  private val StaleThreshold: Double = 100.0
-  private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong)
-  private val dropInterval: FiniteDuration = 10 seconds
   private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true)
   private val inProgressContainerPrefixKey =
     containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision))
@@ -834,7 +824,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     }
   }
 
-
   private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
     if (queue.size > 0) {
       // if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager
@@ -862,12 +851,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   // since there is no initial delay, it will try to create a container at initialization time
   // these schedulers will run forever and stop when the memory queue stops
   private def startMonitoring(): (ActorRef, ActorRef) = {
-    val droppingScheduler = Scheduler.scheduleWaitAtLeast(dropInterval) { () =>
+    val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
       checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
       Future.successful(())
     }
 
-    val monitoringScheduler = Scheduler.scheduleWaitAtLeast(checkInterval) { () =>
+    val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
       // the average duration is updated every checkInterval
       if (averageDurationBuffer.nonEmpty) {
         averageDuration = Some(averageDurationBuffer.average)
@@ -1048,7 +1037,7 @@ object MemoryQueue {
             durationChecker: DurationChecker,
             fqn: FullyQualifiedEntityName,
             messagingProducer: MessageProducer,
-            config: WhiskConfig,
+            schedulingConfig: SchedulingConfig,
             invocationNamespace: String,
             revision: DocRevision,
             endpoints: SchedulerEndpoints,
@@ -1067,7 +1056,7 @@ object MemoryQueue {
         durationChecker,
         fqn: FullyQualifiedEntityName,
         messagingProducer: MessageProducer,
-        config: WhiskConfig,
+        schedulingConfig: SchedulingConfig,
         invocationNamespace: String,
         revision,
         endpoints: SchedulerEndpoints,
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 0bbc1d98b..0e1f4ffa8 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -20,6 +20,7 @@ package org.apache.openwhisk.core.scheduler.queue
 import akka.actor.{Actor, ActorSystem, Props}
 import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
+import org.apache.openwhisk.core.scheduler.SchedulingConfig
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
@@ -27,9 +28,11 @@ import scala.util.{Failure, Success}
 class SchedulingDecisionMaker(
   invocationNamespace: String,
   action: FullyQualifiedEntityName,
-  StaleThreshold: Double = 100.0)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
+  schedulingConfig: SchedulingConfig)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
     extends Actor {
 
+  private val staleThreshold: Double = schedulingConfig.staleThreshold.toMillis.toDouble
+
   override def receive: Receive = {
     case msg: QueueSnapshot =>
       decide(msg)
@@ -135,7 +138,7 @@ class SchedulingDecisionMaker(
 
           case (Running, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = StaleThreshold / duration
+            val containerThroughput = staleThreshold / duration
             val num = ceiling(availableMsg.toDouble / containerThroughput)
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -153,7 +156,7 @@ class SchedulingDecisionMaker(
           // need more containers and a message is already processed
           case (Running, Some(duration)) =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = StaleThreshold / duration
+            val containerThroughput = staleThreshold / duration
             val expectedTps = containerThroughput * (existing + inProgress)
 
             if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
@@ -180,7 +183,7 @@ class SchedulingDecisionMaker(
           // this case is for that as a last resort.
           case (Removing, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = StaleThreshold / duration
+            val containerThroughput = staleThreshold / duration
             val num = ceiling(availableMsg.toDouble / containerThroughput)
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -252,10 +255,10 @@ class SchedulingDecisionMaker(
 }
 
 object SchedulingDecisionMaker {
-  def props(invocationNamespace: String, action: FullyQualifiedEntityName, StaleThreshold: Double = 100.0)(
+  def props(invocationNamespace: String, action: FullyQualifiedEntityName, schedulingConfig: SchedulingConfig)(
     implicit actorSystem: ActorSystem,
     ec: ExecutionContext,
     logging: Logging): Props = {
-    Props(new SchedulingDecisionMaker(invocationNamespace, action, StaleThreshold))
+    Props(new SchedulingDecisionMaker(invocationNamespace, action, schedulingConfig))
   }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
index f6bb04967..50825dd09 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
@@ -25,7 +25,7 @@ import org.testcontainers.containers.MongoDBContainer
 import pureconfig.loadConfigOrThrow
 import pureconfig.generic.auto._
 
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.{classTag, ClassTag}
 
 trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
   val imageName = loadConfigOrThrow[String]("whisk.mongodb.docker-image")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 57cb976d2..76f1410a7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -83,9 +83,11 @@ class DefaultInvokerServerTests
     val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
     Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
       status should be(OK)
-      Unmarshal(responseEntity).to[String].map(response => {
-        InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
-      })
+      Unmarshal(responseEntity)
+        .to[String]
+        .map(response => {
+          InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+        })
     }
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index e7ab02e2b..2393b8775 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -83,9 +83,11 @@ class FPCInvokerServerTests
     val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
     Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
       status should be(OK)
-      Unmarshal(responseEntity).to[String].map(response => {
-        InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
-      })
+      Unmarshal(responseEntity)
+        .to[String]
+        .map(response => {
+          InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+        })
     }
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 1d523c20d..edacd26f3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -106,7 +106,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -176,7 +176,8 @@ class MemoryQueueFlowTests
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
     val probe = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     val messages = getActivationMessages(2)
     val container = TestProbe()
@@ -190,7 +191,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -313,7 +314,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -372,7 +373,8 @@ class MemoryQueueFlowTests
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
     val probe = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     val getUserLimit = (_: String) => Future.successful(1)
     val container = TestProbe()
@@ -388,7 +390,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -494,7 +496,8 @@ class MemoryQueueFlowTests
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
     val probe = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     // max retention size is 10 and throttling fraction is 0.8
     // queue will be action throttled at 10 messages and disabled action throttling at 8 messages
@@ -516,7 +519,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -642,7 +645,8 @@ class MemoryQueueFlowTests
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
     val probe = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     // generate 2 activations
     val messages = getActivationMessages(3)
@@ -661,7 +665,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -762,7 +766,8 @@ class MemoryQueueFlowTests
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
     val probe = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     // generate 2 activations
     val messages = getActivationMessages(3)
@@ -778,7 +783,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -874,7 +879,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -943,7 +948,8 @@ class MemoryQueueFlowTests
     val watcher = TestProbe()
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
     val probe = TestProbe()
     val container = TestProbe()
 
@@ -956,7 +962,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1040,7 +1046,8 @@ class MemoryQueueFlowTests
     val watcher = TestProbe()
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
     val probe = TestProbe()
 
     // generate 2 activations
@@ -1055,7 +1062,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1143,7 +1150,8 @@ class MemoryQueueFlowTests
     val watcher = TestProbe()
     val dataMgmtService = TestProbe()
     val containerManager = TestProbe()
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
     val probe = TestProbe()
 
     val messages = getActivationMessages(4)
@@ -1158,7 +1166,7 @@ class MemoryQueueFlowTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1268,7 +1276,7 @@ class MemoryQueueFlowTests
             durationChecker,
             fqn,
             mockMessaging(),
-            config,
+            schedulingConfig,
             testInvocationNamespace,
             revision,
             endpoints,
@@ -1353,7 +1361,7 @@ class MemoryQueueFlowTests
             durationChecker,
             fqn,
             mockMessaging(),
-            config,
+            schedulingConfig,
             testInvocationNamespace,
             revision,
             endpoints,
@@ -1444,7 +1452,7 @@ class MemoryQueueFlowTests
             durationChecker,
             fqn,
             mockMessaging(),
-            config,
+            schedulingConfig,
             testInvocationNamespace,
             revision,
             endpoints,
@@ -1606,7 +1614,7 @@ class MemoryQueueFlowTests
             durationChecker,
             fqn,
             mockMessaging(),
-            config,
+            schedulingConfig,
             testInvocationNamespace,
             revision,
             endpoints,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index c75a400fc..fc1eac80a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -149,7 +149,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -206,7 +206,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -266,7 +266,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -323,7 +323,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -395,7 +395,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -474,7 +474,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -542,7 +542,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -584,7 +584,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -624,7 +624,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -663,7 +663,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -711,7 +711,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -792,7 +792,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -862,7 +862,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -930,7 +930,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1022,7 +1022,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1065,7 +1065,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1110,7 +1110,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1162,7 +1162,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1226,7 +1226,7 @@ class MemoryQueueTests
         durationChecker,
         fqn,
         mockMessaging(),
-        config,
+        schedulingConfig,
         testInvocationNamespace,
         revision,
         endpoints,
@@ -1276,7 +1276,7 @@ class MemoryQueueTests
         durationChecker,
         fqn,
         mockMessaging(),
-        config,
+        schedulingConfig,
         testInvocationNamespace,
         revision,
         endpoints,
@@ -1325,7 +1325,7 @@ class MemoryQueueTests
         durationChecker,
         fqn,
         mockMessaging(),
-        config,
+        schedulingConfig,
         testInvocationNamespace,
         revision,
         endpoints,
@@ -1368,7 +1368,8 @@ class MemoryQueueTests
     val mockEtcdClient = new MockEtcdClient(client, true)
     val probe = TestProbe()
     val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
-    val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+    val testSchedulingDecisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
 
     val mockFunction = (_: String) => {
       Future.successful(4)
@@ -1382,7 +1383,7 @@ class MemoryQueueTests
         durationChecker,
         fqn,
         mockMessaging(),
-        config,
+        schedulingConfig,
         testInvocationNamespace,
         revision,
         endpoints,
@@ -1598,7 +1599,7 @@ class MemoryQueueTests
           durationChecker,
           fqn,
           mockMessaging(),
-          config,
+          schedulingConfig,
           testInvocationNamespace,
           revision,
           endpoints,
@@ -1694,7 +1695,7 @@ class MemoryQueueTests
         durationChecker,
         fqn,
         mockMessaging(),
-        config,
+        schedulingConfig,
         testInvocationNamespace,
         revision,
         endpoints,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 8c4ee849f..38f13ceee 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -18,6 +18,7 @@
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.time.Instant
+
 import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import com.sksamuel.elastic4s.http
@@ -42,7 +43,7 @@ import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationS
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import org.apache.openwhisk.core.entity.{WhiskActivation, _}
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
-import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
 import org.apache.openwhisk.core.scheduler.grpc.GetActivation
 import org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationChecker.{getFromDate, AverageAggregationName}
 import org.apache.openwhisk.core.scheduler.queue._
@@ -60,7 +61,7 @@ import org.apache.openwhisk.core.service.{
 }
 import org.scalamock.scalatest.MockFactory
 
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt}
 import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.language.{higherKinds, postfixOps}
 
@@ -85,6 +86,8 @@ class MemoryQueueTestsFixture
   val testNamespace = "test-namespace"
   val testAction = "test-action"
 
+  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
+
   val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
   val revision = DocRevision("1-testRev")
   val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 83a4b6f8e..72adc8be2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -22,12 +22,15 @@ import akka.actor.ActorSystem
 import akka.testkit.{TestKit, TestProbe}
 import common.StreamLogging
 import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName, SemVer}
+import org.apache.openwhisk.core.scheduler.SchedulingConfig
 import org.apache.openwhisk.core.scheduler.queue._
 import org.junit.runner.RunWith
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpecLike, Matchers}
 
+import scala.concurrent.duration.DurationInt
+
 @RunWith(classOf[JUnitRunner])
 class SchedulingDecisionMakerTests
     extends TestKit(ActorSystem("SchedulingDecisionMakerTests"))
@@ -44,8 +47,10 @@ class SchedulingDecisionMakerTests
   val testAction = "test-action"
   val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
 
+  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
+
   it should "decide pausing when the limit is less than equal to 0" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -68,7 +73,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "skip decision if the state is already Flushing when the limit is <= 0" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -91,7 +96,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "skip decision at any time if there is no message" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
 
     // For Throttled states, there will be always at least one message to disable the throttling
     val states = List(Running, Idle, Flushing, Removing, Removed)
@@ -119,7 +124,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "skip decision at any time if there is no message even with avg duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val states = List(Running, Idle, Flushing, Removing, Removed)
     val testProbe = TestProbe()
 
@@ -145,7 +150,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -169,7 +174,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -193,7 +198,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add an initial container if there is no any" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -215,7 +220,7 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
   }
   it should "disable the namespace throttling with adding an initial container when there is no container" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -238,7 +243,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "disable the namespace throttling when there are some containers" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -262,7 +267,7 @@ class SchedulingDecisionMakerTests
 
   // this is an exceptional case
   it should "add an initial container if there is no container in the Running state" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -286,7 +291,7 @@ class SchedulingDecisionMakerTests
 
   // this is an exceptional case
   it should "not add a container if there is no message even in the Running state" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -310,7 +315,7 @@ class SchedulingDecisionMakerTests
 
   // this can happen when the limit was 0 for some reason previously but it is increased after some time.
   it should "add one container if there is no container in the Paused state" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -333,7 +338,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add one container if there is no container in the Waiting state" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -356,7 +361,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add same number of containers with the number of stale messages if there are any" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -379,7 +384,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add exclude the number of in-progress container when adding containers for stale messages when there is no available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -402,7 +407,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add at most the same number with the limit when adding containers for stale messages when there is no available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -425,7 +430,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "not add any container for stale messages if the increment is <= 0 when there when there is no available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -448,7 +453,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add containers for stale messages based on duration when there is available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -473,7 +478,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add containers for stale messages at most the number of messages when there is available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -498,7 +503,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add containers for stale messages within the limit when there is available duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -525,7 +530,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add containers based on duration if there is no stale message" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -551,7 +556,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add containers based on duration within the capacity if there is no stale message" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -578,7 +583,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "not add container when expected TPS is bigger than available message if there is no stale message" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -604,7 +609,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add one container when there is no container and are some messages" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -627,7 +632,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add more containers when there are stale messages even in the GracefulShuttingDown state" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -653,7 +658,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "add more containers when there are stale messages even in the GracefulShuttingDown state when there is no duration" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(
@@ -679,7 +684,7 @@ class SchedulingDecisionMakerTests
   }
 
   it should "enable namespace throttling while adding more container when there are stale messages even in the GracefulShuttingDown" in {
-    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()
 
     val msg = QueueSnapshot(