You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/02 01:55:05 UTC

[openwhisk] branch master updated: Revert cycle handling. (#5300)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2683ed1e6 Revert cycle handling. (#5300)
2683ed1e6 is described below

commit 2683ed1e6f35da5d7f7c586d8f1a14ba11c7d408
Author: Dominic Kim <st...@apache.org>
AuthorDate: Tue Aug 2 10:54:58 2022 +0900

    Revert cycle handling. (#5300)
    
    * Revert cycle handling.
    
    * Remove the RecoverQueue reference.
---
 .../org/apache/openwhisk/common/Logging.scala      |   1 -
 .../core/scheduler/queue/QueueManager.scala        | 130 ++++-----------------
 .../scheduler/queue/test/QueueManagerTests.scala   |  71 -----------
 3 files changed, 22 insertions(+), 180 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index ff82ef5fb..541aee055 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -601,7 +601,6 @@ object LoggingMarkers {
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
   def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
   def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
-  def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
   def SCHEDULER_QUEUE_UPDATE(reason: String) =
     LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
   def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index a0a3ea188..3c11916af 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -54,9 +54,6 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
 case class CreateNewQueue(activationMessage: ActivationMessage,
                           action: FullyQualifiedEntityName,
                           actionMetadata: WhiskActionMetaData)
-case class RecoverQueue(activationMessage: ActivationMessage,
-                        action: FullyQualifiedEntityName,
-                        actionMetadata: WhiskActionMetaData)
 
 case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
 
@@ -83,7 +80,7 @@ class QueueManager(
 
   private val actorSelectionMap = TrieMap[String, ActorSelection]()
 
-  private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
+  private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
 
   private implicit val askTimeout = Timeout(5.seconds)
   private implicit val ec = context.dispatcher
@@ -93,8 +90,6 @@ class QueueManager(
   // watch leaders and register them into actorSelectionMap
   watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
 
-  private var isShuttingDown = false
-
   override def receive: Receive = {
     case request: CreateQueue if isWarmUpAction(request.fqn) =>
       logging.info(
@@ -119,12 +114,12 @@ class QueueManager(
       msg.leadership match {
         case Right(EtcdLeader(key, value, lease)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
+            callback(Right(EtcdLeader(key, value, lease)))
           }
 
         case Left(EtcdFollower(key, value)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Left(EtcdFollower(key, value)), isShuttingDown)
+            callback(Left(EtcdFollower(key, value)))
           }
       }
 
@@ -134,11 +129,7 @@ class QueueManager(
         s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
         msg.transid)
 
-      if (sender() == self) {
-        handleCycle(msg)(msg.transid)
-      } else {
-        handleActivationMessage(msg)
-      }
+      handleActivationMessage(msg)
 
     case UpdateMemoryQueue(oldAction, newAction, msg) =>
       logging.info(
@@ -173,25 +164,6 @@ class QueueManager(
           updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
           queue ! msg
           msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
-          if (isShuttingDown) {
-            queue ! GracefulShutdown
-          }
-      }
-
-    case RecoverQueue(msg, action, actionMetaData) =>
-      QueuePool.keys.find(k =>
-        k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
-        // queue is already recovered or a newer queue is created, send msg to new queue
-        case Some(key) if key.docInfo.rev >= msg.revision =>
-          QueuePool.get(key) match {
-            case Some(queue) if queue.isLeader =>
-              queue.queue ! msg.copy(revision = key.docInfo.rev)
-              logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
-            case _ =>
-              recreateQueue(action, msg, actionMetaData)
-          }
-        case _ =>
-          recreateQueue(action, msg, actionMetaData)
       }
 
     // leaderKey is now optional, it becomes None when the stale queue is removed
@@ -236,7 +208,6 @@ class QueueManager(
       }
 
     case GracefulShutdown =>
-      isShuttingDown = true
       logging.info(this, s"Gracefully shutdown the queue manager")
 
       watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -346,49 +317,6 @@ class QueueManager(
       }
   }
 
-  private def recreateQueue(action: FullyQualifiedEntityName,
-                            msg: ActivationMessage,
-                            actionMetaData: WhiskActionMetaData): Unit = {
-    logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
-    val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
-    queue ! msg
-    msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
-    if (isShuttingDown) {
-      queue ! GracefulShutdown
-    }
-  }
-
-  private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
-    logging.warn(
-      this,
-      s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
-    val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
-
-    logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
-    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
-      .map { actionMetaData: WhiskActionMetaData =>
-        actionMetaData.toExecutableWhiskAction match {
-          case Some(_) =>
-            self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
-            transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
-
-          case None =>
-            val message =
-              s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
-            completeErrorActivation(msg, message)
-            transid.failed(this, start, message)
-        }
-      }
-      .recover {
-        case t =>
-          transid.failed(
-            this,
-            start,
-            s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
-          completeErrorActivation(msg, t.getMessage)
-      }
-  }
-
   private def handleActivationMessage(msg: ActivationMessage): Any = {
     implicit val transid = msg.transid
 
@@ -525,24 +453,24 @@ class QueueManager(
           case None =>
             dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
             leaderElectionCallbacks.put(
-              leaderKey,
-              (electResult, isShuttingDown) => {
-                electResult match {
-                  case Right(EtcdLeader(_, _, _)) =>
-                    val queue = createAndStartQueue(
-                      request.invocationNamespace,
-                      request.fqn,
-                      request.revision,
-                      request.whiskActionMetaData)
-                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-                    if (isShuttingDown) {
-                      queue ! GracefulShutdown
-                    }
-
-                  // in case of follower, do nothing
-                  case Left(EtcdFollower(_, _)) =>
-                    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-                }
+              leaderKey, {
+                case Right(EtcdLeader(_, _, _)) =>
+                  val queue = childFactory(
+                    context,
+                    request.invocationNamespace,
+                    request.fqn,
+                    request.revision,
+                    request.whiskActionMetaData)
+                  queue ! Start
+                  QueuePool.put(
+                    MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
+                    MemoryQueueValue(queue, true))
+                  updateInitRevisionMap(leaderKey, request.revision)
+                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+
+                // in case of follower, do nothing
+                case Left(EtcdFollower(_, _)) =>
+                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
               })
 
           // there is already a leader election for leaderKey, so skip it
@@ -562,20 +490,6 @@ class QueueManager(
     }
   }
 
-  private def createAndStartQueue(invocationNamespace: String,
-                                  action: FullyQualifiedEntityName,
-                                  revision: DocRevision,
-                                  actionMetaData: WhiskActionMetaData): ActorRef = {
-    val queue =
-      childFactory(context, invocationNamespace, action, revision, actionMetaData)
-    queue ! Start
-    QueuePool.put(
-      MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
-      MemoryQueueValue(queue, true))
-    updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
-    queue
-  }
-
   private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
     MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
   })
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index f0a69a771..6ad1513f7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -549,65 +549,6 @@ class QueueManagerTests
     probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
   }
 
-  it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
-    val mockEtcdClient = mock[EtcdClient]
-    (mockEtcdClient
-      .get(_: String))
-      .expects(*)
-      .returning(Future.successful {
-        RangeResponse
-          .newBuilder()
-          .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
-          .build()
-      })
-      .anyNumberOfTimes()
-    val dataManagementService = getTestDataManagementService()
-    val watcher = TestProbe()
-
-    val probe = TestProbe()
-
-    val childFactory =
-      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
-
-    val queueManager =
-      TestActorRef(
-        QueueManager
-          .props(
-            entityStore,
-            get,
-            mockEtcdClient,
-            schedulerEndpoint,
-            schedulerId,
-            dataManagementService.ref,
-            watcher.ref,
-            ack,
-            store,
-            childFactory,
-            mockConsumer))
-
-    watcher.expectMsg(watchEndpoint)
-    //current queue's revision is `1-test-revision`
-    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
-      testInvocationNamespace,
-      testFQN,
-      true)
-
-    probe.expectMsg(Start)
-
-    // simulate queue superseded, the queue will be removed but leader key won't be deleted
-    queueManager ! QueueRemoved(
-      testInvocationNamespace,
-      testFQN.toDocId.asDocInfo(testDocRevision),
-      Some(testLeaderKey))
-
-    queueManager.!(activationMessage)(queueManager)
-    val msg2 = activationMessage.copy(activationId = ActivationId.generate())
-    queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue
-    probe.expectMsg(Start)
-    probe.expectMsg(activationMessage)
-    probe.expectMsg(msg2)
-  }
-
   it should "not skip outdated activation when the revision is older than the one in a datastore" in {
     stream.reset()
     val mockEtcdClient = mock[EtcdClient]
@@ -1141,9 +1082,6 @@ class QueueManagerTests
     val probe = TestProbe()
     val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
     val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
-    val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
-    val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
-    val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
 
     // probe will watch all actors which are created by these factories
     val childFactory =
@@ -1191,14 +1129,5 @@ class QueueManagerTests
     queueManager ! GracefulShutdown
 
     probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
-
-    // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
-    (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
-      .mapTo[CreateQueueResponse]
-      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
-    queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
-    queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
-
-    probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
   }
 }