You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/08/02 01:55:05 UTC
[openwhisk] branch master updated: Revert cycle handling. (#5300)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 2683ed1e6 Revert cycle handling. (#5300)
2683ed1e6 is described below
commit 2683ed1e6f35da5d7f7c586d8f1a14ba11c7d408
Author: Dominic Kim <st...@apache.org>
AuthorDate: Tue Aug 2 10:54:58 2022 +0900
Revert cycle handling. (#5300)
* Revert cycle handling.
* Remove the RecoverQueue reference.
---
.../org/apache/openwhisk/common/Logging.scala | 1 -
.../core/scheduler/queue/QueueManager.scala | 130 ++++-----------------
.../scheduler/queue/test/QueueManagerTests.scala | 71 -----------
3 files changed, 22 insertions(+), 180 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index ff82ef5fb..541aee055 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -601,7 +601,6 @@ object LoggingMarkers {
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
- def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index a0a3ea188..3c11916af 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -54,9 +54,6 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
case class CreateNewQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)
-case class RecoverQueue(activationMessage: ActivationMessage,
- action: FullyQualifiedEntityName,
- actionMetadata: WhiskActionMetaData)
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
@@ -83,7 +80,7 @@ class QueueManager(
private val actorSelectionMap = TrieMap[String, ActorSelection]()
- private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
+ private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
private implicit val askTimeout = Timeout(5.seconds)
private implicit val ec = context.dispatcher
@@ -93,8 +90,6 @@ class QueueManager(
// watch leaders and register them into actorSelectionMap
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
- private var isShuttingDown = false
-
override def receive: Receive = {
case request: CreateQueue if isWarmUpAction(request.fqn) =>
logging.info(
@@ -119,12 +114,12 @@ class QueueManager(
msg.leadership match {
case Right(EtcdLeader(key, value, lease)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
- callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
+ callback(Right(EtcdLeader(key, value, lease)))
}
case Left(EtcdFollower(key, value)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
- callback(Left(EtcdFollower(key, value)), isShuttingDown)
+ callback(Left(EtcdFollower(key, value)))
}
}
@@ -134,11 +129,7 @@ class QueueManager(
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
msg.transid)
- if (sender() == self) {
- handleCycle(msg)(msg.transid)
- } else {
- handleActivationMessage(msg)
- }
+ handleActivationMessage(msg)
case UpdateMemoryQueue(oldAction, newAction, msg) =>
logging.info(
@@ -173,25 +164,6 @@ class QueueManager(
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
queue ! msg
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
- if (isShuttingDown) {
- queue ! GracefulShutdown
- }
- }
-
- case RecoverQueue(msg, action, actionMetaData) =>
- QueuePool.keys.find(k =>
- k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
- // queue is already recovered or a newer queue is created, send msg to new queue
- case Some(key) if key.docInfo.rev >= msg.revision =>
- QueuePool.get(key) match {
- case Some(queue) if queue.isLeader =>
- queue.queue ! msg.copy(revision = key.docInfo.rev)
- logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
- case _ =>
- recreateQueue(action, msg, actionMetaData)
- }
- case _ =>
- recreateQueue(action, msg, actionMetaData)
}
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -236,7 +208,6 @@ class QueueManager(
}
case GracefulShutdown =>
- isShuttingDown = true
logging.info(this, s"Gracefully shutdown the queue manager")
watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -346,49 +317,6 @@ class QueueManager(
}
}
- private def recreateQueue(action: FullyQualifiedEntityName,
- msg: ActivationMessage,
- actionMetaData: WhiskActionMetaData): Unit = {
- logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
- val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
- queue ! msg
- msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
- if (isShuttingDown) {
- queue ! GracefulShutdown
- }
- }
-
- private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
- logging.warn(
- this,
- s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
- val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
-
- logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
- getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
- .map { actionMetaData: WhiskActionMetaData =>
- actionMetaData.toExecutableWhiskAction match {
- case Some(_) =>
- self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
- transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
-
- case None =>
- val message =
- s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
- completeErrorActivation(msg, message)
- transid.failed(this, start, message)
- }
- }
- .recover {
- case t =>
- transid.failed(
- this,
- start,
- s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
- completeErrorActivation(msg, t.getMessage)
- }
- }
-
private def handleActivationMessage(msg: ActivationMessage): Any = {
implicit val transid = msg.transid
@@ -525,24 +453,24 @@ class QueueManager(
case None =>
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
leaderElectionCallbacks.put(
- leaderKey,
- (electResult, isShuttingDown) => {
- electResult match {
- case Right(EtcdLeader(_, _, _)) =>
- val queue = createAndStartQueue(
- request.invocationNamespace,
- request.fqn,
- request.revision,
- request.whiskActionMetaData)
- receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
- if (isShuttingDown) {
- queue ! GracefulShutdown
- }
-
- // in case of follower, do nothing
- case Left(EtcdFollower(_, _)) =>
- receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
- }
+ leaderKey, {
+ case Right(EtcdLeader(_, _, _)) =>
+ val queue = childFactory(
+ context,
+ request.invocationNamespace,
+ request.fqn,
+ request.revision,
+ request.whiskActionMetaData)
+ queue ! Start
+ QueuePool.put(
+ MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
+ MemoryQueueValue(queue, true))
+ updateInitRevisionMap(leaderKey, request.revision)
+ receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+
+ // in case of follower, do nothing
+ case Left(EtcdFollower(_, _)) =>
+ receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
})
// there is already a leader election for leaderKey, so skip it
@@ -562,20 +490,6 @@ class QueueManager(
}
}
- private def createAndStartQueue(invocationNamespace: String,
- action: FullyQualifiedEntityName,
- revision: DocRevision,
- actionMetaData: WhiskActionMetaData): ActorRef = {
- val queue =
- childFactory(context, invocationNamespace, action, revision, actionMetaData)
- queue ! Start
- QueuePool.put(
- MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
- MemoryQueueValue(queue, true))
- updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
- queue
- }
-
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
})
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index f0a69a771..6ad1513f7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -549,65 +549,6 @@ class QueueManagerTests
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
}
- it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
- val mockEtcdClient = mock[EtcdClient]
- (mockEtcdClient
- .get(_: String))
- .expects(*)
- .returning(Future.successful {
- RangeResponse
- .newBuilder()
- .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
- .build()
- })
- .anyNumberOfTimes()
- val dataManagementService = getTestDataManagementService()
- val watcher = TestProbe()
-
- val probe = TestProbe()
-
- val childFactory =
- (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
-
- val queueManager =
- TestActorRef(
- QueueManager
- .props(
- entityStore,
- get,
- mockEtcdClient,
- schedulerEndpoint,
- schedulerId,
- dataManagementService.ref,
- watcher.ref,
- ack,
- store,
- childFactory,
- mockConsumer))
-
- watcher.expectMsg(watchEndpoint)
- //current queue's revision is `1-test-revision`
- (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
- testInvocationNamespace,
- testFQN,
- true)
-
- probe.expectMsg(Start)
-
- // simulate queue superseded, the queue will be removed but leader key won't be deleted
- queueManager ! QueueRemoved(
- testInvocationNamespace,
- testFQN.toDocId.asDocInfo(testDocRevision),
- Some(testLeaderKey))
-
- queueManager.!(activationMessage)(queueManager)
- val msg2 = activationMessage.copy(activationId = ActivationId.generate())
- queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue
- probe.expectMsg(Start)
- probe.expectMsg(activationMessage)
- probe.expectMsg(msg2)
- }
-
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
stream.reset()
val mockEtcdClient = mock[EtcdClient]
@@ -1141,9 +1082,6 @@ class QueueManagerTests
val probe = TestProbe()
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
- val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
- val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
- val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
// probe will watch all actors which are created by these factories
val childFactory =
@@ -1191,14 +1129,5 @@ class QueueManagerTests
queueManager ! GracefulShutdown
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
-
- // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
- (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
- .mapTo[CreateQueueResponse]
- .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
- queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
- queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
-
- probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
}
}