You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/09/27 19:49:32 UTC

[incubator-openwhisk] branch master updated: Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e7c8e5  Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)
8e7c8e5 is described below

commit 8e7c8e5bc7f2cd672c2314d13e56ef8f94cb96f6
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Wed Sep 27 21:49:29 2017 +0200

    Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)
---
 .../core/loadBalancer/InvokerSupervision.scala     |  6 ++--
 .../core/loadBalancer/LoadBalancerService.scala    | 33 ++++++++++++++++------
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 94fc070..13517de 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -283,9 +283,11 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
   private def handleCompletionMessage(wasActivationSuccessful: Boolean, buffer: RingBuffer[Boolean]) = {
     buffer.add(wasActivationSuccessful)
 
-    // If the current state is UnHealthy, then the active ack is the result of a test action.
-    // If this is successful it seems like the Invoker is Healthy again. So we execute immediately
+    // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
     // a new test action to remove the errors out of the RingBuffer as fast as possible.
+    // The actions that arrive while the invoker is unhealthy are most likely health actions.
+    // It is possible they are normal user actions as well. This can happen if such actions were in the
+    // invoker queue or in progress while the invoker's status flipped to Unhealthy.
     if (wasActivationSuccessful && stateName == UnHealthy) {
       invokeTestAction()
     }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b7ee0dd..c957fa2 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -125,19 +125,39 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
    */
   private def processCompletion(response: Either[ActivationId, WhiskActivation],
                                 tid: TransactionId,
-                                forced: Boolean): Unit = {
+                                forced: Boolean,
+                                invoker: InstanceId): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
+
+    // treat left as success (as it is the result of a message exceeding the bus limit)
+    val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
+
     loadBalancerData.removeActivation(aid) match {
       case Some(entry) =>
         logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
+        // Active acks that are received here are strictly from user actions - health actions are not part of
+        // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
+        // If the active ack was forced, because the waiting period expired, treat it as a failed activation.
+        // A cluster of such failures will eventually turn the invoker unhealthy and suspend queuing activations
+        // to that invoker topic.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess && !forced)
         if (!forced) {
           entry.promise.trySuccess(response)
         } else {
           entry.promise.tryFailure(new Throwable("no active ack received"))
         }
-      case None =>
-        // the entry was already removed
+      case None if !forced =>
+        // the entry has already been removed but we receive an active ack for this activation Id.
+        // This happens for health actions, because they don't have an entry in Loadbalancerdata or
+        // for activations that already timed out.
+        // For both cases, it looks like the invoker works again and we should send the status of
+        // the activation to the invokerPool.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
         logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
+      case None =>
+        // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
+        // As the active ack is already processed we don't have to do anything here.
+        logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
     }
   }
 
@@ -156,7 +176,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     // in this case, if the activation handler is still registered, remove it and update the books.
     loadBalancerData.putActivation(activationId, {
       actorSystem.scheduler.scheduleOnce(timeout) {
-        processCompletion(Left(activationId), transid, forced = true)
+        processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
       }
 
       ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
@@ -254,11 +274,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     val raw = new String(bytes, StandardCharsets.UTF_8)
     CompletionMessage.parse(raw) match {
       case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, false)
-        // treat left as success (as it is the result a the message exceeding the bus limit)
-        val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError)
+        processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
         activationFeed ! MessageFeed.Processed
-        invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess)
 
       case Failure(t) =>
         activationFeed ! MessageFeed.Processed

-- 
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.