You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/10/14 04:44:56 UTC

[openwhisk] branch master updated: Prevent cycle in the QueueManager (#5332)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ef725a653 Prevent cycle in the QueueManager (#5332)
ef725a653 is described below

commit ef725a653ab112391f79c274d8e3dcfb915d59a3
Author: Dominic Kim <st...@apache.org>
AuthorDate: Fri Oct 14 13:44:50 2022 +0900

    Prevent cycle in the QueueManager (#5332)
---
 .../org/apache/openwhisk/common/Logging.scala      |   1 +
 .../core/scheduler/queue/QueueManager.scala        | 144 +++++++++++++++++----
 .../scheduler/queue/test/QueueManagerTests.scala   |  72 +++++++++++
 3 files changed, 195 insertions(+), 22 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 541aee055..ff82ef5fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -601,6 +601,7 @@ object LoggingMarkers {
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
   def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
   def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
+  def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
   def SCHEDULER_QUEUE_UPDATE(reason: String) =
     LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
   def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 3c11916af..d87338dd3 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -55,6 +55,10 @@ case class CreateNewQueue(activationMessage: ActivationMessage,
                           action: FullyQualifiedEntityName,
                           actionMetadata: WhiskActionMetaData)
 
+case class RecoverQueue(activationMessage: ActivationMessage,
+                        action: FullyQualifiedEntityName,
+                        actionMetadata: WhiskActionMetaData)
+
 case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
 
 class QueueManager(
@@ -80,7 +84,7 @@ class QueueManager(
 
   private val actorSelectionMap = TrieMap[String, ActorSelection]()
 
-  private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+  private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
 
   private implicit val askTimeout = Timeout(5.seconds)
   private implicit val ec = context.dispatcher
@@ -90,6 +94,8 @@ class QueueManager(
   // watch leaders and register them into actorSelectionMap
   watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
 
+  private var isShuttingDown = false
+
   override def receive: Receive = {
     case request: CreateQueue if isWarmUpAction(request.fqn) =>
       logging.info(
@@ -114,12 +120,12 @@ class QueueManager(
       msg.leadership match {
         case Right(EtcdLeader(key, value, lease)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Right(EtcdLeader(key, value, lease)))
+            callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
           }
 
         case Left(EtcdFollower(key, value)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Left(EtcdFollower(key, value)))
+            callback(Left(EtcdFollower(key, value)), isShuttingDown)
           }
       }
 
@@ -129,7 +135,11 @@ class QueueManager(
         s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
         msg.transid)
 
-      handleActivationMessage(msg)
+      if (sender() == self) {
+        handleCycle(msg)(msg.transid)
+      } else {
+        handleActivationMessage(msg)
+      }
 
     case UpdateMemoryQueue(oldAction, newAction, msg) =>
       logging.info(
@@ -164,6 +174,25 @@ class QueueManager(
           updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
           queue ! msg
           msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+          if (isShuttingDown) {
+            queue ! GracefulShutdown
+          }
+      }
+
+    case RecoverQueue(msg, action, actionMetaData) =>
+      QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+        // a newer queue is created, send msg to new queue
+        case Some(key) if key.docInfo.rev >= msg.revision =>
+          QueuePool.get(key) match {
+            case Some(queue) if queue.isLeader =>
+              queue.queue ! msg.copy(revision = key.docInfo.rev)
+              logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
+            case _ =>
+              recreateQueue(action, msg, actionMetaData)
+          }
+        case _ =>
+          recreateQueue(action, msg, actionMetaData)
+
       }
 
     // leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +237,7 @@ class QueueManager(
       }
 
     case GracefulShutdown =>
+      isShuttingDown = true
       logging.info(this, s"Gracefully shutdown the queue manager")
 
       watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -278,6 +308,62 @@ class QueueManager(
     initRevisionMap.update(key, revision)
   }
 
+  private def recreateQueue(action: FullyQualifiedEntityName,
+                            msg: ActivationMessage,
+                            actionMetaData: WhiskActionMetaData): Unit = {
+    logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
+    val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+    queue ! msg
+    msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+    if (isShuttingDown) {
+      queue ! GracefulShutdown
+    }
+  }
+
+  private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
+    val action = msg.action
+    QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+      // a newer queue is created, send msg to new queue
+      case Some(key) if key.docInfo.rev >= msg.revision =>
+        QueuePool.get(key) match {
+          case Some(queue) if queue.isLeader =>
+            queue.queue ! msg.copy(revision = key.docInfo.rev)
+            logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
+          case _ =>
+            recoverQueue(msg)
+        }
+      case _ =>
+        recoverQueue(msg)
+    }
+  }
+
+  private def recoverQueue(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+    logging.info(this, s"Recover a queue for ${msg.action},")
+    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
+      .map { actionMetaData: WhiskActionMetaData =>
+        actionMetaData.toExecutableWhiskAction match {
+          case Some(_) =>
+            self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
+            transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
+
+          case None =>
+            val message =
+              s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
+            completeErrorActivation(msg, message)
+            transid.failed(this, start, message)
+        }
+      }
+      .recover {
+        case t =>
+          transid.failed(
+            this,
+            start,
+            s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
+          completeErrorActivation(msg, t.getMessage)
+      }
+  }
+
   private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Any] = {
     val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
@@ -453,24 +539,24 @@ class QueueManager(
           case None =>
             dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
             leaderElectionCallbacks.put(
-              leaderKey, {
-                case Right(EtcdLeader(_, _, _)) =>
-                  val queue = childFactory(
-                    context,
-                    request.invocationNamespace,
-                    request.fqn,
-                    request.revision,
-                    request.whiskActionMetaData)
-                  queue ! Start
-                  QueuePool.put(
-                    MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
-                    MemoryQueueValue(queue, true))
-                  updateInitRevisionMap(leaderKey, request.revision)
-                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-
-                // in case of follower, do nothing
-                case Left(EtcdFollower(_, _)) =>
-                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+              leaderKey,
+              (electResult, isShuttingDown) => {
+                electResult match {
+                  case Right(EtcdLeader(_, _, _)) =>
+                    val queue = createAndStartQueue(
+                      request.invocationNamespace,
+                      request.fqn,
+                      request.revision,
+                      request.whiskActionMetaData)
+                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+                    if (isShuttingDown) {
+                      queue ! GracefulShutdown
+                    }
+
+                  // in case of follower, do nothing
+                  case Left(EtcdFollower(_, _)) =>
+                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+                }
               })
 
           // there is already a leader election for leaderKey, so skip it
@@ -490,6 +576,20 @@ class QueueManager(
     }
   }
 
+  private def createAndStartQueue(invocationNamespace: String,
+                                  action: FullyQualifiedEntityName,
+                                  revision: DocRevision,
+                                  actionMetaData: WhiskActionMetaData): ActorRef = {
+    val queue =
+      childFactory(context, invocationNamespace, action, revision, actionMetaData)
+    queue ! Start
+    QueuePool.put(
+      MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(queue, true))
+    updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
+    queue
+  }
+
   private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
     MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
   })
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 6ad1513f7..b60472e81 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -549,6 +549,65 @@ class QueueManagerTests
     probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
   }
 
+  it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
+    val mockEtcdClient = mock[EtcdClient]
+    (mockEtcdClient
+      .get(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse
+          .newBuilder()
+          .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+          .build()
+      })
+      .anyNumberOfTimes()
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `1-test-revision`
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    probe.expectMsg(Start)
+
+    // simulate queue superseded, the queue will be removed but leader key won't be deleted
+    queueManager ! QueueRemoved(
+      testInvocationNamespace,
+      testFQN.toDocId.asDocInfo(testDocRevision),
+      Some(testLeaderKey))
+
+    queueManager.!(activationMessage)(queueManager)
+    val msg2 = activationMessage.copy(activationId = ActivationId.generate())
+    queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue
+    probe.expectMsg(Start)
+    probe.expectMsg(activationMessage)
+    probe.expectMsg(msg2)
+  }
+
   it should "not skip outdated activation when the revision is older than the one in a datastore" in {
     stream.reset()
     val mockEtcdClient = mock[EtcdClient]
@@ -1082,6 +1141,9 @@ class QueueManagerTests
     val probe = TestProbe()
     val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
     val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+    val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
+    val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
+    val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
 
     // probe will watch all actors which are created by these factories
     val childFactory =
@@ -1129,5 +1191,15 @@ class QueueManagerTests
     queueManager ! GracefulShutdown
 
     probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+
+    // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
+    queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
+    queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
+
+    probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+
   }
 }