You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2021/01/26 15:42:06 UTC
[openwhisk] branch master updated: Fixes bug in invoker supervision
on startup. (#5050)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6686820 Fixes bug in invoker supervision on startup. (#5050)
6686820 is described below
commit 66868205b52ee65f28756038c44d8df5b96d2bcc
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Tue Jan 26 10:41:53 2021 -0500
Fixes bug in invoker supervision on startup. (#5050)
---
.../core/loadBalancer/InvokerSupervision.scala | 83 ++++++++++++++--------
.../core/containerpool/ContainerPool.scala | 8 ++-
.../test/InvokerSupervisionTests.scala | 51 ++++++++++++-
3 files changed, 108 insertions(+), 34 deletions(-)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 5a9d367..d0a648b 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
val healthyTimeout: FiniteDuration = 10.seconds
- // This is done at this point to not intermingle with the state-machine
- // especially their timeouts.
+ // This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
- case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
+ case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
}
- override def receive: Receive = customReceive.orElse(super.receive)
- /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
- startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
-
- /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
- when(Offline) {
- case Event(_: PingMessage, _) => goto(Unhealthy)
- }
+ override def receive: Receive = customReceive.orElse(super.receive)
// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
@@ -317,6 +309,22 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
stay
}
+ // To be used for all states that should send test actions to reverify the invoker
+ def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
+ case _ -> `state` =>
+ invokeTestAction()
+ setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
+ case `state` -> _ => cancelTimer(InvokerActor.timerName)
+ }
+
+ /** Always start Unhealthy. Then the invoker receives some test activations and becomes Healthy. */
+ startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
+
+ /** An Offline invoker represents an existing but broken invoker. This means that it no longer sends pings. */
+ when(Offline) {
+ case Event(_: PingMessage, _) => goto(Unhealthy)
+ }
+
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
@@ -324,20 +332,20 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
/**
- * A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
- * for 20 seconds.
+ * A Healthy invoker is characterized by continuously getting pings.
+ * It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
}
- /** Handle the completion of an Activation in every state. */
+ /** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
}
- /** Logging on Transition change */
+ /** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
transid.mark(
@@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
}
- // To be used for all states that should send test actions to reverify the invoker
- def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
- case _ -> `state` =>
- invokeTestAction()
- setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
- case `state` -> _ => cancelTimer(InvokerActor.timerName)
- }
-
onTransition(healthPingingTransitionHandler(Unhealthy))
onTransition(healthPingingTransitionHandler(Unresponsive))
@@ -372,8 +372,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
buffer: RingBuffer[InvocationFinishedResult]) = {
buffer.add(result)
- // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
- // a new test action to remove the errors out of the RingBuffer as fast as possible.
+ // If the action is successful, the Invoker is Healthy. We execute additional test actions
+ // immediately to clear the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
@@ -381,19 +381,44 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
invokeTestAction()
}
- // Stay in online if the activations was successful.
- // Stay in offline, if an activeAck reaches the controller.
+ // Stay online if the activation was successful.
+ // Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
stay
} else {
val entries = buffer.toList
- // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy
+
+ // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
+ // Otherwise transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
+ // Note: The predicate is false if the ring buffer is still being primed
+ // (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unhealthy)
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
+ // Note: The predicate is false if the ring buffer is still being primed
+ // (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unresponsive)
} else {
- gotoIfNotThere(Healthy)
+ result match {
+ case InvocationFinishedResult.Success =>
+ // Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
+ // during priming of the ring buffer. In case of the latter, there is at least one additional test
+ // action in flight which can reverse the transition later.
+ gotoIfNotThere(Healthy)
+
+ case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
+ // The ring buffer is not fully primed yet, stay/goto Unhealthy.
+ gotoIfNotThere(Unhealthy)
+
+ case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
+ // The ring buffer is not fully primed yet, stay/goto Unresponsive.
+ gotoIfNotThere(Unresponsive)
+
+ case _ =>
+ // At steady state, the contents of the buffer take precedence over this single result, so we hold
+ // the current state until enough events have occurred to transition to a new state.
+ stay
+ }
}
}
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 92f058b..4b66e99 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -270,19 +270,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
freePool.get(sender()).foreach { f =>
freePool = freePool - sender()
}
+
// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
processBufferOrFeed()
- //in case this was a prewarm
+ // in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
prewarmedPool = prewarmedPool - sender()
}
- //in case this was a starting prewarm
+
+ // in case this was a starting prewarm
prewarmStartingPool.get(sender()).foreach { _ =>
- logging.info(this, "failed starting prewarm removed")
+ logging.info(this, "failed starting prewarm, removed")
prewarmStartingPool = prewarmStartingPool - sender()
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 1cf7383..7e0d88f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -207,6 +207,54 @@ class InvokerSupervisionTests
behavior of "InvokerActor"
+ it should "start and stay unhealthy while min threshold is not met" in {
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+
+ (1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+ }
+
+ (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Unhealthy
+ }
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Healthy
+ }
+
+ it should "revert to unhealthy during initial startup if there is a failed test activation" in {
+ assume(InvokerActor.bufferErrorTolerance >= 3)
+
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Healthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+ }
+
// unHealthy -> offline
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
@@ -318,7 +366,7 @@ class InvokerSupervisionTests
}
}
- it should "start timer to send testactions when unhealthy" in {
+ it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
@@ -337,7 +385,6 @@ class InvokerSupervisionTests
}
it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
-
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()