You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/02/21 20:39:27 UTC
[incubator-openwhisk] branch master updated: Refactor activation finisher without actor. (#3310)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 11a112b Refactor activation finisher without actor. (#3310)
11a112b is described below
commit 11a112b9c262750b795816a2fef5c24b99ba2de8
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Feb 21 21:39:23 2018 +0100
Refactor activation finisher without actor. (#3310)
---
.../core/controller/actions/PrimitiveActions.scala | 209 +++++----------------
.../actions/test/ActivationFinisherTests.scala | 168 -----------------
2 files changed, 48 insertions(+), 329 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f77d9bf..b7d7737 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,39 +17,27 @@
package whisk.core.controller.actions
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
-import scala.collection.mutable.Buffer
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import akka.actor.Actor
-import akka.actor.ActorRef
import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import akka.actor.Props
+import akka.event.Logging.InfoLevel
import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.Scheduler
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.core.connector.ActivationMessage
import whisk.core.controller.WhiskServices
import whisk.core.database.NoDocumentException
-import whisk.core.entitlement._
-import whisk.core.entitlement.Resource
+import whisk.core.entitlement.{Resource, _}
import whisk.core.entity._
import whisk.core.entity.size.SizeInt
-import whisk.core.entity.types.ActivationStore
-import whisk.core.entity.types.EntityStore
+import whisk.core.entity.types.{ActivationStore, EntityStore}
import whisk.http.Messages._
import whisk.utils.ExecutionContextFactory.FutureExtensions
-import akka.event.Logging.InfoLevel
+
+import scala.collection.mutable.Buffer
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
protected[actions] trait PrimitiveActions {
/** The core collections require backend services to be injected in this trait. */
@@ -569,158 +557,57 @@ protected[actions] trait PrimitiveActions {
totalWaitTime: FiniteDuration,
activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
- // this is the promise which active ack or db polling will try to complete via:
- // 1. active ack response, or
- // 2. failing active ack (due to active ack timeout), fall over to db polling
- // 3. timeout on db polling => converts activation to non-blocking (returns activation id only)
- // 4. internal error message
- val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
- val (promise, finisher) = ActivationFinisher.props({ () =>
- WhiskActivation.get(activationStore, docid)
- })
+ val result = Promise[Either[ActivationId, WhiskActivation]]
+ val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
logging.debug(this, s"action activation will block for result upto $totalWaitTime")
- activeAckResponse map {
- case result @ Right(_) =>
- // activation complete, result is available
- finisher ! ActivationFinisher.Finish(result)
-
- case _ =>
- // active ack received but it does not carry the response,
- // no result available except by polling the db
- logging.warn(this, "preemptively polling db because active ack is missing result")
- finisher ! Scheduler.WorkOnceNow
+ // 1. Wait for the active-ack to happen. Either immediately resolve the promise or poll the database quickly
+ // in case of an incomplete active-ack (record too large for example).
+ activeAckResponse.foreach {
+ case Right(activation) => result.trySuccess(Right(activation))
+ case _ => pollActivation(docid, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
}
- // return the promise which is either fulfilled by active ack, polling from the database,
- // or the timeout alternative when the allowed duration expires (i.e., the action took
- // longer than the permitted, per totalWaitTime).
- promise.withAlternativeAfterTimeout(
- totalWaitTime, {
- Future.successful(Left(activationId)).andThen {
- // result no longer interesting; terminate the finisher/shut down db polling if necessary
- case _ => actorSystem.stop(finisher)
- }
- })
- }
-
- /** Max atomic action count allowed for sequences */
- private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
-}
-
-/** Companion to the ActivationFinisher. */
-protected[actions] object ActivationFinisher {
- case class Finish(activation: Right[ActivationId, WhiskActivation])
-
- private type ActivationLookup = () => Future[WhiskActivation]
+ // 2. Poll the database slowly in case the active-ack never arrives
+ pollActivation(docid, result, _ => 15.seconds)
- /** Periodically polls the db to cover missing active acks. */
- private val datastorePollPeriodForActivation = 15.seconds
+ // 3. Timeout forces a fallback to activationId
+ val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
- /**
- * In case of a partial active ack where it is know an activation completed
- * but the result could not be sent over the bus, use this periodicity to poll
- * for a result.
- */
- private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds)
-
- def props(activationLookup: ActivationLookup)(
- implicit transid: TransactionId,
- actorSystem: ActorSystem,
- executionContext: ExecutionContext,
- logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = {
-
- val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling)
- (p.future, f) // hides the polling actor
- }
-
- /**
- * Creates the finishing actor.
- * This is factored for testing.
- */
- protected[actions] def props(activationLookup: ActivationLookup,
- slowPoll: FiniteDuration,
- fastPolls: Seq[FiniteDuration])(
- implicit transid: TransactionId,
- actorSystem: ActorSystem,
- executionContext: ExecutionContext,
- logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = {
-
- // this is strictly completed by the finishing actor
- val promise = Promise[Either[ActivationId, WhiskActivation]]
- val dbpoller = poller(slowPoll, promise, activationLookup)
- val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
-
- (promise, dbpoller, actorSystem.actorOf(finisher))
+ result.future.andThen {
+ case _ => timeout.cancel()
+ }
}
/**
- * An actor to complete a blocking activation request. It encapsulates a promise
- * to be completed when the result is ready. This may happen in one of two ways.
- * An active ack message is relayed to this actor to complete the promise when
- * the active ack is received. Or in case of a partial/missing active ack, an
- * explicitly scheduled datastore poll of the activation record, if found, will
- * complete the transaction. When the promise is fulfilled, the actor self destructs.
+ * Polls the database for an activation.
+ *
+ * Does not use Future composition because an early exit is wanted, once any possible external source resolved the
+ * Promise.
+ *
+ * @param docid the docid to poll for
+ * @param result promise to resolve on result. Is also used to abort polling once completed.
*/
- private class ActivationFinisher(poller: ActorRef, // the activation poller
- fastPollPeriods: Seq[FiniteDuration],
- promise: Promise[Either[ActivationId, WhiskActivation]])(
- implicit transid: TransactionId,
- actorSystem: ActorSystem,
- executionContext: ExecutionContext,
- logging: Logging)
- extends Actor {
-
- // when the future completes, self-destruct
- promise.future.andThen { case _ => shutdown() }
-
- var preemptiveMsgs = Vector.empty[Cancellable]
-
- def receive = {
- case ActivationFinisher.Finish(activation) =>
- promise.trySuccess(activation)
-
- case msg @ Scheduler.WorkOnceNow =>
- // try up to three times when pre-emptying the schedule
- fastPollPeriods.foreach { s =>
- preemptiveMsgs = preemptiveMsgs :+ context.system.scheduler.scheduleOnce(s, poller, msg)
+ private def pollActivation(docid: DocId,
+ result: Promise[Either[ActivationId, WhiskActivation]],
+ wait: Int => FiniteDuration,
+ retries: Int = 0,
+ maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = {
+ if (!result.isCompleted && retries < maxRetries) {
+ val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
+ WhiskActivation.get(activationStore, docid).onComplete {
+ case Success(activation) => result.trySuccess(Right(activation))
+ case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries)
+ case Failure(t: Throwable) => result.tryFailure(t)
}
- }
-
- def shutdown(): Unit = Option(context).foreach(_.stop(self))
+ }
- override def postStop() = {
- logging.debug(this, "finisher shutdown")
- preemptiveMsgs.foreach(_.cancel())
- preemptiveMsgs = Vector.empty
- context.stop(poller)
+ // Halt the schedule if the result is provided during one execution
+ result.future.onComplete(_ => schedule.cancel())
}
}
- /**
- * This creates the inner datastore poller for the completed activation.
- * It is a factory method to facilitate testing.
- */
- private def poller(slowPollPeriod: FiniteDuration,
- promise: Promise[Either[ActivationId, WhiskActivation]],
- activationLookup: ActivationLookup)(implicit transid: TransactionId,
- actorSystem: ActorSystem,
- executionContext: ExecutionContext,
- logging: Logging): ActorRef = {
- Scheduler.scheduleWaitAtMost(slowPollPeriod, initialDelay = slowPollPeriod, name = "dbpoll")(() => {
- if (!promise.isCompleted) {
- activationLookup() map {
- // complete the future, which in turn will poison pill this scheduler
- activation =>
- promise.trySuccess(Right(activation.withoutLogs)) // logs excluded on blocking calls
- } andThen {
- case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll
- case Failure(t: Throwable) => // something went wrong, abort
- logging.error(this, s"failed while waiting on result: ${t.getMessage}")
- promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler
- }
- } else Future.successful({}) // the scheduler will be halted because the promise is now resolved
- })
- }
+ /** Max atomic action count allowed for sequences */
+ private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
}
diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
deleted file mode 100644
index 25f48ae..0000000
--- a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.controller.actions.test
-
-import java.time.Instant
-
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.controller.actions.ActivationFinisher
-import whisk.core.entity._
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.size.SizeInt
-import whisk.core.database.NoDocumentException
-import akka.testkit.TestProbe
-import whisk.common.Scheduler
-import akka.actor.PoisonPill
-
-@RunWith(classOf[JUnitRunner])
-class ActivationFinisherTests
- extends FlatSpec
- with BeforeAndAfterEach
- with Matchers
- with WskActorSystem
- with StreamLogging {
-
- implicit val tid = TransactionId.testing
-
- val activation = WhiskActivation(
- namespace = EntityPath("ns"),
- name = EntityName("a"),
- Subject(),
- activationId = ActivationId(),
- start = Instant.now(),
- end = Instant.now(),
- response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2)))),
- annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
- duration = Some(123))
-
- var activationLookupCounter = 0
- @volatile var activationResult: Option[Throwable] = None
-
- def activationLookup(): Future[WhiskActivation] = {
- activationLookupCounter += 1
- activationResult.map(Future.failed(_)).getOrElse(Future.successful(activation))
- }
-
- override def beforeEach() = {
- activationLookupCounter = 0
- activationResult = None
- }
-
- behavior of "activation finisher"
-
- val slowPoll = 200.milliseconds
- val fastPoll = Seq()
-
- it should "poll until promise is completed" in {
- activationResult = Some(NoDocumentException(""))
- val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
- val testProbePoller = TestProbe()
- val testProbeFinisher = TestProbe()
- testProbePoller.watch(poller)
- testProbeFinisher.watch(finisher)
-
- val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2)
- Thread.sleep(slowPollWorkWindow.toMillis)
- activationLookupCounter should (be >= 2 and be <= 3)
-
- // should terminate the parent finisher and child poller on completion
- promise.trySuccess(Right(activation))
-
- testProbePoller.expectTerminated(poller, 1.second)
- testProbeFinisher.expectTerminated(finisher, 1.second)
- }
-
- it should "complete promise from poller" in {
- val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
- val testProbePoller = TestProbe()
- val testProbeFinisher = TestProbe()
- testProbePoller.watch(poller)
- testProbeFinisher.watch(finisher)
-
- val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
- Thread.sleep(slowPollWorkWindow.toMillis)
- activationLookupCounter should be(1)
-
- testProbePoller.expectTerminated(poller, 1.second)
- testProbeFinisher.expectTerminated(finisher, 1.second)
-
- promise shouldBe 'completed
- }
-
- it should "finish when receiving corresponding message" in {
- activationResult = Some(NoDocumentException(""))
- val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
- val testProbePoller = TestProbe()
- val testProbeFinisher = TestProbe()
- testProbePoller.watch(poller)
- testProbeFinisher.watch(finisher)
-
- val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
- Thread.sleep(slowPollWorkWindow.toMillis)
- activationLookupCounter should (be >= 1 and be <= 2)
-
- // should terminate the parent finisher and child poller once message is received
- finisher ! ActivationFinisher.Finish(Right(activation))
-
- testProbePoller.expectTerminated(poller, 1.second)
- testProbeFinisher.expectTerminated(finisher, 1.second)
-
- promise shouldBe 'completed
- }
-
- it should "poll pre-emptively" in {
- activationResult = Some(NoDocumentException(""))
- val slowPoll = 600.milliseconds
- val fastPoll = Seq(100.milliseconds, 200.milliseconds)
- val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
- val testProbePoller = TestProbe()
- val testProbeFinisher = TestProbe()
- testProbePoller.watch(poller)
- testProbeFinisher.watch(finisher)
-
- Thread.sleep(500.milliseconds.toMillis)
- activationLookupCounter should be(0)
-
- // should cause polls
- finisher ! Scheduler.WorkOnceNow
- Thread.sleep(500.milliseconds.toMillis)
- activationLookupCounter should be(3)
-
- finisher ! PoisonPill
-
- testProbePoller.expectTerminated(poller, 1.second)
- testProbeFinisher.expectTerminated(finisher, 1.second)
- }
-
-}
--
To stop receiving notification emails like this one, please contact
rabbah@apache.org.