You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2021/01/26 15:42:06 UTC

[openwhisk] branch master updated: Fixes bug in invoker supervision on startup. (#5050)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6686820  Fixes bug in invoker supervision on startup. (#5050)
6686820 is described below

commit 66868205b52ee65f28756038c44d8df5b96d2bcc
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Tue Jan 26 10:41:53 2021 -0500

    Fixes bug in invoker supervision on startup. (#5050)
---
 .../core/loadBalancer/InvokerSupervision.scala     | 83 ++++++++++++++--------
 .../core/containerpool/ContainerPool.scala         |  8 ++-
 .../test/InvokerSupervisionTests.scala             | 51 ++++++++++++-
 3 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 5a9d367..d0a648b 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
 
   val healthyTimeout: FiniteDuration = 10.seconds
 
-  // This is done at this point to not intermingle with the state-machine
-  // especially their timeouts.
+  // This is done at this point to not intermingle with the state machine, especially its timeouts.
   def customReceive: Receive = {
-    case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
+    case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
   }
-  override def receive: Receive = customReceive.orElse(super.receive)
 
-  /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
-  startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
-
-  /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
-  when(Offline) {
-    case Event(_: PingMessage, _) => goto(Unhealthy)
-  }
+  override def receive: Receive = customReceive.orElse(super.receive)
 
   // To be used for all states that should send test actions to reverify the invoker
   val healthPingingState: StateFunction = {
@@ -317,6 +309,22 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
       stay
   }
 
+  // To be used for all states that should send test actions to reverify the invoker
+  def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
+    case _ -> `state` =>
+      invokeTestAction()
+      setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
+    case `state` -> _ => cancelTimer(InvokerActor.timerName)
+  }
+
+  /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
+  startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
+
+  /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
+  when(Offline) {
+    case Event(_: PingMessage, _) => goto(Unhealthy)
+  }
+
   /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
   when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
 
@@ -324,20 +332,20 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
   when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
 
   /**
-   * A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
-   * for 20 seconds.
+   * A Healthy invoker is characterized by continuously getting pings.
+   * It will go offline if that state is not confirmed within healthyTimeout (10 seconds).
    */
   when(Healthy, stateTimeout = healthyTimeout) {
     case Event(_: PingMessage, _) => stay
     case Event(StateTimeout, _)   => goto(Offline)
   }
 
-  /** Handle the completion of an Activation in every state. */
+  /** Handles the completion of an Activation in every state. */
   whenUnhandled {
     case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
   }
 
-  /** Logging on Transition change */
+  /** Logs transition changes. */
   onTransition {
     case _ -> newState if !newState.isUsable =>
       transid.mark(
@@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
     case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
   }
 
-  // To be used for all states that should send test actions to reverify the invoker
-  def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
-    case _ -> `state` =>
-      invokeTestAction()
-      setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
-    case `state` -> _ => cancelTimer(InvokerActor.timerName)
-  }
-
   onTransition(healthPingingTransitionHandler(Unhealthy))
   onTransition(healthPingingTransitionHandler(Unresponsive))
 
@@ -372,8 +372,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
                                       buffer: RingBuffer[InvocationFinishedResult]) = {
     buffer.add(result)
 
-    // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
-    // a new test action to remove the errors out of the RingBuffer as fast as possible.
+    // If the action is successful, the Invoker is Healthy. We execute additional test actions
+    // immediately to clear the RingBuffer as fast as possible.
     // The actions that arrive while the invoker is unhealthy are most likely health actions.
     // It is possible they are normal user actions as well. This can happen if such actions were in the
     // invoker queue or in progress while the invoker's status flipped to Unhealthy.
@@ -381,19 +381,44 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
       invokeTestAction()
     }
 
-    // Stay in online if the activations was successful.
-    // Stay in offline, if an activeAck reaches the controller.
+    // Stay online if the activation was successful.
+    // Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
     if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
       stay
     } else {
       val entries = buffer.toList
-      // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy
+
+      // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
+      // Otherwise transition to Healthy on successful activations only.
       if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
+        // Note: The predicate is false if the ring buffer is still being primed
+        // (i.e., the entries.size <=  bufferErrorTolerance).
         gotoIfNotThere(Unhealthy)
       } else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
+        // Note: The predicate is false if the ring buffer is still being primed
+        // (i.e., the entries.size <=  bufferErrorTolerance).
         gotoIfNotThere(Unresponsive)
       } else {
-        gotoIfNotThere(Healthy)
+        result match {
+          case InvocationFinishedResult.Success =>
+            // Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
+            // during priming of the ring buffer. In case of the latter, there is at least one additional test
+            // action in flight which can reverse the transition later.
+            gotoIfNotThere(Healthy)
+
+          case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
+            // The ring buffer is not fully primed yet, stay/goto Unhealthy.
+            gotoIfNotThere(Unhealthy)
+
+          case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
+            // The ring buffer is not fully primed yet, stay/goto Unresponsive.
+            gotoIfNotThere(Unresponsive)
+
+          case _ =>
+            // At steady state, the state of the buffer supersedes the latest result and we hold the
+            // current state until enough events have occurred to transition to a new state.
+            stay
+        }
       }
     }
   }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 92f058b..4b66e99 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -270,19 +270,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       freePool.get(sender()).foreach { f =>
         freePool = freePool - sender()
       }
+
       // container was busy (busy indicates at full capacity), so there is capacity to accept another job request
       busyPool.get(sender()).foreach { _ =>
         busyPool = busyPool - sender()
       }
       processBufferOrFeed()
 
-      //in case this was a prewarm
+      // in case this was a prewarm
       prewarmedPool.get(sender()).foreach { data =>
         prewarmedPool = prewarmedPool - sender()
       }
-      //in case this was a starting prewarm
+
+      // in case this was a starting prewarm
       prewarmStartingPool.get(sender()).foreach { _ =>
-        logging.info(this, "failed starting prewarm removed")
+        logging.info(this, "failed starting prewarm, removed")
         prewarmStartingPool = prewarmStartingPool - sender()
       }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 1cf7383..7e0d88f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -207,6 +207,54 @@ class InvokerSupervisionTests
 
   behavior of "InvokerActor"
 
+  it should "start and stay unhealthy while min threshold is not met" in {
+    val invoker =
+      TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+    invoker.stateName shouldBe Unhealthy
+
+    (1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
+      invoker ! InvocationFinishedMessage(
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
+        InvocationFinishedResult.SystemError)
+      invoker.stateName shouldBe Unhealthy
+    }
+
+    (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
+      invoker ! InvocationFinishedMessage(
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
+        InvocationFinishedResult.Success)
+      invoker.stateName shouldBe Unhealthy
+    }
+
+    invoker ! InvocationFinishedMessage(
+      InvokerInstanceId(0, userMemory = defaultUserMemory),
+      InvocationFinishedResult.Success)
+    invoker.stateName shouldBe Healthy
+  }
+
+  it should "revert to unhealthy during initial startup if there is a failed test activation" in {
+    assume(InvokerActor.bufferErrorTolerance >= 3)
+
+    val invoker =
+      TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+    invoker.stateName shouldBe Unhealthy
+
+    invoker ! InvocationFinishedMessage(
+      InvokerInstanceId(0, userMemory = defaultUserMemory),
+      InvocationFinishedResult.SystemError)
+    invoker.stateName shouldBe Unhealthy
+
+    invoker ! InvocationFinishedMessage(
+      InvokerInstanceId(0, userMemory = defaultUserMemory),
+      InvocationFinishedResult.Success)
+    invoker.stateName shouldBe Healthy
+
+    invoker ! InvocationFinishedMessage(
+      InvokerInstanceId(0, userMemory = defaultUserMemory),
+      InvocationFinishedResult.SystemError)
+    invoker.stateName shouldBe Unhealthy
+  }
+
   // unHealthy -> offline
   // offline -> unhealthy
   it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
@@ -318,7 +366,7 @@ class InvokerSupervisionTests
     }
   }
 
-  it should "start timer to send testactions when unhealthy" in {
+  it should "start timer to send test actions when unhealthy" in {
     val invoker =
       TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
     invoker.stateName shouldBe Unhealthy
@@ -337,7 +385,6 @@ class InvokerSupervisionTests
   }
 
   it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
-
     val invoker0 = TestProbe()
     val children = mutable.Queue(invoker0.ref)
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()