You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/31 23:09:25 UTC
[openwhisk] branch master updated: Prevent cycle sending (#5251)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a75950a54 Prevent cycle sending (#5251)
a75950a54 is described below
commit a75950a543b83ffbd7753f128855556198ee155d
Author: jiangpengcheng <ji...@navercorp.com>
AuthorDate: Wed Jun 1 07:09:20 2022 +0800
Prevent cycle sending (#5251)
---
.../org/apache/openwhisk/common/Logging.scala | 1 +
.../core/scheduler/queue/QueueManager.scala | 127 +++++++++++++++++----
.../scheduler/queue/test/QueueManagerTests.scala | 71 ++++++++++++
3 files changed, 177 insertions(+), 22 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 3072ff33d..5a1fe89f0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -598,6 +598,7 @@ object LoggingMarkers {
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
+ def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 9a45f8ad0..6dbaffb0b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
case class CreateNewQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)
+case class RecoverQueue(activationMessage: ActivationMessage,
+ action: FullyQualifiedEntityName,
+ actionMetadata: WhiskActionMetaData)
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
@@ -80,7 +83,7 @@ class QueueManager(
private val actorSelectionMap = TrieMap[String, ActorSelection]()
- private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+ private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
private implicit val askTimeout = Timeout(5.seconds)
private implicit val ec = context.dispatcher
@@ -90,6 +93,8 @@ class QueueManager(
// watch leaders and register them into actorSelectionMap
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+ private var isShuttingDown = false
+
override def receive: Receive = {
case request: CreateQueue if isWarmUpAction(request.fqn) =>
logging.info(
@@ -114,12 +119,12 @@ class QueueManager(
msg.leadership match {
case Right(EtcdLeader(key, value, lease)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
- callback(Right(EtcdLeader(key, value, lease)))
+ callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
}
case Left(EtcdFollower(key, value)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
- callback(Left(EtcdFollower(key, value)))
+ callback(Left(EtcdFollower(key, value)), isShuttingDown)
}
}
@@ -129,7 +134,11 @@ class QueueManager(
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
msg.transid)
- handleActivationMessage(msg)
+ if (sender() == self) {
+ handleCycle(msg)(msg.transid)
+ } else {
+ handleActivationMessage(msg)
+ }
case UpdateMemoryQueue(oldAction, newAction, msg) =>
logging.info(
@@ -164,6 +173,24 @@ class QueueManager(
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
queue ! msg
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+ if (isShuttingDown) {
+ queue ! GracefulShutdown
+ }
+ }
+
+ case RecoverQueue(msg, action, actionMetaData) =>
+ QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+ // the queue has already been recovered or a newer queue has been created; send msg to that queue
+ case Some(key) if key.docInfo.rev >= msg.revision =>
+ QueuePool.get(key) match {
+ case Some(queue) if queue.isLeader =>
+ queue.queue ! msg.copy(revision = key.docInfo.rev)
+ logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
+ case _ =>
+ recreateQueue(action, msg, actionMetaData)
+ }
+ case _ =>
+ recreateQueue(action, msg, actionMetaData)
}
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +235,7 @@ class QueueManager(
}
case GracefulShutdown =>
+ isShuttingDown = true
logging.info(this, s"Gracefully shutdown the queue manager")
watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -317,6 +345,47 @@ class QueueManager(
}
}
+ private def recreateQueue(action: FullyQualifiedEntityName,
+ msg: ActivationMessage,
+ actionMetaData: WhiskActionMetaData): Unit = {
+ logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
+ val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+ queue ! msg
+ msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+ if (isShuttingDown) {
+ queue ! GracefulShutdown
+ }
+ }
+
+ private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
+ logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
+ val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+
+ logging.info(this, s"Recover a queue for ${msg.action},")
+ getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
+ .map { actionMetaData: WhiskActionMetaData =>
+ actionMetaData.toExecutableWhiskAction match {
+ case Some(_) =>
+ self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
+ transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
+
+ case None =>
+ val message =
+ s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
+ completeErrorActivation(msg, message)
+ transid.failed(this, start, message)
+ }
+ }
+ .recover {
+ case t =>
+ transid.failed(
+ this,
+ start,
+ s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
+ completeErrorActivation(msg, t.getMessage)
+ }
+ }
+
private def handleActivationMessage(msg: ActivationMessage): Any = {
implicit val transid = msg.transid
@@ -451,24 +520,24 @@ class QueueManager(
case None =>
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
leaderElectionCallbacks.put(
- leaderKey, {
- case Right(EtcdLeader(_, _, _)) =>
- val queue = childFactory(
- context,
- request.invocationNamespace,
- request.fqn,
- request.revision,
- request.whiskActionMetaData)
- queue ! Start
- QueuePool.put(
- MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
- MemoryQueueValue(queue, true))
- updateInitRevisionMap(leaderKey, request.revision)
- receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-
- // in case of follower, do nothing
- case Left(EtcdFollower(_, _)) =>
- receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+ leaderKey,
+ (electResult, isShuttingDown) => {
+ electResult match {
+ case Right(EtcdLeader(_, _, _)) =>
+ val queue = createAndStartQueue(
+ request.invocationNamespace,
+ request.fqn,
+ request.revision,
+ request.whiskActionMetaData)
+ receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+ if (isShuttingDown) {
+ queue ! GracefulShutdown
+ }
+
+ // in case of follower, don't create a queue; just reply to the requester
+ case Left(EtcdFollower(_, _)) =>
+ receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+ }
})
// there is already a leader election for leaderKey, so skip it
@@ -488,6 +557,20 @@ class QueueManager(
}
}
+ private def createAndStartQueue(invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ actionMetaData: WhiskActionMetaData): ActorRef = {
+ val queue =
+ childFactory(context, invocationNamespace, action, revision, actionMetaData)
+ queue ! Start
+ QueuePool.put(
+ MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
+ MemoryQueueValue(queue, true))
+ updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
+ queue
+ }
+
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
})
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 6ad1513f7..f0a69a771 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -549,6 +549,65 @@ class QueueManagerTests
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
}
+ it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
+ val mockEtcdClient = mock[EtcdClient]
+ (mockEtcdClient
+ .get(_: String))
+ .expects(*)
+ .returning(Future.successful {
+ RangeResponse
+ .newBuilder()
+ .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+ .build()
+ })
+ .anyNumberOfTimes()
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val probe = TestProbe()
+
+ val childFactory =
+ (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ //current queue's revision is `1-test-revision`
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ probe.expectMsg(Start)
+
+ // simulate the queue being superseded: the queue is removed but its leader key is not deleted
+ queueManager ! QueueRemoved(
+ testInvocationNamespace,
+ testFQN.toDocId.asDocInfo(testDocRevision),
+ Some(testLeaderKey))
+
+ queueManager.!(activationMessage)(queueManager)
+ val msg2 = activationMessage.copy(activationId = ActivationId.generate())
+ queueManager.!(msg2)(queueManager) // even if two requests are sent, only one queue should be recreated
+ probe.expectMsg(Start)
+ probe.expectMsg(activationMessage)
+ probe.expectMsg(msg2)
+ }
+
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
stream.reset()
val mockEtcdClient = mock[EtcdClient]
@@ -1082,6 +1141,9 @@ class QueueManagerTests
val probe = TestProbe()
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+ val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
+ val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
+ val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
// probe will watch all actors which are created by these factories
val childFactory =
@@ -1129,5 +1191,14 @@ class QueueManagerTests
queueManager ! GracefulShutdown
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+
+ // after shutdown, the queue manager can still create/update/recover a queue, and the new queue should be shut down immediately too
+ (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
+ .mapTo[CreateQueueResponse]
+ .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
+ queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
+ queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
+
+ probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
}
}