You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was a hyperlink and is not preserved in this plain-text version.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/31 23:09:25 UTC

[openwhisk] branch master updated: Prevent cycle sending (#5251)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a75950a54 Prevent cycle sending (#5251)
a75950a54 is described below

commit a75950a543b83ffbd7753f128855556198ee155d
Author: jiangpengcheng <ji...@navercorp.com>
AuthorDate: Wed Jun 1 07:09:20 2022 +0800

    Prevent cycle sending (#5251)
---
 .../org/apache/openwhisk/common/Logging.scala      |   1 +
 .../core/scheduler/queue/QueueManager.scala        | 127 +++++++++++++++++----
 .../scheduler/queue/test/QueueManagerTests.scala   |  71 ++++++++++++
 3 files changed, 177 insertions(+), 22 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 3072ff33d..5a1fe89f0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -598,6 +598,7 @@ object LoggingMarkers {
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
   def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
   def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
+  def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
   def SCHEDULER_QUEUE_UPDATE(reason: String) =
     LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
   def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 9a45f8ad0..6dbaffb0b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
 case class CreateNewQueue(activationMessage: ActivationMessage,
                           action: FullyQualifiedEntityName,
                           actionMetadata: WhiskActionMetaData)
+case class RecoverQueue(activationMessage: ActivationMessage,
+                        action: FullyQualifiedEntityName,
+                        actionMetadata: WhiskActionMetaData)
 
 case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
 
@@ -80,7 +83,7 @@ class QueueManager(
 
   private val actorSelectionMap = TrieMap[String, ActorSelection]()
 
-  private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+  private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
 
   private implicit val askTimeout = Timeout(5.seconds)
   private implicit val ec = context.dispatcher
@@ -90,6 +93,8 @@ class QueueManager(
   // watch leaders and register them into actorSelectionMap
   watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
 
+  private var isShuttingDown = false
+
   override def receive: Receive = {
     case request: CreateQueue if isWarmUpAction(request.fqn) =>
       logging.info(
@@ -114,12 +119,12 @@ class QueueManager(
       msg.leadership match {
         case Right(EtcdLeader(key, value, lease)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Right(EtcdLeader(key, value, lease)))
+            callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
           }
 
         case Left(EtcdFollower(key, value)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Left(EtcdFollower(key, value)))
+            callback(Left(EtcdFollower(key, value)), isShuttingDown)
           }
       }
 
@@ -129,7 +134,11 @@ class QueueManager(
         s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
         msg.transid)
 
-      handleActivationMessage(msg)
+      if (sender() == self) {
+        handleCycle(msg)(msg.transid)
+      } else {
+        handleActivationMessage(msg)
+      }
 
     case UpdateMemoryQueue(oldAction, newAction, msg) =>
       logging.info(
@@ -164,6 +173,24 @@ class QueueManager(
           updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
           queue ! msg
           msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+          if (isShuttingDown) {
+            queue ! GracefulShutdown
+          }
+      }
+
+    case RecoverQueue(msg, action, actionMetaData) =>
+      QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+        // queue is already recovered or a newer queue is created, send msg to new queue
+        case Some(key) if key.docInfo.rev >= msg.revision =>
+          QueuePool.get(key) match {
+            case Some(queue) if queue.isLeader =>
+              queue.queue ! msg.copy(revision = key.docInfo.rev)
+              logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
+            case _ =>
+              recreateQueue(action, msg, actionMetaData)
+          }
+        case _ =>
+          recreateQueue(action, msg, actionMetaData)
       }
 
     // leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +235,7 @@ class QueueManager(
       }
 
     case GracefulShutdown =>
+      isShuttingDown = true
       logging.info(this, s"Gracefully shutdown the queue manager")
 
       watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -317,6 +345,47 @@ class QueueManager(
       }
   }
 
+  private def recreateQueue(action: FullyQualifiedEntityName,
+                            msg: ActivationMessage,
+                            actionMetaData: WhiskActionMetaData): Unit = {
+    logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
+    val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+    queue ! msg
+    msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+    if (isShuttingDown) {
+      queue ! GracefulShutdown
+    }
+  }
+
+  private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
+    logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+
+    logging.info(this, s"Recover a queue for ${msg.action},")
+    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
+      .map { actionMetaData: WhiskActionMetaData =>
+        actionMetaData.toExecutableWhiskAction match {
+          case Some(_) =>
+            self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
+            transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
+
+          case None =>
+            val message =
+              s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
+            completeErrorActivation(msg, message)
+            transid.failed(this, start, message)
+        }
+      }
+      .recover {
+        case t =>
+          transid.failed(
+            this,
+            start,
+            s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
+          completeErrorActivation(msg, t.getMessage)
+      }
+  }
+
   private def handleActivationMessage(msg: ActivationMessage): Any = {
     implicit val transid = msg.transid
 
@@ -451,24 +520,24 @@ class QueueManager(
           case None =>
             dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
             leaderElectionCallbacks.put(
-              leaderKey, {
-                case Right(EtcdLeader(_, _, _)) =>
-                  val queue = childFactory(
-                    context,
-                    request.invocationNamespace,
-                    request.fqn,
-                    request.revision,
-                    request.whiskActionMetaData)
-                  queue ! Start
-                  QueuePool.put(
-                    MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
-                    MemoryQueueValue(queue, true))
-                  updateInitRevisionMap(leaderKey, request.revision)
-                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-
-                // in case of follower, do nothing
-                case Left(EtcdFollower(_, _)) =>
-                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+              leaderKey,
+              (electResult, isShuttingDown) => {
+                electResult match {
+                  case Right(EtcdLeader(_, _, _)) =>
+                    val queue = createAndStartQueue(
+                      request.invocationNamespace,
+                      request.fqn,
+                      request.revision,
+                      request.whiskActionMetaData)
+                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+                    if (isShuttingDown) {
+                      queue ! GracefulShutdown
+                    }
+
+                  // in case of follower, do nothing
+                  case Left(EtcdFollower(_, _)) =>
+                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+                }
               })
 
           // there is already a leader election for leaderKey, so skip it
@@ -488,6 +557,20 @@ class QueueManager(
     }
   }
 
+  private def createAndStartQueue(invocationNamespace: String,
+                                  action: FullyQualifiedEntityName,
+                                  revision: DocRevision,
+                                  actionMetaData: WhiskActionMetaData): ActorRef = {
+    val queue =
+      childFactory(context, invocationNamespace, action, revision, actionMetaData)
+    queue ! Start
+    QueuePool.put(
+      MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(queue, true))
+    updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
+    queue
+  }
+
   private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
     MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
   })
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 6ad1513f7..f0a69a771 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -549,6 +549,65 @@ class QueueManagerTests
     probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
   }
 
+  it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
+    val mockEtcdClient = mock[EtcdClient]
+    (mockEtcdClient
+      .get(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse
+          .newBuilder()
+          .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+          .build()
+      })
+      .anyNumberOfTimes()
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `1-test-revision`
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    probe.expectMsg(Start)
+
+    // simulate queue superseded, the queue will be removed but leader key won't be deleted
+    queueManager ! QueueRemoved(
+      testInvocationNamespace,
+      testFQN.toDocId.asDocInfo(testDocRevision),
+      Some(testLeaderKey))
+
+    queueManager.!(activationMessage)(queueManager)
+    val msg2 = activationMessage.copy(activationId = ActivationId.generate())
+    queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue
+    probe.expectMsg(Start)
+    probe.expectMsg(activationMessage)
+    probe.expectMsg(msg2)
+  }
+
   it should "not skip outdated activation when the revision is older than the one in a datastore" in {
     stream.reset()
     val mockEtcdClient = mock[EtcdClient]
@@ -1082,6 +1141,9 @@ class QueueManagerTests
     val probe = TestProbe()
     val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
     val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+    val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
+    val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
+    val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
 
     // probe will watch all actors which are created by these factories
     val childFactory =
@@ -1129,5 +1191,14 @@ class QueueManagerTests
     queueManager ! GracefulShutdown
 
     probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+
+    // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
+    queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
+    queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
+
+    probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
   }
 }