You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/09/27 19:49:32 UTC
[incubator-openwhisk] branch master updated: Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8e7c8e5 Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)
8e7c8e5 is described below
commit 8e7c8e5bc7f2cd672c2314d13e56ef8f94cb96f6
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Wed Sep 27 21:49:29 2017 +0200
Treat a timed out active ack as failed activation in invokerhealth protocol (#2658)
---
.../core/loadBalancer/InvokerSupervision.scala | 6 ++--
.../core/loadBalancer/LoadBalancerService.scala | 33 ++++++++++++++++------
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 94fc070..13517de 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -283,9 +283,11 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
private def handleCompletionMessage(wasActivationSuccessful: Boolean, buffer: RingBuffer[Boolean]) = {
buffer.add(wasActivationSuccessful)
- // If the current state is UnHealthy, then the active ack is the result of a test action.
- // If this is successful it seems like the Invoker is Healthy again. So we execute immediately
+ // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
// a new test action to remove the errors out of the RingBuffer as fast as possible.
+ // The actions that arrive while the invoker is unhealthy are most likely health actions.
+ // It is possible they are normal user actions as well. This can happen if such actions were in the
+ // invoker queue or in progress while the invoker's status flipped to Unhealthy.
if (wasActivationSuccessful && stateName == UnHealthy) {
invokeTestAction()
}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b7ee0dd..c957fa2 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -125,19 +125,39 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
*/
private def processCompletion(response: Either[ActivationId, WhiskActivation],
tid: TransactionId,
- forced: Boolean): Unit = {
+ forced: Boolean,
+ invoker: InstanceId): Unit = {
val aid = response.fold(l => l, r => r.activationId)
+
+ // treat left as success (a message that exceeded the bus limit); timeouts also pass a Left but are failed via `!forced` below
+ val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
+
loadBalancerData.removeActivation(aid) match {
case Some(entry) =>
logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
+ // Active acks that are received here are strictly from user actions - health actions are not part of
+ // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
+ // If the active ack was forced, because the waiting period expired, treat it as a failed activation.
+ // A cluster of such failures will eventually turn the invoker unhealthy and suspend queuing activations
+ // to that invoker topic.
+ invokerPool ! InvocationFinishedMessage(invoker, isSuccess && !forced)
if (!forced) {
entry.promise.trySuccess(response)
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
}
- case None =>
- // the entry was already removed
+ case None if !forced =>
+ // the entry has already been removed, yet a real active ack arrived for this activation id.
+ // This happens for health actions, which never have an entry in loadBalancerData, or
+ // for activations that already timed out.
+ // In both cases the invoker is evidently processing activations again, so report the
+ // activation's status to the invokerPool.
+ invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
+ case None =>
+ // the timeout fired after a real active ack had already removed the entry.
+ // That active ack was already processed (and reported), so there is nothing left to do here.
+ logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
}
}
@@ -156,7 +176,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
// in this case, if the activation handler is still registered, remove it and update the books.
loadBalancerData.putActivation(activationId, {
actorSystem.scheduler.scheduleOnce(timeout) {
- processCompletion(Left(activationId), transid, forced = true)
+ processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
}
ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
@@ -254,11 +274,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
val raw = new String(bytes, StandardCharsets.UTF_8)
CompletionMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
- processCompletion(m.response, m.transid, false)
- // treat left as success (as it is the result a the message exceeding the bus limit)
- val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError)
+ processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
activationFeed ! MessageFeed.Processed
- invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess)
case Failure(t) =>
activationFeed ! MessageFeed.Processed
--
To stop receiving notification emails like this one, please contact "commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.