You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/02/21 20:39:27 UTC

[incubator-openwhisk] branch master updated: Refactor activation finisher without actor. (#3310)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a112b  Refactor activation finisher without actor. (#3310)
11a112b is described below

commit 11a112b9c262750b795816a2fef5c24b99ba2de8
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Feb 21 21:39:23 2018 +0100

    Refactor activation finisher without actor. (#3310)
---
 .../core/controller/actions/PrimitiveActions.scala | 209 +++++----------------
 .../actions/test/ActivationFinisherTests.scala     | 168 -----------------
 2 files changed, 48 insertions(+), 329 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f77d9bf..b7d7737 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,39 +17,27 @@
 
 package whisk.core.controller.actions
 
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
 
-import scala.collection.mutable.Buffer
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import akka.actor.Actor
-import akka.actor.ActorRef
 import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import akka.actor.Props
+import akka.event.Logging.InfoLevel
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.Scheduler
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.connector.ActivationMessage
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
-import whisk.core.entitlement._
-import whisk.core.entitlement.Resource
+import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
-import whisk.core.entity.types.ActivationStore
-import whisk.core.entity.types.EntityStore
+import whisk.core.entity.types.{ActivationStore, EntityStore}
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import akka.event.Logging.InfoLevel
+
+import scala.collection.mutable.Buffer
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 protected[actions] trait PrimitiveActions {
   /** The core collections require backend services to be injected in this trait. */
@@ -569,158 +557,57 @@ protected[actions] trait PrimitiveActions {
                                         totalWaitTime: FiniteDuration,
                                         activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
     implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
-    // this is the promise which active ack or db polling will try to complete via:
-    // 1. active ack response, or
-    // 2. failing active ack (due to active ack timeout), fall over to db polling
-    // 3. timeout on db polling => converts activation to non-blocking (returns activation id only)
-    // 4. internal error message
-    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
-    val (promise, finisher) = ActivationFinisher.props({ () =>
-      WhiskActivation.get(activationStore, docid)
-    })
+    val result = Promise[Either[ActivationId, WhiskActivation]]
 
+    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
     logging.debug(this, s"action activation will block for result upto $totalWaitTime")
 
-    activeAckResponse map {
-      case result @ Right(_) =>
-        // activation complete, result is available
-        finisher ! ActivationFinisher.Finish(result)
-
-      case _ =>
-        // active ack received but it does not carry the response,
-        // no result available except by polling the db
-        logging.warn(this, "preemptively polling db because active ack is missing result")
-        finisher ! Scheduler.WorkOnceNow
+    // 1. Wait for the active-ack to happen. Either immediately resolve the promise or poll the database quickly
+    //    in case of an incomplete active-ack (record too large for example).
+    activeAckResponse.foreach {
+      case Right(activation) => result.trySuccess(Right(activation))
+      case _                 => pollActivation(docid, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
     }
 
-    // return the promise which is either fulfilled by active ack, polling from the database,
-    // or the timeout alternative when the allowed duration expires (i.e., the action took
-    // longer than the permitted, per totalWaitTime).
-    promise.withAlternativeAfterTimeout(
-      totalWaitTime, {
-        Future.successful(Left(activationId)).andThen {
-          // result no longer interesting; terminate the finisher/shut down db polling if necessary
-          case _ => actorSystem.stop(finisher)
-        }
-      })
-  }
-
-  /** Max atomic action count allowed for sequences */
-  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
-}
-
-/** Companion to the ActivationFinisher. */
-protected[actions] object ActivationFinisher {
-  case class Finish(activation: Right[ActivationId, WhiskActivation])
-
-  private type ActivationLookup = () => Future[WhiskActivation]
+    // 2. Poll the database slowly in case the active-ack never arrives
+    pollActivation(docid, result, _ => 15.seconds)
 
-  /** Periodically polls the db to cover missing active acks. */
-  private val datastorePollPeriodForActivation = 15.seconds
+    // 3. Timeout forces a fallback to activationId
+    val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
 
-  /**
-   * In case of a partial active ack where it is know an activation completed
-   * but the result could not be sent over the bus, use this periodicity to poll
-   * for a result.
-   */
-  private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds)
-
-  def props(activationLookup: ActivationLookup)(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = {
-
-    val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling)
-    (p.future, f) // hides the polling actor
-  }
-
-  /**
-   * Creates the finishing actor.
-   * This is factored for testing.
-   */
-  protected[actions] def props(activationLookup: ActivationLookup,
-                               slowPoll: FiniteDuration,
-                               fastPolls: Seq[FiniteDuration])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = {
-
-    // this is strictly completed by the finishing actor
-    val promise = Promise[Either[ActivationId, WhiskActivation]]
-    val dbpoller = poller(slowPoll, promise, activationLookup)
-    val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
-
-    (promise, dbpoller, actorSystem.actorOf(finisher))
+    result.future.andThen {
+      case _ => timeout.cancel()
+    }
   }
 
   /**
-   * An actor to complete a blocking activation request. It encapsulates a promise
-   * to be completed when the result is ready. This may happen in one of two ways.
-   * An active ack message is relayed to this actor to complete the promise when
-   * the active ack is received. Or in case of a partial/missing active ack, an
-   * explicitly scheduled datastore poll of the activation record, if found, will
-   * complete the transaction. When the promise is fulfilled, the actor self destructs.
+   * Polls the database for an activation.
+   *
+   * Does not use Future composition because polling must exit early as soon as any other source has resolved the
+   * Promise.
+   *
+   * @param docid the docid to poll for
+   * @param result promise to resolve on result. Is also used to abort polling once completed.
    */
-  private class ActivationFinisher(poller: ActorRef, // the activation poller
-                                   fastPollPeriods: Seq[FiniteDuration],
-                                   promise: Promise[Either[ActivationId, WhiskActivation]])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging)
-      extends Actor {
-
-    // when the future completes, self-destruct
-    promise.future.andThen { case _ => shutdown() }
-
-    var preemptiveMsgs = Vector.empty[Cancellable]
-
-    def receive = {
-      case ActivationFinisher.Finish(activation) =>
-        promise.trySuccess(activation)
-
-      case msg @ Scheduler.WorkOnceNow =>
-        // try up to three times when pre-emptying the schedule
-        fastPollPeriods.foreach { s =>
-          preemptiveMsgs = preemptiveMsgs :+ context.system.scheduler.scheduleOnce(s, poller, msg)
+  private def pollActivation(docid: DocId,
+                             result: Promise[Either[ActivationId, WhiskActivation]],
+                             wait: Int => FiniteDuration,
+                             retries: Int = 0,
+                             maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = {
+    if (!result.isCompleted && retries < maxRetries) {
+      val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
+        WhiskActivation.get(activationStore, docid).onComplete {
+          case Success(activation)             => result.trySuccess(Right(activation))
+          case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries)
+          case Failure(t: Throwable)           => result.tryFailure(t)
         }
-    }
-
-    def shutdown(): Unit = Option(context).foreach(_.stop(self))
+      }
 
-    override def postStop() = {
-      logging.debug(this, "finisher shutdown")
-      preemptiveMsgs.foreach(_.cancel())
-      preemptiveMsgs = Vector.empty
-      context.stop(poller)
+      // Halt the schedule if the result is provided during one execution
+      result.future.onComplete(_ => schedule.cancel())
     }
   }
 
-  /**
-   * This creates the inner datastore poller for the completed activation.
-   * It is a factory method to facilitate testing.
-   */
-  private def poller(slowPollPeriod: FiniteDuration,
-                     promise: Promise[Either[ActivationId, WhiskActivation]],
-                     activationLookup: ActivationLookup)(implicit transid: TransactionId,
-                                                         actorSystem: ActorSystem,
-                                                         executionContext: ExecutionContext,
-                                                         logging: Logging): ActorRef = {
-    Scheduler.scheduleWaitAtMost(slowPollPeriod, initialDelay = slowPollPeriod, name = "dbpoll")(() => {
-      if (!promise.isCompleted) {
-        activationLookup() map {
-          // complete the future, which in turn will poison pill this scheduler
-          activation =>
-            promise.trySuccess(Right(activation.withoutLogs)) // logs excluded on blocking calls
-        } andThen {
-          case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll
-          case Failure(t: Throwable) => // something went wrong, abort
-            logging.error(this, s"failed while waiting on result: ${t.getMessage}")
-            promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler
-        }
-      } else Future.successful({}) // the scheduler will be halted because the promise is now resolved
-    })
-  }
+  /** Max atomic action count allowed for sequences */
+  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
 }
diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
deleted file mode 100644
index 25f48ae..0000000
--- a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.controller.actions.test
-
-import java.time.Instant
-
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.controller.actions.ActivationFinisher
-import whisk.core.entity._
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.size.SizeInt
-import whisk.core.database.NoDocumentException
-import akka.testkit.TestProbe
-import whisk.common.Scheduler
-import akka.actor.PoisonPill
-
-@RunWith(classOf[JUnitRunner])
-class ActivationFinisherTests
-    extends FlatSpec
-    with BeforeAndAfterEach
-    with Matchers
-    with WskActorSystem
-    with StreamLogging {
-
-  implicit val tid = TransactionId.testing
-
-  val activation = WhiskActivation(
-    namespace = EntityPath("ns"),
-    name = EntityName("a"),
-    Subject(),
-    activationId = ActivationId(),
-    start = Instant.now(),
-    end = Instant.now(),
-    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2)))),
-    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
-    duration = Some(123))
-
-  var activationLookupCounter = 0
-  @volatile var activationResult: Option[Throwable] = None
-
-  def activationLookup(): Future[WhiskActivation] = {
-    activationLookupCounter += 1
-    activationResult.map(Future.failed(_)).getOrElse(Future.successful(activation))
-  }
-
-  override def beforeEach() = {
-    activationLookupCounter = 0
-    activationResult = None
-  }
-
-  behavior of "activation finisher"
-
-  val slowPoll = 200.milliseconds
-  val fastPoll = Seq()
-
-  it should "poll until promise is completed" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 2 and be <= 3)
-
-    // should terminate the parent finisher and child poller on completion
-    promise.trySuccess(Right(activation))
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }
-
-  it should "complete promise from poller" in {
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should be(1)
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-
-    promise shouldBe 'completed
-  }
-
-  it should "finish when receiving corresponding message" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 1 and be <= 2)
-
-    // should terminate the parent finisher and child poller once message is received
-    finisher ! ActivationFinisher.Finish(Right(activation))
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-
-    promise shouldBe 'completed
-  }
-
-  it should "poll pre-emptively" in {
-    activationResult = Some(NoDocumentException(""))
-    val slowPoll = 600.milliseconds
-    val fastPoll = Seq(100.milliseconds, 200.milliseconds)
-    val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(0)
-
-    // should cause polls
-    finisher ! Scheduler.WorkOnceNow
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(3)
-
-    finisher ! PoisonPill
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }
-
-}

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.