You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/31 23:09:00 UTC
[openwhisk] branch master updated: Introduce scheduling configurations. (#5232)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 4ef94d727 Introduce scheduling configurations. (#5232)
4ef94d727 is described below
commit 4ef94d727cdc90a772946330de3837649936a5db
Author: Dominic Kim <st...@apache.org>
AuthorDate: Wed Jun 1 08:08:54 2022 +0900
Introduce scheduling configurations. (#5232)
* Introduce scheduling configurations.
* Apply SchedulingConfig to MemoryQueue.
* Apply SchedulingConfig to SchedulingDecisionMaker.
* Apply ScalaFmt
* Remove unused import
* Change configs.
* Fix test cases.
* Apply scalaFmt
* Remove Java8-compat dependency.
---
ansible/group_vars/all | 4 ++
ansible/roles/schedulers/tasks/deploy.yml | 3 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 2 +
.../core/database/memory/NoopActivationStore.scala | 7 ++-
.../core/loadBalancer/FPCPoolBalancer.scala | 19 ++++---
core/scheduler/src/main/resources/application.conf | 5 ++
.../openwhisk/core/scheduler/Scheduler.scala | 8 ++-
.../scheduler/container/ContainerManager.scala | 3 +-
.../core/scheduler/queue/MemoryQueue.scala | 27 +++-------
.../scheduler/queue/SchedulingDecisionMaker.scala | 15 +++---
.../mongodb/MongoDBStoreBehaviorBase.scala | 2 +-
.../invoker/test/DefaultInvokerServerTests.scala | 8 +--
.../core/invoker/test/FPCInvokerServerTests.scala | 8 +--
.../queue/test/MemoryQueueFlowTests.scala | 54 +++++++++++---------
.../scheduler/queue/test/MemoryQueueTests.scala | 51 ++++++++++---------
.../queue/test/MemoryQueueTestsFixture.scala | 7 ++-
.../queue/test/SchedulingDecisionMakerTests.scala | 59 ++++++++++++----------
17 files changed, 159 insertions(+), 123 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index b94385cc0..5e45a3844 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -515,6 +515,10 @@ scheduler:
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
+ scheduling:
+ staleThreshold: "{{ scheduler_scheduling_staleThreshold | default('100 milliseconds') }}"
+ checkInterval: "{{ scheduler_scheduling_checkInterval | default('100 milliseconds') }}"
+ dropInterval: "{{ scheduler_scheduling_dropInterval | default('10 seconds') }}"
queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index 9bf00403a..507f4813f 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -113,6 +113,9 @@
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
+ "CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ scheduler.scheduling.staleThreshold }}"
+ "CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ scheduler.scheduling.checkInterval }}"
+ "CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ scheduler.scheduling.dropInterval }}"
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 7ddbc1f4d..95520f242 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -301,9 +301,11 @@ object ConfigKeys {
val schedulerGrpcService = "whisk.scheduler.grpc"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
+ val schedulerScheduling = "whisk.scheduler.scheduling"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
+ val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"
val whiskClusterName = "whisk.cluster.name"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
index 8d45ff02f..fb881fff3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
@@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.memory
import java.time.Instant
import akka.actor.ActorSystem
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants}
-import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext}
+import org.apache.openwhisk.core.database.{
+ ActivationStore,
+ ActivationStoreProvider,
+ CacheChangeNotification,
+ UserContext
+}
import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation}
import spray.json.{JsNumber, JsObject}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 576d3b3b1..478d5cbb9 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -639,17 +639,16 @@ class FPCPoolBalancer(config: WhiskConfig,
MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
// Add both user memory and busy memory because user memory represents free memory in this case
- MetricEmitter.emitGaugeMetric(
- INVOKER_TOTALMEM,
- invokers.foldLeft(0L) { (total, curr) =>
- if (curr.status.isUsable) {
- curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
- } else {
- total
- }
- })
+ MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, invokers.foldLeft(0L) { (total, curr) =>
+ if (curr.status.isUsable) {
+ curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
+ } else {
+ total
+ }
+ })
MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
- MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
+ MetricEmitter
+ .emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
})
}
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index 860b6e0ff..211ae5f0d 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -64,6 +64,11 @@ whisk {
grpc {
tls = "false"
}
+ scheduling {
+ stale-threshold = "100 milliseconds"
+ check-interval = "100 milliseconds"
+ drop-interval = "10 seconds"
+ }
queue {
idle-grace = "20 seconds"
stop-grace = "20 seconds"
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index f93a766b1..84643c270 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -70,6 +70,8 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
val leaseService =
actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))
+ val schedulingConfig = loadConfigOrThrow[SchedulingConfig](ConfigKeys.schedulerQueue)
+
implicit val entityStore = WhiskEntityStore.datastore()
private val activationStore =
SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
@@ -191,7 +193,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
(factory, invocationNamespace, fqn, revision, actionMetaData) => {
// Todo: Change this to SPI
- val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))
+ val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn, schedulingConfig))
factory.actorOf(
MemoryQueue.props(
@@ -199,7 +201,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
durationChecker,
fqn,
producer,
- config,
+ schedulingConfig,
invocationNamespace,
revision,
schedulerEndpoints,
@@ -395,3 +397,5 @@ object SchedulerStates extends DefaultJsonProtocol {
def parse(states: String) = Try(serdes.read(states.parseJson))
}
+
+case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index eaeb70be4..837045ed2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -251,7 +251,8 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
// Filter out messages which can use warmed container
private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
msgs.filter { msg =>
- val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
+ val warmedPrefix =
+ containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
val chosenInvoker = warmedContainers
.filter(!inProgressWarmedContainers.values.toSeq.contains(_))
.find { container =>
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 3215be99a..d54e697e2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -24,6 +24,7 @@ import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
import org.apache.openwhisk.core.connector._
@@ -33,13 +34,6 @@ import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
-import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
-import org.apache.openwhisk.core.scheduler.message.{
- ContainerCreation,
- ContainerDeletion,
- FailedCreationJob,
- SuccessfulCreationJob
-}
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
@@ -47,8 +41,8 @@ import org.apache.openwhisk.core.scheduler.message.{
FailedCreationJob,
SuccessfulCreationJob
}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
@@ -59,7 +53,6 @@ import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
-import scala.language.postfixOps
import scala.util.{Failure, Success}
// States
@@ -116,7 +109,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private val durationChecker: DurationChecker,
private val action: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
- config: WhiskConfig,
+ schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision: DocRevision,
endpoints: SchedulerEndpoints,
@@ -144,11 +137,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private implicit val timeout = Timeout(5.seconds)
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)
+ private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis)
private val unversionedAction = action.copy(version = None)
- private val checkInterval: FiniteDuration = 100 milliseconds
- private val StaleThreshold: Double = 100.0
- private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong)
- private val dropInterval: FiniteDuration = 10 seconds
private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true)
private val inProgressContainerPrefixKey =
containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision))
@@ -834,7 +824,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
}
}
-
private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
if (queue.size > 0) {
// if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager
@@ -862,12 +851,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// since there is no initial delay, it will try to create a container at initialization time
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
- val droppingScheduler = Scheduler.scheduleWaitAtLeast(dropInterval) { () =>
+ val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
Future.successful(())
}
- val monitoringScheduler = Scheduler.scheduleWaitAtLeast(checkInterval) { () =>
+ val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
// the average duration is updated every checkInterval
if (averageDurationBuffer.nonEmpty) {
averageDuration = Some(averageDurationBuffer.average)
@@ -1048,7 +1037,7 @@ object MemoryQueue {
durationChecker: DurationChecker,
fqn: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
- config: WhiskConfig,
+ schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision: DocRevision,
endpoints: SchedulerEndpoints,
@@ -1067,7 +1056,7 @@ object MemoryQueue {
durationChecker,
fqn: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
- config: WhiskConfig,
+ schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision,
endpoints: SchedulerEndpoints,
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 0bbc1d98b..0e1f4ffa8 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -20,6 +20,7 @@ package org.apache.openwhisk.core.scheduler.queue
import akka.actor.{Actor, ActorSystem, Props}
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
+import org.apache.openwhisk.core.scheduler.SchedulingConfig
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
@@ -27,9 +28,11 @@ import scala.util.{Failure, Success}
class SchedulingDecisionMaker(
invocationNamespace: String,
action: FullyQualifiedEntityName,
- StaleThreshold: Double = 100.0)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
+ schedulingConfig: SchedulingConfig)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
extends Actor {
+ private val staleThreshold: Double = schedulingConfig.staleThreshold.toMillis.toDouble
+
override def receive: Receive = {
case msg: QueueSnapshot =>
decide(msg)
@@ -135,7 +138,7 @@ class SchedulingDecisionMaker(
case (Running, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val containerThroughput = StaleThreshold / duration
+ val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -153,7 +156,7 @@ class SchedulingDecisionMaker(
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
- val containerThroughput = StaleThreshold / duration
+ val containerThroughput = staleThreshold / duration
val expectedTps = containerThroughput * (existing + inProgress)
if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
@@ -180,7 +183,7 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val containerThroughput = StaleThreshold / duration
+ val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -252,10 +255,10 @@ class SchedulingDecisionMaker(
}
object SchedulingDecisionMaker {
- def props(invocationNamespace: String, action: FullyQualifiedEntityName, StaleThreshold: Double = 100.0)(
+ def props(invocationNamespace: String, action: FullyQualifiedEntityName, schedulingConfig: SchedulingConfig)(
implicit actorSystem: ActorSystem,
ec: ExecutionContext,
logging: Logging): Props = {
- Props(new SchedulingDecisionMaker(invocationNamespace, action, StaleThreshold))
+ Props(new SchedulingDecisionMaker(invocationNamespace, action, schedulingConfig))
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
index f6bb04967..50825dd09 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
@@ -25,7 +25,7 @@ import org.testcontainers.containers.MongoDBContainer
import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.{classTag, ClassTag}
trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
val imageName = loadConfigOrThrow[String]("whisk.mongodb.docker-image")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 57cb976d2..76f1410a7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -83,9 +83,11 @@ class DefaultInvokerServerTests
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
- Unmarshal(responseEntity).to[String].map(response => {
- InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
- })
+ Unmarshal(responseEntity)
+ .to[String]
+ .map(response => {
+ InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+ })
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index e7ab02e2b..2393b8775 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -83,9 +83,11 @@ class FPCInvokerServerTests
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
- Unmarshal(responseEntity).to[String].map(response => {
- InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
- })
+ Unmarshal(responseEntity)
+ .to[String]
+ .map(response => {
+ InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+ })
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 1d523c20d..edacd26f3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -106,7 +106,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -176,7 +176,8 @@ class MemoryQueueFlowTests
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
val probe = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val messages = getActivationMessages(2)
val container = TestProbe()
@@ -190,7 +191,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -313,7 +314,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -372,7 +373,8 @@ class MemoryQueueFlowTests
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
val probe = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val getUserLimit = (_: String) => Future.successful(1)
val container = TestProbe()
@@ -388,7 +390,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -494,7 +496,8 @@ class MemoryQueueFlowTests
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
val probe = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
// max retention size is 10 and throttling fraction is 0.8
// queue will be action throttled at 10 messages and disabled action throttling at 8 messages
@@ -516,7 +519,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -642,7 +645,8 @@ class MemoryQueueFlowTests
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
val probe = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
// generate 2 activations
val messages = getActivationMessages(3)
@@ -661,7 +665,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -762,7 +766,8 @@ class MemoryQueueFlowTests
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
val probe = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
// generate 2 activations
val messages = getActivationMessages(3)
@@ -778,7 +783,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -874,7 +879,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -943,7 +948,8 @@ class MemoryQueueFlowTests
val watcher = TestProbe()
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val probe = TestProbe()
val container = TestProbe()
@@ -956,7 +962,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1040,7 +1046,8 @@ class MemoryQueueFlowTests
val watcher = TestProbe()
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val probe = TestProbe()
// generate 2 activations
@@ -1055,7 +1062,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1143,7 +1150,8 @@ class MemoryQueueFlowTests
val watcher = TestProbe()
val dataMgmtService = TestProbe()
val containerManager = TestProbe()
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val probe = TestProbe()
val messages = getActivationMessages(4)
@@ -1158,7 +1166,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1268,7 +1276,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1353,7 +1361,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1444,7 +1452,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1606,7 +1614,7 @@ class MemoryQueueFlowTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index c75a400fc..fc1eac80a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -149,7 +149,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -206,7 +206,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -266,7 +266,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -323,7 +323,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -395,7 +395,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -474,7 +474,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -542,7 +542,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -584,7 +584,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -624,7 +624,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -663,7 +663,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -711,7 +711,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -792,7 +792,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -862,7 +862,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -930,7 +930,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1022,7 +1022,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1065,7 +1065,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1110,7 +1110,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1162,7 +1162,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1226,7 +1226,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1276,7 +1276,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1325,7 +1325,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1368,7 +1368,8 @@ class MemoryQueueTests
val mockEtcdClient = new MockEtcdClient(client, true)
val probe = TestProbe()
val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
- val testSchedulingDecisionMaker = system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn))
+ val testSchedulingDecisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, fqn, schedulingConfig))
val mockFunction = (_: String) => {
Future.successful(4)
@@ -1382,7 +1383,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1598,7 +1599,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
@@ -1694,7 +1695,7 @@ class MemoryQueueTests
durationChecker,
fqn,
mockMessaging(),
- config,
+ schedulingConfig,
testInvocationNamespace,
revision,
endpoints,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 8c4ee849f..38f13ceee 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -18,6 +18,7 @@
package org.apache.openwhisk.core.scheduler.queue.test
import java.time.Instant
+
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.sksamuel.elastic4s.http
@@ -42,7 +43,7 @@ import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationS
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity.{WhiskActivation, _}
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
-import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.scheduler.grpc.GetActivation
import org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationChecker.{getFromDate, AverageAggregationName}
import org.apache.openwhisk.core.scheduler.queue._
@@ -60,7 +61,7 @@ import org.apache.openwhisk.core.service.{
}
import org.scalamock.scalatest.MockFactory
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt}
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.language.{higherKinds, postfixOps}
@@ -85,6 +86,8 @@ class MemoryQueueTestsFixture
val testNamespace = "test-namespace"
val testAction = "test-action"
+ val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
+
val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
val revision = DocRevision("1-testRev")
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 83a4b6f8e..72adc8be2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -22,12 +22,15 @@ import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import common.StreamLogging
import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName, SemVer}
+import org.apache.openwhisk.core.scheduler.SchedulingConfig
import org.apache.openwhisk.core.scheduler.queue._
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpecLike, Matchers}
+import scala.concurrent.duration.DurationInt
+
@RunWith(classOf[JUnitRunner])
class SchedulingDecisionMakerTests
extends TestKit(ActorSystem("SchedulingDecisionMakerTests"))
@@ -44,8 +47,10 @@ class SchedulingDecisionMakerTests
val testAction = "test-action"
val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
+ val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
+
it should "decide pausing when the limit is less than equal to 0" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -68,7 +73,7 @@ class SchedulingDecisionMakerTests
}
it should "skip decision if the state is already Flushing when the limit is <= 0" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -91,7 +96,7 @@ class SchedulingDecisionMakerTests
}
it should "skip decision at any time if there is no message" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
// For Throttled states, there will be always at least one message to disable the throttling
val states = List(Running, Idle, Flushing, Removing, Removed)
@@ -119,7 +124,7 @@ class SchedulingDecisionMakerTests
}
it should "skip decision at any time if there is no message even with avg duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val states = List(Running, Idle, Flushing, Removing, Removed)
val testProbe = TestProbe()
@@ -145,7 +150,7 @@ class SchedulingDecisionMakerTests
}
it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -169,7 +174,7 @@ class SchedulingDecisionMakerTests
}
it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -193,7 +198,7 @@ class SchedulingDecisionMakerTests
}
it should "add an initial container if there is no any" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -215,7 +220,7 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}
it should "disable the namespace throttling with adding an initial container when there is no container" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -238,7 +243,7 @@ class SchedulingDecisionMakerTests
}
it should "disable the namespace throttling when there are some containers" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -262,7 +267,7 @@ class SchedulingDecisionMakerTests
// this is an exceptional case
it should "add an initial container if there is no container in the Running state" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -286,7 +291,7 @@ class SchedulingDecisionMakerTests
// this is an exceptional case
it should "not add a container if there is no message even in the Running state" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -310,7 +315,7 @@ class SchedulingDecisionMakerTests
// this can happen when the limit was 0 for some reason previously but it is increased after some time.
it should "add one container if there is no container in the Paused state" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -333,7 +338,7 @@ class SchedulingDecisionMakerTests
}
it should "add one container if there is no container in the Waiting state" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -356,7 +361,7 @@ class SchedulingDecisionMakerTests
}
it should "add same number of containers with the number of stale messages if there are any" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -379,7 +384,7 @@ class SchedulingDecisionMakerTests
}
it should "add exclude the number of in-progress container when adding containers for stale messages when there is no available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -402,7 +407,7 @@ class SchedulingDecisionMakerTests
}
it should "add at most the same number with the limit when adding containers for stale messages when there is no available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -425,7 +430,7 @@ class SchedulingDecisionMakerTests
}
it should "not add any container for stale messages if the increment is <= 0 when there when there is no available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -448,7 +453,7 @@ class SchedulingDecisionMakerTests
}
it should "add containers for stale messages based on duration when there is available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -473,7 +478,7 @@ class SchedulingDecisionMakerTests
}
it should "add containers for stale messages at most the number of messages when there is available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -498,7 +503,7 @@ class SchedulingDecisionMakerTests
}
it should "add containers for stale messages within the limit when there is available duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -525,7 +530,7 @@ class SchedulingDecisionMakerTests
}
it should "add containers based on duration if there is no stale message" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -551,7 +556,7 @@ class SchedulingDecisionMakerTests
}
it should "add containers based on duration within the capacity if there is no stale message" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -578,7 +583,7 @@ class SchedulingDecisionMakerTests
}
it should "not add container when expected TPS is bigger than available message if there is no stale message" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -604,7 +609,7 @@ class SchedulingDecisionMakerTests
}
it should "add one container when there is no container and are some messages" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -627,7 +632,7 @@ class SchedulingDecisionMakerTests
}
it should "add more containers when there are stale messages even in the GracefulShuttingDown state" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -653,7 +658,7 @@ class SchedulingDecisionMakerTests
}
it should "add more containers when there are stale messages even in the GracefulShuttingDown state when there is no duration" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(
@@ -679,7 +684,7 @@ class SchedulingDecisionMakerTests
}
it should "enable namespace throttling while adding more container when there are stale messages even in the GracefulShuttingDown" in {
- val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action))
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
val msg = QueueSnapshot(