You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/03 08:39:38 UTC
[openwhisk] branch master updated: Go to the NamespaceThrottled state rather than Flushing state. (#5303)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8fd21565a Go to the NamespaceThrottled state rather than Flushing state. (#5303)
8fd21565a is described below
commit 8fd21565a293eb548e6aceddb925cec49bbb6b03
Author: Dominic Kim <st...@apache.org>
AuthorDate: Wed Aug 3 17:39:32 2022 +0900
Go to the NamespaceThrottled state rather than Flushing state. (#5303)
* Currently, MemoryQueue goes to the Flushing state when it receives an EnableNamespaceThrottling(dropMsg=true) message, but the Flushing state has no case to disable namespace throttling at all.
* Remove unused import.
---
.../core/scheduler/queue/MemoryQueue.scala | 21 ++++++---------------
.../scheduler/queue/test/MemoryQueueFlowTests.scala | 12 ++++++++----
.../scheduler/queue/test/MemoryQueueTests.scala | 2 +-
3 files changed, 15 insertions(+), 20 deletions(-)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index fabc785a4..312e9ecff 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -17,15 +17,13 @@
package org.apache.openwhisk.core.scheduler.queue
-import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
-import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
+import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.Interval
import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
@@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
-import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json._
+import pureconfig.generic.auto._
+import java.time.{Duration, Instant}
+import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
@@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable namespace throttling.")
enableNamespaceThrottling()
- // if no container could be created, it is same with Flushing state.
- if (dropMsg) {
+ if (dropMsg)
completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
- goto(Flushing) using FlushingData(
- data.schedulerActor,
- data.droppingActor,
- TooManyConcurrentRequests,
- tooManyConcurrentRequests)
- } else {
- // if there are already some containers running, activations can still be processed so goto the NamespaceThrottled state.
- goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
- }
+ goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
case Event(StateTimeout, data: RunningData) =>
if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 267d3a81d..52568327f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -280,7 +280,7 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- it should "go to the Flushing state dropping messages when it can't create an initial container" in {
+ it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -340,7 +340,7 @@ class MemoryQueueFlowTests
fsm ! message
dataMgmtService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, failoverEnabled = false))
- probe.expectMsg(Transition(fsm, Running, Flushing))
+ probe.expectMsg(Transition(fsm, Running, NamespaceThrottled))
awaitAssert({
ackedMessageCount shouldBe 1
@@ -352,13 +352,17 @@ class MemoryQueueFlowTests
fsm.underlyingActor.queue.size shouldBe 0
}, 5.seconds)
- parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
- probe.expectMsg(Transition(fsm, Flushing, Removed))
+ fsm ! GracefulShutdown
+
+ parent.expectMsg(queueRemovedMsg)
+ probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
+ probe.expectMsg(Transition(fsm, Removing, Removed))
+
probe.expectTerminated(fsm, 10.seconds)
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 950af0292..fd2ac7363 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1357,7 +1357,7 @@ class MemoryQueueTests
parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
fsm ! EnableNamespaceThrottling(dropMsg = true)
- parent.expectMsg(10 seconds, Transition(fsm, Running, Flushing))
+ parent.expectMsg(10 seconds, Transition(fsm, Running, NamespaceThrottled))
dataManagementService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, false))
fsm.stop()