You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2022/03/29 07:34:14 UTC
[openwhisk] branch master updated: add enable/disable invoker support to old scheduler (#5205)
This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3b6d07a add enable/disable invoker support to old scheduler (#5205)
3b6d07a is described below
commit 3b6d07a18cec3e92ceb5c54558464d1bfdbc0f82
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue Mar 29 00:33:56 2022 -0700
add enable/disable invoker support to old scheduler (#5205)
* add enable/disable invoker support to old scheduler and add is enabled route for invoker
* feedback
* fix enable complete
* fail fast invoker to offline with updated ping message
* test compilation
* add tests
* fix tests
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
.../apache/openwhisk/core/connector/Message.scala | 6 ++--
.../core/loadBalancer/InvokerSupervision.scala | 20 +++++++++---
.../core/invoker/DefaultInvokerServer.scala | 2 ++
.../core/invoker/FPCInvokerReactive.scala | 7 +++-
.../openwhisk/core/invoker/FPCInvokerServer.scala | 2 ++
.../apache/openwhisk/core/invoker/Invoker.scala | 11 +++++++
.../openwhisk/core/invoker/InvokerReactive.scala | 37 +++++++++++++++++-----
.../invoker/test/DefaultInvokerServerTests.scala | 17 ++++++++++
.../core/invoker/test/FPCInvokerServerTests.scala | 17 ++++++++++
.../test/InvokerSupervisionTests.scala | 34 ++++++++++++++++++++
10 files changed, 137 insertions(+), 16 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 9123747..e823cf1 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -278,14 +278,16 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
}
}
-case class PingMessage(instance: InvokerInstanceId) extends Message {
+case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = None) extends Message {
override def serialize = PingMessage.serdes.write(this).compactPrint
+
+ def invokerEnabled: Boolean = isEnabled.getOrElse(true)
}
object PingMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
- implicit val serdes = jsonFormat(PingMessage.apply _, "name")
+ implicit val serdes = jsonFormat(PingMessage.apply, "name", "isEnabled")
}
trait EventMessageBody extends Message {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 86ece17..f526da0 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -280,8 +280,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
- case Event(_: PingMessage, _) => stay
- case Event(StateTimeout, _) => goto(Offline)
+ case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
+ case Event(StateTimeout, _) => goto(Offline)
case Event(Tick, _) =>
invokeTestAction()
stay
@@ -300,7 +300,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
- case Event(_: PingMessage, _) => goto(Unhealthy)
+ case Event(ping: PingMessage, _) => if (ping.invokerEnabled) goto(Unhealthy) else stay
}
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
@@ -314,8 +314,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
* It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
- case Event(_: PingMessage, _) => stay
- case Event(StateTimeout, _) => goto(Offline)
+ case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
+ case Event(StateTimeout, _) => goto(Offline)
}
/** Handles the completion of an Activation in every state. */
@@ -340,6 +340,16 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
initialize()
/**
+ * Handles a ping message from an invoker: if the ping signals that the invoker has been disabled, transition immediately to Offline; otherwise stay in the current state.
+ *
+ * @param ping
+ * @return
+ */
+ private def goOfflineIfDisabled(ping: PingMessage) = {
+ if (ping.invokerEnabled) stay else goto(Offline)
+ }
+
+ /**
* Handling for active acks. This method saves the result (successful or unsuccessful)
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
*
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
index d372be7..f2c4e56 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
+ } ~ (path("isEnabled") & get) {
+ invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 8158fd3..9087bbd 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.containerpool.v2._
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
@@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig,
complete("Successfully disabled invoker")
}
+ override def isEnabled(): Route = {
+ complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
+ }
+
override def backfillPrewarm(): Route = {
pool ! AdjustPrewarmedContainer
complete("backfilling prewarm container")
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
index f1b8e8c..a3b800e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
+ } ~ (path("isEnabled") & get) {
+ invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 6de751f..89656c9 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig._
import pureconfig.generic.auto._
+import spray.json._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -74,6 +75,15 @@ object Invoker {
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+ object InvokerEnabled extends DefaultJsonProtocol {
+ def parseJson(string: String) = Try(serdes.read(string.parseJson))
+ implicit val serdes = jsonFormat(InvokerEnabled.apply _, "enabled")
+ }
+
+ case class InvokerEnabled(isEnabled: Boolean) {
+ def serialize(): String = InvokerEnabled.serdes.write(this).compactPrint
+ }
+
/**
* An object which records the environment variables required for this component to run.
*/
@@ -220,6 +230,7 @@ trait InvokerProvider extends Spi {
trait InvokerCore {
def enable(): Route
def disable(): Route
+ def isEnabled(): Route
def backfillPrewarm(): Route
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index e32ece2..31d1f22 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
-
import akka.Done
-import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
@@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
@@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object InvokerReactive extends InvokerProvider {
-
override def instance(
config: WhiskConfig,
instance: InvokerInstanceId,
@@ -293,18 +292,40 @@ class InvokerReactive(
}
private val healthProducer = msgProvider.getProducer(config)
- Scheduler.scheduleWaitAtMost(1.seconds)(() => {
- healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
+
+ private def getHealthScheduler: ActorRef =
+ Scheduler.scheduleWaitAtMost(1.seconds)(() => pingController(isEnabled = true))
+
+ private def pingController(isEnabled: Boolean) = {
+ healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance, isEnabled = Some(isEnabled))).andThen {
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
- })
+ }
+
+ private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)
override def enable(): Route = {
- complete("not supported")
+ if (healthScheduler.isEmpty) {
+ healthScheduler = Some(getHealthScheduler)
+ complete(s"${instance.toString} is now enabled.")
+ } else {
+ complete(s"${instance.toString} is already enabled.")
+ }
}
override def disable(): Route = {
- complete("not supported")
+ pingController(isEnabled = false)
+ if (healthScheduler.nonEmpty) {
+ actorSystem.stop(healthScheduler.get)
+ healthScheduler = None
+ complete(s"${instance.toString} is now disabled.")
+ } else {
+ complete(s"${instance.toString} is already disabled.")
+ }
+ }
+
+ override def isEnabled(): Route = {
+ complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
}
override def backfillPrewarm(): Route = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 1095128..57cb976 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class DefaultInvokerServerTests
}
}
+ it should "check if invoker is enabled" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ Unmarshal(responseEntity).to[String].map(response => {
+ InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+ })
+ }
+ }
+
it should "not enable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -130,6 +143,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}
+ override def isEnabled(): Route = {
+ complete(InvokerEnabled(true).serialize())
+ }
+
override def backfillPrewarm(): Route = {
complete("")
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index e387cd6..e7ab02e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class FPCInvokerServerTests
}
}
+ it should "check if invoker is enabled" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ Unmarshal(responseEntity).to[String].map(response => {
+ InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
+ })
+ }
+ }
+
it should "not enable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -129,6 +142,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}
+ override def isEnabled(): Route = {
+ complete(InvokerEnabled(true).serialize())
+ }
+
override def backfillPrewarm(): Route = {
complete("")
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index a4a6145..463005d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -364,6 +364,18 @@ class InvokerSupervisionTests
}
}
+ // unhealthy -> offline
+ // offline -> offline (a disabled ping received while offline keeps the invoker offline)
+ it should "go offline when unhealthy and disabled invoker ping received and stay offline if disabled ping received while offline" in {
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+ invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+ invoker.stateName shouldBe Offline
+ invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+ invoker.stateName shouldBe Offline
+ }
+
it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
@@ -382,6 +394,28 @@ class InvokerSupervisionTests
invoker.isTimerActive(InvokerActor.timerName) shouldBe false
}
+ // healthy -> offline
+ it should "go offline from healthy immediately when disabled invoker ping received" in {
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+
+ invoker.isTimerActive(InvokerActor.timerName) shouldBe true
+
+ // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
+ (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ }
+ invoker.stateName shouldBe Healthy
+
+ invoker.isTimerActive(InvokerActor.timerName) shouldBe false
+
+ invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
+ invoker.stateName shouldBe Offline
+ }
+
it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)