You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/06/21 13:29:30 UTC

[GitHub] rabbah commented on a change in pull request #2384: Send active ack on failed activations

rabbah commented on a change in pull request #2384: Send active ack on failed activations
URL: https://github.com/apache/incubator-openwhisk/pull/2384#discussion_r123247830
 
 

 ##########
 File path: core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 ##########
 @@ -102,95 +106,101 @@ protected[actions] trait PrimitiveActions {
             args,
             cause = cause)
 
-        val startActivation = transid.started(this, if (blocking) LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING else LoggingMarkers.CONTROLLER_ACTIVATION)
-        val startLoadbalancer = transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"[POST] action activation id: ${message.activationId}")
-        val postedFuture = loadBalancer.publish(action, message, activeAckTimeout)
-        postedFuture flatMap { activationResponse =>
+        val startActivation = transid.started(this, waitForResponse.map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING).getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION))
+        val startLoadbalancer = transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${message.activationId}")
+        val postedFuture = loadBalancer.publish(action, message)
+
+        postedFuture.flatMap { activeAckResponse =>
+            // successfully posted activation request to the message bus
             transid.finished(this, startLoadbalancer)
-            if (blocking) {
-                waitForActivationResponse(user, message.activationId, timeout, activationResponse) map {
-                    whiskActivation => (whiskActivation.activationId, Some(whiskActivation))
-                } andThen {
-                    case _ => transid.finished(this, startActivation)
-                }
-            } else {
+
+            // is caller waiting for the result of the activation?
+            waitForResponse.map { timeout =>
+                // yes, then wait for the activation response from the message bus
+                // (known as the active response or active ack)
+                waitForActivationResponse(user, message.activationId, timeout, activeAckResponse)
+                    .andThen { case _ => transid.finished(this, startActivation) }
+            }.getOrElse {
+                // no, return the activation id
                 transid.finished(this, startActivation)
-                Future.successful { (message.activationId, None) }
+                Future.successful(Left(message.activationId))
             }
         }
     }
 
     /**
-     * This is a fast path used for blocking calls in which we do not need the full WhiskActivation record from the DB.
-     * Polls for the activation response from an underlying data structure populated from Kafka active acknowledgements.
-     * If this mechanism fails to produce an answer quickly, the future will switch to polling the database for the response
-     * record.
+     * Waits for a response from the message bus (e.g., Kafka) containing the result of the activation. This is the fast path
+     * used for blocking calls where only the result of the activation is needed. This path is called active acknowledgement
+     * or active ack.
+     *
+     * While waiting for the active ack, periodically poll the datastore in case there is a failure in the fast path delivery
+     * which could happen if the connection from an invoker to the message bus is disrupted, or if the publishing of the response
+     * fails because the message is too large.
      */
-    private def waitForActivationResponse(user: Identity, activationId: ActivationId, totalWaitTime: FiniteDuration, activationResponse: Future[WhiskActivation])(implicit transid: TransactionId) = {
-        // this is the promise which active ack or db polling will try to complete in one of four ways:
-        // 1. active ack response
+    private def waitForActivationResponse(
+        user: Identity,
+        activationId: ActivationId,
+        totalWaitTime: FiniteDuration,
+        activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
+            implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+        // this is the promise which active ack or db polling will try to complete via:
+        // 1. active ack response, or
         // 2. failing active ack (due to active ack timeout), fall over to db polling
         // 3. timeout on db polling => converts activation to non-blocking (returns activation id only)
-        // 4. internal error
-        val promise = Promise[WhiskActivation]
-        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
+        // 4. internal error message
+        val promise = Promise[Either[ActivationId, WhiskActivation]]
 
-        logging.info(this, s"[POST] action activation will block on result up to $totalWaitTime")
+        logging.info(this, s"action activation will block for result upto $totalWaitTime")
 
-        // the active ack will timeout after specified duration, causing the db polling to kick in
-        activationResponse map {
-            activation => promise.trySuccess(activation)
+        activeAckResponse map {
+            case ok @ Right(_) => promise.trySuccess(ok) // activation complete, result is available
+            case _             => // no active ack received, no result available except by polling the db
         } onFailure {
-            case t: TimeoutException =>
-                logging.info(this, s"[POST] switching to poll db, active ack expired")
-                pollDbForResult(docid, activationId, promise)
-            case t: Throwable =>
-                logging.info(this, s"[POST] switching to poll db, active ack exception: ${t.getMessage}")
-                pollDbForResult(docid, activationId, promise)
+            case error: Throwable =>
+                // in case of catastrophic failure while waiting for the active ack, result
+                // is only available by polling db
+                logging.info(this, s"polling db, active ack exception: ${error.getMessage}")
         }
 
-        // install a timeout handler; this is the handler for "the action took longer than its Limit"
-        // note the use of WeakReferences; this is to avoid the handler's closure holding on to the
-        // WhiskActivation, which in turn holds on to the full JsObject of the response
-        val promiseRef = new java.lang.ref.WeakReference(promise)
-        actorSystem.scheduler.scheduleOnce(totalWaitTime) {
-            val p = promiseRef.get
-            if (p != null) {
-                p.tryFailure(new BlockingInvokeTimeout(activationId))
-            }
-        }
+        val poller = periodicallyPollForActivation(user, activationId, promise)
 
-        // return the future. note that pollDbForResult's isCompleted check will protect against unnecessary db activity
-        // that may overlap with a totalWaitTime timeout (because the promise will have already by tryFailure'd)
+        // return the promise which is either fulfilled by active ack, polling from the database,
+        // or the timeout alternative when the allowed duration expires (i.e., the action took
+        // longer than the permitted, per totalWaitTime).
         promise.future
+            .withAlternativeAfterTimeout(totalWaitTime, Future.successful(Left(activationId)))
+            .andThen { case _ => poller ! PoisonPill }
     }
 
-    /**
-     * Polls for activation record. It is assumed that an activation record is created atomically and never updated.
-     * Fetch the activation record by its id. If it exists, complete the promise. Otherwise recursively poll until
-     * either there is an error in the get, or the promise has completed because it timed out. The promise MUST
-     * complete in the caller to terminate the polling.
-     */
-    private def pollDbForResult(
-        docid: DocId,
+    /** Periodically polls the db to cover missing active acks. */
+    private def periodicallyPollForActivation(
+        user: Identity,
         activationId: ActivationId,
-        promise: Promise[WhiskActivation])(
-            implicit transid: TransactionId): Unit = {
-        // check if promise already completed due to timeout expiration (abort polling if so)
-        if (!promise.isCompleted) {
-            WhiskActivation.get(activationStore, docid) map {
-                activation => promise.trySuccess(activation.withoutLogs) // Logs always not provided on blocking call
-            } onFailure {
-                case e: NoDocumentException =>
-                    Thread.sleep(500)
-                    logging.debug(this, s"[POST] action activation not yet timed out, will poll for result")
-                    pollDbForResult(docid, activationId, promise)
-                case t: Throwable =>
-                    logging.error(this, s"[POST] action activation failed while waiting on result: ${t.getMessage}")
-                    promise.tryFailure(t)
-            }
-        } else {
-            logging.info(this, s"[POST] action activation timed out, terminated polling for result")
-        }
+        promise: Promise[Either[ActivationId, WhiskActivation]])(
+            implicit transid: TransactionId) = {
+
+        val datastorePollPeriodForActivation = 15.seconds
+        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId))
+
+        Scheduler.scheduleWaitAtMost(
+            datastorePollPeriodForActivation,
+            initialDelay = datastorePollPeriodForActivation,
+            name = "DbPoll",
+            logLevel = InfoLevel)(() => {
+                Option(promise) match {
+                    case Some(p) if !p.isCompleted =>
+                        WhiskActivation.get(activationStore, docid) map {
+                            // complete the future, which in turn will poison pill this scheduler
+                            activation => p.trySuccess(Right(activation.withoutLogs)) // Logs always not provided on blocking call
+                        } andThen {
+                            case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll
+                            case Failure(t: Throwable) => // something went wrong, abort
+                                logging.error(this, s"failed while waiting on result: ${t.getMessage}")
+                                p.tryFailure(t) // complete the future, which in turn will poison pill this scheduler
+                        }
+
+                    case _ => throw new Exception("activation completed") // will halt the scheduler
 
 Review comment:
   For the benefit of others: the future does not hold a reference to the containing actor.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services