You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2022/03/29 07:34:14 UTC

[openwhisk] branch master updated: add enable/disable invoker support to old scheduler (#5205)

This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b6d07a  add enable/disable invoker support to old scheduler (#5205)
3b6d07a is described below

commit 3b6d07a18cec3e92ceb5c54558464d1bfdbc0f82
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue Mar 29 00:33:56 2022 -0700

    add enable/disable invoker support to old scheduler (#5205)
    
    * add enable/disable invoker support to old scheduler and add is enabled route for invoker
    
    * feedback
    
    * fix enable complete
    
    * fail fast invoker to offline with updated ping message
    
    * test compilation
    
    * add tests
    
    * fix tests
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 .../apache/openwhisk/core/connector/Message.scala  |  6 ++--
 .../core/loadBalancer/InvokerSupervision.scala     | 20 +++++++++---
 .../core/invoker/DefaultInvokerServer.scala        |  2 ++
 .../core/invoker/FPCInvokerReactive.scala          |  7 +++-
 .../openwhisk/core/invoker/FPCInvokerServer.scala  |  2 ++
 .../apache/openwhisk/core/invoker/Invoker.scala    | 11 +++++++
 .../openwhisk/core/invoker/InvokerReactive.scala   | 37 +++++++++++++++++-----
 .../invoker/test/DefaultInvokerServerTests.scala   | 17 ++++++++++
 .../core/invoker/test/FPCInvokerServerTests.scala  | 17 ++++++++++
 .../test/InvokerSupervisionTests.scala             | 34 ++++++++++++++++++++
 10 files changed, 137 insertions(+), 16 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 9123747..e823cf1 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -278,14 +278,16 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
   }
 }
 
-case class PingMessage(instance: InvokerInstanceId) extends Message {
+case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = None) extends Message {
   override def serialize = PingMessage.serdes.write(this).compactPrint
+
+  def invokerEnabled: Boolean = isEnabled.getOrElse(true)
 }
 
 object PingMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
-  implicit val serdes = jsonFormat(PingMessage.apply _, "name")
+  implicit val serdes = jsonFormat(PingMessage.apply, "name", "isEnabled")
 }
 
 trait EventMessageBody extends Message {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 86ece17..f526da0 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -280,8 +280,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
 
   // To be used for all states that should send test actions to reverify the invoker
   val healthPingingState: StateFunction = {
-    case Event(_: PingMessage, _) => stay
-    case Event(StateTimeout, _)   => goto(Offline)
+    case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
+    case Event(StateTimeout, _)      => goto(Offline)
     case Event(Tick, _) =>
       invokeTestAction()
       stay
@@ -300,7 +300,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
 
   /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
   when(Offline) {
-    case Event(_: PingMessage, _) => goto(Unhealthy)
+    case Event(ping: PingMessage, _) => if (ping.invokerEnabled) goto(Unhealthy) else stay
   }
 
   /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
@@ -314,8 +314,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
    * It will go offline if that state is not confirmed for 20 seconds.
    */
   when(Healthy, stateTimeout = healthyTimeout) {
-    case Event(_: PingMessage, _) => stay
-    case Event(StateTimeout, _)   => goto(Offline)
+    case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
+    case Event(StateTimeout, _)      => goto(Offline)
   }
 
   /** Handles the completion of an Activation in every state. */
@@ -340,6 +340,16 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
   initialize()
 
   /**
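+   * Transitions immediately to Offline when a ping reports that the invoker has been disabled; otherwise stays put.
+   *
+   * @param ping the ping message received from the invoker
+   * @return the FSM transition: `stay` if the ping says the invoker is enabled, `goto(Offline)` otherwise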
+   */
+  private def goOfflineIfDisabled(ping: PingMessage) = {
+    if (ping.invokerEnabled) stay else goto(Offline)
+  }
+
+  /**
    * Handling for active acks. This method saves the result (successful or unsuccessful)
    * into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
    *
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
index d372be7..f2c4e56 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
           invoker.enable()
         } ~ (path("disable") & post) {
           invoker.disable()
+        } ~ (path("isEnabled") & get) {
+          invoker.isEnabled()
         }
       case _ => terminate(StatusCodes.Unauthorized)
     }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 8158fd3..9087bbd 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.containerpool.v2._
 import org.apache.openwhisk.core.database.{UserContext, _}
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType._
 import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
 import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
 import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
@@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig,
     complete("Successfully disabled invoker")
   }
 
+  override def isEnabled(): Route = {
+    complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
+  }
+
   override def backfillPrewarm(): Route = {
     pool ! AdjustPrewarmedContainer
     complete("backfilling prewarm container")
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
index f1b8e8c..a3b800e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
           invoker.enable()
         } ~ (path("disable") & post) {
           invoker.disable()
+        } ~ (path("isEnabled") & get) {
+          invoker.isEnabled()
         }
       case _ => terminate(StatusCodes.Unauthorized)
     }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 6de751f..89656c9 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader}
 import org.apache.openwhisk.utils.ExecutionContextFactory
 import pureconfig._
 import pureconfig.generic.auto._
+import spray.json._
 
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -74,6 +75,15 @@ object Invoker {
 
   val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
 
+  object InvokerEnabled extends DefaultJsonProtocol {
+    def parseJson(string: String) = Try(serdes.read(string.parseJson))
+    implicit val serdes = jsonFormat(InvokerEnabled.apply _, "enabled")
+  }
+
+  case class InvokerEnabled(isEnabled: Boolean) {
+    def serialize(): String = InvokerEnabled.serdes.write(this).compactPrint
+  }
+
   /**
    * An object which records the environment variables required for this component to run.
    */
@@ -220,6 +230,7 @@ trait InvokerProvider extends Spi {
 trait InvokerCore {
   def enable(): Route
   def disable(): Route
+  def isEnabled(): Route
   def backfillPrewarm(): Route
 }
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index e32ece2..31d1f22 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
 import akka.Done
-import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
 import akka.event.Logging.InfoLevel
 import akka.http.scaladsl.server.Directives.complete
 import akka.http.scaladsl.server.Route
@@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.database.{UserContext, _}
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.spi.SpiLoader
@@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
 object InvokerReactive extends InvokerProvider {
-
   override def instance(
     config: WhiskConfig,
     instance: InvokerInstanceId,
@@ -293,18 +292,40 @@ class InvokerReactive(
   }
 
   private val healthProducer = msgProvider.getProducer(config)
-  Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-    healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
+
+  private def getHealthScheduler: ActorRef =
+    Scheduler.scheduleWaitAtMost(1.seconds)(() => pingController(isEnabled = true))
+
+  private def pingController(isEnabled: Boolean) = {
+    healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance, isEnabled = Some(isEnabled))).andThen {
       case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
     }
-  })
+  }
+
+  private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)
 
   override def enable(): Route = {
-    complete("not supported")
+    if (healthScheduler.isEmpty) {
+      healthScheduler = Some(getHealthScheduler)
+      complete(s"${instance.toString} is now enabled.")
+    } else {
+      complete(s"${instance.toString} is already enabled.")
+    }
   }
 
   override def disable(): Route = {
-    complete("not supported")
+    val wasEnabled = healthScheduler.isDefined
+    // Stop the periodic enabled-pings before announcing the disable, so a late tick is less likely
+    // to follow the disabled ping and move the controller's view of this invoker back out of Offline.
+    healthScheduler.foreach(actorSystem.stop)
+    healthScheduler = None
+    pingController(isEnabled = false)
+    if (wasEnabled) complete(s"${instance.toString} is now disabled.")
+    else complete(s"${instance.toString} is already disabled.")
+  }
+
+  override def isEnabled(): Route = {
+    complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
   }
 
   override def backfillPrewarm(): Route = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 1095128..57cb976 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
 import akka.http.scaladsl.model.headers.BasicHttpCredentials
 import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
 import org.apache.openwhisk.http.BasicHttpService
 import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class DefaultInvokerServerTests
     }
   }
 
+  it should "check if invoker is enabled" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      val awaitAtMost = scala.concurrent.duration.Duration(5, "seconds") // block so the assertion really runs
+      val payload = scala.concurrent.Await.result(Unmarshal(responseEntity).to[String], awaitAtMost)
+      InvokerEnabled.parseJson(payload) shouldEqual scala.util.Success(InvokerEnabled(true)) // parseJson yields a Try
+    }
+  }
+
   it should "not enable invoker with invalid credential" in {
     implicit val tid = transid()
     val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -130,6 +143,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
     complete("")
   }
 
+  override def isEnabled(): Route = {
+    complete(InvokerEnabled(true).serialize())
+  }
+
   override def backfillPrewarm(): Route = {
     complete("")
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index e387cd6..e7ab02e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
 import akka.http.scaladsl.model.headers.BasicHttpCredentials
 import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
 import org.apache.openwhisk.http.BasicHttpService
 import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class FPCInvokerServerTests
     }
   }
 
+  it should "check if invoker is enabled" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      val awaitAtMost = scala.concurrent.duration.Duration(5, "seconds") // block so the assertion really runs
+      val payload = scala.concurrent.Await.result(Unmarshal(responseEntity).to[String], awaitAtMost)
+      InvokerEnabled.parseJson(payload) shouldEqual scala.util.Success(InvokerEnabled(true)) // parseJson yields a Try
+    }
+  }
+
   it should "not enable invoker with invalid credential" in {
     implicit val tid = transid()
     val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -129,6 +142,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
     complete("")
   }
 
+  override def isEnabled(): Route = {
+    complete(InvokerEnabled(true).serialize())
+  }
+
   override def backfillPrewarm(): Route = {
     complete("")
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index a4a6145..463005d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -364,6 +364,18 @@ class InvokerSupervisionTests
     }
   }
 
+  // unhealthy -> offline
+  // offline -> offline
+  it should "go offline when unhealthy and disabled invoker ping received and stay offline if disabled ping received while offline" in {
+    val invoker =
+      TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+    invoker.stateName shouldBe Unhealthy
+    invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+    invoker.stateName shouldBe Offline
+    invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+    invoker.stateName shouldBe Offline
+  }
+
   it should "start timer to send test actions when unhealthy" in {
     val invoker =
       TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
@@ -382,6 +394,28 @@ class InvokerSupervisionTests
     invoker.isTimerActive(InvokerActor.timerName) shouldBe false
   }
 
+  // healthy -> offline
+  it should "go offline from healthy immediately when disabled invoker ping received" in {
+    val invoker =
+      TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+    invoker.stateName shouldBe Unhealthy
+
+    invoker.isTimerActive(InvokerActor.timerName) shouldBe true
+
+    // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
+    (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
+      invoker ! InvocationFinishedMessage(
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
+        InvocationFinishedResult.Success)
+    }
+    invoker.stateName shouldBe Healthy
+
+    invoker.isTimerActive(InvokerActor.timerName) shouldBe false
+
+    invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+    invoker.stateName shouldBe Offline
+  }
+
   it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
     val invoker0 = TestProbe()
     val children = mutable.Queue(invoker0.ref)