You are viewing a plain text version of this content. The canonical link for it is here.
Posted by GitBox on 2018/02/21 20:39:28 UTC

[GitHub] rabbah closed pull request #3310: Refactor activation finisher without actor.

rabbah closed pull request #3310: Refactor activation finisher without actor.

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f77d9bfec1..b7d7737c3b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,39 +17,27 @@
 package whisk.core.controller.actions
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
-import scala.collection.mutable.Buffer
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
+import akka.event.Logging.InfoLevel
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.Scheduler
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.connector.ActivationMessage
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
-import whisk.core.entitlement._
-import whisk.core.entitlement.Resource
+import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
-import whisk.core.entity.types.ActivationStore
-import whisk.core.entity.types.EntityStore
+import whisk.core.entity.types.{ActivationStore, EntityStore}
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import akka.event.Logging.InfoLevel
+import scala.collection.mutable.Buffer
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
 protected[actions] trait PrimitiveActions {
   /** The core collections require backend services to be injected in this trait. */
@@ -569,158 +557,57 @@ protected[actions] trait PrimitiveActions {
                                         totalWaitTime: FiniteDuration,
                                         activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
     implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
-    // this is the promise which active ack or db polling will try to complete via:
-    // 1. active ack response, or
-    // 2. failing active ack (due to active ack timeout), fall over to db polling
-    // 3. timeout on db polling => converts activation to non-blocking (returns activation id only)
-    // 4. internal error message
-    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
-    val (promise, finisher) = ActivationFinisher.props({ () =>
-      WhiskActivation.get(activationStore, docid)
-    })
+    val result = Promise[Either[ActivationId, WhiskActivation]]
+    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
     logging.debug(this, s"action activation will block for result upto $totalWaitTime")
-    activeAckResponse map {
-      case result @ Right(_) =>
-        // activation complete, result is available
-        finisher ! ActivationFinisher.Finish(result)
-      case _ =>
-        // active ack received but it does not carry the response,
-        // no result available except by polling the db
-        logging.warn(this, "preemptively polling db because active ack is missing result")
-        finisher ! Scheduler.WorkOnceNow
+    // 1. Wait for the active-ack to happen. Either immediately resolve the promise or poll the database quickly
+    //    in case of an incomplete active-ack (record too large for example).
+    activeAckResponse.foreach {
+      case Right(activation) => result.trySuccess(Right(activation))
+      case _                 => pollActivation(docid, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
-    // return the promise which is either fulfilled by active ack, polling from the database,
-    // or the timeout alternative when the allowed duration expires (i.e., the action took
-    // longer than the permitted, per totalWaitTime).
-    promise.withAlternativeAfterTimeout(
-      totalWaitTime, {
-        Future.successful(Left(activationId)).andThen {
-          // result no longer interesting; terminate the finisher/shut down db polling if necessary
-          case _ => actorSystem.stop(finisher)
-        }
-      })
-  }
-  /** Max atomic action count allowed for sequences */
-  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
-/** Companion to the ActivationFinisher. */
-protected[actions] object ActivationFinisher {
-  case class Finish(activation: Right[ActivationId, WhiskActivation])
-  private type ActivationLookup = () => Future[WhiskActivation]
+    // 2. Poll the database slowly in case the active-ack never arrives
+    pollActivation(docid, result, _ => 15.seconds)
-  /** Periodically polls the db to cover missing active acks. */
-  private val datastorePollPeriodForActivation = 15.seconds
+    // 3. Timeout forces a fallback to activationId
+    val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
-  /**
-   * In case of a partial active ack where it is know an activation completed
-   * but the result could not be sent over the bus, use this periodicity to poll
-   * for a result.
-   */
-  private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds)
-  def props(activationLookup: ActivationLookup)(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = {
-    val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling)
-    (p.future, f) // hides the polling actor
-  }
-  /**
-   * Creates the finishing actor.
-   * This is factored for testing.
-   */
-  protected[actions] def props(activationLookup: ActivationLookup,
-                               slowPoll: FiniteDuration,
-                               fastPolls: Seq[FiniteDuration])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = {
-    // this is strictly completed by the finishing actor
-    val promise = Promise[Either[ActivationId, WhiskActivation]]
-    val dbpoller = poller(slowPoll, promise, activationLookup)
-    val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
-    (promise, dbpoller, actorSystem.actorOf(finisher))
+    result.future.andThen {
+      case _ => timeout.cancel()
+    }
-   * An actor to complete a blocking activation request. It encapsulates a promise
-   * to be completed when the result is ready. This may happen in one of two ways.
-   * An active ack message is relayed to this actor to complete the promise when
-   * the active ack is received. Or in case of a partial/missing active ack, an
-   * explicitly scheduled datastore poll of the activation record, if found, will
-   * complete the transaction. When the promise is fulfilled, the actor self destructs.
+   * Polls the database for an activation.
+   *
+   * Does not use Future composition because an early exit is wanted, once any possible external source resolved the
+   * Promise.
+   *
+   * @param docid the docid to poll for
+   * @param result promise to resolve on result. Is also used to abort polling once completed.
-  private class ActivationFinisher(poller: ActorRef, // the activation poller
-                                   fastPollPeriods: Seq[FiniteDuration],
-                                   promise: Promise[Either[ActivationId, WhiskActivation]])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging)
-      extends Actor {
-    // when the future completes, self-destruct
-    promise.future.andThen { case _ => shutdown() }
-    var preemptiveMsgs = Vector.empty[Cancellable]
-    def receive = {
-      case ActivationFinisher.Finish(activation) =>
-        promise.trySuccess(activation)
-      case msg @ Scheduler.WorkOnceNow =>
-        // try up to three times when pre-emptying the schedule
-        fastPollPeriods.foreach { s =>
-          preemptiveMsgs = preemptiveMsgs :+ context.system.scheduler.scheduleOnce(s, poller, msg)
+  private def pollActivation(docid: DocId,
+                             result: Promise[Either[ActivationId, WhiskActivation]],
+                             wait: Int => FiniteDuration,
+                             retries: Int = 0,
+                             maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = {
+    if (!result.isCompleted && retries < maxRetries) {
+      val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
+        WhiskActivation.get(activationStore, docid).onComplete {
+          case Success(activation)             => result.trySuccess(Right(activation))
+          case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries)
+          case Failure(t: Throwable)           => result.tryFailure(t)
-    }
-    def shutdown(): Unit = Option(context).foreach(_.stop(self))
+      }
-    override def postStop() = {
-      logging.debug(this, "finisher shutdown")
-      preemptiveMsgs.foreach(_.cancel())
-      preemptiveMsgs = Vector.empty
-      context.stop(poller)
+      // Halt the schedule if the result is provided during one execution
+      result.future.onComplete(_ => schedule.cancel())
-  /**
-   * This creates the inner datastore poller for the completed activation.
-   * It is a factory method to facilitate testing.
-   */
-  private def poller(slowPollPeriod: FiniteDuration,
-                     promise: Promise[Either[ActivationId, WhiskActivation]],
-                     activationLookup: ActivationLookup)(implicit transid: TransactionId,
-                                                         actorSystem: ActorSystem,
-                                                         executionContext: ExecutionContext,
-                                                         logging: Logging): ActorRef = {
-    Scheduler.scheduleWaitAtMost(slowPollPeriod, initialDelay = slowPollPeriod, name = "dbpoll")(() => {
-      if (!promise.isCompleted) {
-        activationLookup() map {
-          // complete the future, which in turn will poison pill this scheduler
-          activation =>
-            promise.trySuccess(Right(activation.withoutLogs)) // logs excluded on blocking calls
-        } andThen {
-          case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll
-          case Failure(t: Throwable) => // something went wrong, abort
-            logging.error(this, s"failed while waiting on result: ${t.getMessage}")
-            promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler
-        }
-      } else Future.successful({}) // the scheduler will be halted because the promise is now resolved
-    })
-  }
+  /** Max atomic action count allowed for sequences */
+  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
deleted file mode 100644
index 25f48ae130..0000000000
--- a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
+++ /dev/null
@@ -1,168 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package whisk.core.controller.actions.test
-import java.time.Instant
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-import common.StreamLogging
-import common.WskActorSystem
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.controller.actions.ActivationFinisher
-import whisk.core.entity._
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.size.SizeInt
-import whisk.core.database.NoDocumentException
-import akka.testkit.TestProbe
-import whisk.common.Scheduler
-class ActivationFinisherTests
-    extends FlatSpec
-    with BeforeAndAfterEach
-    with Matchers
-    with WskActorSystem
-    with StreamLogging {
-  implicit val tid = TransactionId.testing
-  val activation = WhiskActivation(
-    namespace = EntityPath("ns"),
-    name = EntityName("a"),
-    Subject(),
-    activationId = ActivationId(),
-    start =,
-    end =,
-    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2)))),
-    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
-    duration = Some(123))
-  var activationLookupCounter = 0
-  @volatile var activationResult: Option[Throwable] = None
-  def activationLookup(): Future[WhiskActivation] = {
-    activationLookupCounter += 1
-  }
-  override def beforeEach() = {
-    activationLookupCounter = 0
-    activationResult = None
-  }
-  behavior of "activation finisher"
-  val slowPoll = 200.milliseconds
-  val fastPoll = Seq()
-  it should "poll until promise is completed" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 2 and be <= 3)
-    // should terminate the parent finisher and child poller on completion
-    promise.trySuccess(Right(activation))
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }
-  it should "complete promise from poller" in {
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should be(1)
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-    promise shouldBe 'completed
-  }
-  it should "finish when receiving corresponding message" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 1 and be <= 2)
-    // should terminate the parent finisher and child poller once message is received
-    finisher ! ActivationFinisher.Finish(Right(activation))
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-    promise shouldBe 'completed
-  }
-  it should "poll pre-emptively" in {
-    activationResult = Some(NoDocumentException(""))
-    val slowPoll = 600.milliseconds
-    val fastPoll = Seq(100.milliseconds, 200.milliseconds)
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(0)
-    // should cause polls
-    finisher ! Scheduler.WorkOnceNow
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(3)
-    finisher ! PoisonPill
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Apache Infrastructure.

With regards,
Apache Git Services