You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/03 08:39:38 UTC

[openwhisk] branch master updated: Go to the NamespaceThrottled state rather than Flushing state. (#5303)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd21565a Go to the NamespaceThrottled state rather than Flushing state. (#5303)
8fd21565a is described below

commit 8fd21565a293eb548e6aceddb925cec49bbb6b03
Author: Dominic Kim <st...@apache.org>
AuthorDate: Wed Aug 3 17:39:32 2022 +0900

    Go to the NamespaceThrottled state rather than Flushing state. (#5303)
    
    * Currently, MemoryQueue goes to the Flushing state when it receives an EnableNamespaceThrottling(dropMsg=true) message, but the Flushing state has no case to disable namespace throttling at all.
    
    * Remove unused import.
---
 .../core/scheduler/queue/MemoryQueue.scala          | 21 ++++++---------------
 .../scheduler/queue/test/MemoryQueueFlowTests.scala | 12 ++++++++----
 .../scheduler/queue/test/MemoryQueueTests.scala     |  2 +-
 3 files changed, 15 insertions(+), 20 deletions(-)

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index fabc785a4..312e9ecff 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -17,15 +17,13 @@
 
 package org.apache.openwhisk.core.scheduler.queue
 
-import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
 import akka.util.Timeout
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.ack.ActiveAck
-import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
+import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool.Interval
 import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
@@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
 import org.apache.openwhisk.core.service._
 import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
-import pureconfig.generic.auto._
 import pureconfig.loadConfigOrThrow
 import spray.json._
+import pureconfig.generic.auto._
 
+import java.time.{Duration, Instant}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.collection.mutable
@@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable namespace throttling.")
       enableNamespaceThrottling()
 
-      // if no container could be created, it is same with Flushing state.
-      if (dropMsg) {
+      if (dropMsg)
         completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
-        goto(Flushing) using FlushingData(
-          data.schedulerActor,
-          data.droppingActor,
-          TooManyConcurrentRequests,
-          tooManyConcurrentRequests)
-      } else {
-        // if there are already some containers running, activations can still be processed so goto the NamespaceThrottled state.
-        goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
-      }
+      goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
 
     case Event(StateTimeout, data: RunningData) =>
       if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 267d3a81d..52568327f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -280,7 +280,7 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  it should "go to the Flushing state dropping messages when it can't create an initial container" in {
+  it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -340,7 +340,7 @@ class MemoryQueueFlowTests
     fsm ! message
 
     dataMgmtService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, failoverEnabled = false))
-    probe.expectMsg(Transition(fsm, Running, Flushing))
+    probe.expectMsg(Transition(fsm, Running, NamespaceThrottled))
 
     awaitAssert({
       ackedMessageCount shouldBe 1
@@ -352,13 +352,17 @@ class MemoryQueueFlowTests
       fsm.underlyingActor.queue.size shouldBe 0
     }, 5.seconds)
 
-    parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
-    probe.expectMsg(Transition(fsm, Flushing, Removed))
+    fsm ! GracefulShutdown
+
+    parent.expectMsg(queueRemovedMsg)
+    probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
 
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
 
+    probe.expectMsg(Transition(fsm, Removing, Removed))
+
     probe.expectTerminated(fsm, 10.seconds)
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 950af0292..fd2ac7363 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1357,7 +1357,7 @@ class MemoryQueueTests
     parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
 
     fsm ! EnableNamespaceThrottling(dropMsg = true)
-    parent.expectMsg(10 seconds, Transition(fsm, Running, Flushing))
+    parent.expectMsg(10 seconds, Transition(fsm, Running, NamespaceThrottled))
     dataManagementService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, false))
 
     fsm.stop()