You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/07/26 11:45:19 UTC
[incubator-openwhisk] branch master updated: Throttle the system
based on active-ack timeouts. (#3875)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9dd34f2 Throttle the system based on active-ack timeouts. (#3875)
9dd34f2 is described below
commit 9dd34f2f7f82a52d8e2400559781626bda8b8d02
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Jul 26 13:45:16 2018 +0200
Throttle the system based on active-ack timeouts. (#3875)
Today, we have an arbitrary system-wide limit on the maximum number of concurrent invocations. In general that is fine, but the limit has no direct correlation to what is actually happening in the system.
This adds a new state to each monitored invoker: Unresponsible (an overloaded invoker). An invoker goes into this state when its active-acks start to time out. Eventually, if the system is really overloaded, all invokers will be in this state, which causes the load balancer to return a failure. This failure now results in a 503 (Service Unavailable) response to the user, signaling that the system is overloaded.
---
ansible/README.md | 2 -
ansible/group_vars/all | 1 -
ansible/roles/controller/tasks/deploy.yml | 2 -
ansible/templates/whisk.properties.j2 | 1 -
.../src/main/scala/whisk/common/Logging.scala | 4 +-
.../src/main/scala/whisk/common/RingBuffer.scala | 2 +-
.../src/main/scala/whisk/core/WhiskConfig.scala | 2 -
.../main/scala/whisk/core/controller/Actions.scala | 4 +
.../scala/whisk/core/controller/Controller.scala | 4 +-
.../scala/whisk/core/controller/WebActions.scala | 5 +
.../core/entitlement/ActivationThrottler.scala | 19 +--
.../scala/whisk/core/entitlement/Entitlement.scala | 29 +---
.../core/loadBalancer/InvokerSupervision.scala | 176 ++++++++++++---------
.../ShardingContainerPoolBalancer.scala | 49 ++++--
tests/performance/preparation/deploy.sh | 2 +-
.../test/InvokerSupervisionTests.scala | 67 ++++++--
.../test/ShardingContainerPoolBalancerTests.scala | 3 +-
17 files changed, 211 insertions(+), 161 deletions(-)
diff --git a/ansible/README.md b/ansible/README.md
index f4f147c..ec7d086 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -348,12 +348,10 @@ The default system throttling limits are configured in this file [./group_vars/a
limits:
invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}"
concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}"
- concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}"
firesPerMinute: "{{ limit_fires_per_minute | default(60) }}"
sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}"
```
- The `limits.invocationsPerMinute` represents the allowed namespace action invocations per minute.
- The `limits.concurrentInvocations` represents the maximum concurrent invocations allowed per namespace.
-- The `limits.concurrentInvocationsSystem` represents the maximum concurrent invocations the system will allow across all namespaces.
- The `limits.firesPerMinute` represents the allowed namespace trigger firings per minute.
- The `limits.sequenceMaxLength` represents the maximum length of a sequence action.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index aa32ede..2114630 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -53,7 +53,6 @@ runtimesManifest: "{{ runtimes_manifest | default(lookup('file', openwhisk_home
limits:
invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}"
concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}"
- concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}"
firesPerMinute: "{{ limit_fires_per_minute | default(60) }}"
sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 92c576d..11d7269 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -178,8 +178,6 @@
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
- "LIMITS_ACTIONS_INVOKES_CONCURRENTINSYSTEM":
- "{{ limits.concurrentInvocationsSystem }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
"LIMITS_ACTIONS_SEQUENCE_MAXLENGTH": "{{ limits.sequenceMaxLength }}"
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index bfdba3b..ce3f2db 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -35,7 +35,6 @@ runtimes.manifest={{ runtimesManifest | to_json }}
limits.actions.invokes.perMinute={{ limits.invocationsPerMinute }}
limits.actions.invokes.concurrent={{ limits.concurrentInvocations }}
-limits.actions.invokes.concurrentInSystem={{ limits.concurrentInvocationsSystem }}
limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 17bc2d3..56ab754 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -272,8 +272,8 @@ object LoggingMarkers {
def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count)
// Check invoker healthy state from loadbalancer
- val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
- val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
+ def LOADBALANCER_INVOKER_STATUS_CHANGE(state: String) =
+ LogMarkerToken(loadbalancer, "invokerState", count, Some(state))
val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
diff --git a/common/scala/src/main/scala/whisk/common/RingBuffer.scala b/common/scala/src/main/scala/whisk/common/RingBuffer.scala
index 4f5a6c7..8c4713d 100644
--- a/common/scala/src/main/scala/whisk/common/RingBuffer.scala
+++ b/common/scala/src/main/scala/whisk/common/RingBuffer.scala
@@ -28,5 +28,5 @@ class RingBuffer[T](size: Int) {
def add(el: T) = inner.add(el)
- def toList() = inner.toArray().asInstanceOf[Array[T]].toList
+ def toList = inner.toArray().asInstanceOf[Array[T]].toList
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index fd6eeec..7396c02 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -78,7 +78,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit)
val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
- val actionInvokeSystemOverloadLimit = this(WhiskConfig.actionInvokeSystemOverloadLimit)
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
}
@@ -189,7 +188,6 @@ object WhiskConfig {
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
- val actionInvokeSystemOverloadLimit = "limits.actions.invokes.concurrentInSystem"
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
val controllerSeedNodes = "akka.cluster.seed.nodes"
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index 4bfdb65..12c8ec6 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -45,6 +45,7 @@ import whisk.http.Messages
import whisk.http.Messages._
import whisk.core.entitlement.Resource
import whisk.core.entitlement.Collection
+import whisk.core.loadBalancer.LoadBalancerException
/**
* A singleton object which defines the properties that must be present in a configuration
@@ -280,6 +281,9 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
case Failure(RejectRequest(code, message)) =>
logging.debug(this, s"[POST] action rejected with code $code: $message")
terminate(code, message)
+ case Failure(t: LoadBalancerException) =>
+ logging.error(this, s"[POST] failed in loadbalancer: ${t.getMessage}")
+ terminate(ServiceUnavailable)
case Failure(t: Throwable) =>
logging.error(this, s"[POST] action activation failed: ${t.getMessage}")
terminate(InternalServerError)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 8167a13..c130727 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -42,7 +42,7 @@ import whisk.core.entitlement._
import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.{Healthy, LoadBalancerProvider}
+import whisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
import whisk.http.BasicHttpService
import whisk.http.BasicRasService
import whisk.spi.SpiLoader
@@ -151,7 +151,7 @@ class Controller(val instance: ControllerInstanceId,
complete {
loadBalancer
.invokerHealth()
- .map(_.count(_.status == Healthy).toJson)
+ .map(_.count(_.status == InvokerState.Healthy).toJson)
}
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index f373a1f..e195561 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -52,6 +52,7 @@ import whisk.core.controller.actions.PostActionActivation
import whisk.core.database._
import whisk.core.entity._
import whisk.core.entity.types._
+import whisk.core.loadBalancer.LoadBalancerException
import whisk.http.ErrorResponse.terminate
import whisk.http.Messages
import whisk.http.LenientSprayJsonSupport._
@@ -673,6 +674,10 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
case Failure(t: RejectRequest) => terminate(t.code, t.message)
+ case Failure(t: LoadBalancerException) =>
+ logging.error(this, s"failed in loadbalancer: $t")
+ terminate(ServiceUnavailable)
+
case Failure(t) =>
logging.error(this, s"exception in completeRequest: $t")
terminate(InternalServerError)
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 92d5434..b48a4dd 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -30,14 +30,11 @@ import scala.concurrent.{ExecutionContext, Future}
*
* @param loadBalancer contains active quotas
* @param concurrencyLimit a calculated limit relative to the user using the system
- * @param systemOverloadLimit the limit when the system is considered overloaded
*/
-class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int, systemOverloadLimit: Int)(
+class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int)(
implicit logging: Logging,
executionContext: ExecutionContext) {
- logging.info(this, s"systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller)
-
/**
* Checks whether the operation should be allowed to proceed.
*/
@@ -50,20 +47,6 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity
ConcurrentRateLimit(concurrentActivations, currentLimit)
}
}
-
- /**
- * Checks whether the system is in a generally overloaded state.
- */
- def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = {
- loadBalancer.totalActiveActivations.map { concurrentActivations =>
- val overloaded = concurrentActivations > systemOverloadLimit
- if (overloaded)
- logging.info(
- this,
- s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
- overloaded
- }
- }
}
sealed trait RateLimit {
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 078530a..a553357 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -35,7 +35,6 @@ import whisk.core.entity._
import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
import whisk.http.ErrorResponse
import whisk.http.Messages
-import whisk.http.Messages._
import whisk.core.connector.MessagingProvider
import whisk.spi.SpiLoader
import whisk.spi.Spi
@@ -74,8 +73,7 @@ protected[core] object EntitlementProvider {
val requiredProperties = Map(
WhiskConfig.actionInvokePerMinuteLimit -> null,
WhiskConfig.actionInvokeConcurrentLimit -> null,
- WhiskConfig.triggerFirePerMinuteLimit -> null,
- WhiskConfig.actionInvokeSystemOverloadLimit -> null)
+ WhiskConfig.triggerFirePerMinuteLimit -> null)
}
/**
@@ -148,8 +146,7 @@ protected[core] abstract class EntitlementProvider(
private val concurrentInvokeThrottler =
new ActivationThrottler(
loadBalancer,
- activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
- config.actionInvokeSystemOverloadLimit.toInt)
+ activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))
private val messagingProvider = SpiLoader.get[MessagingProvider]
private val eventProducer = messagingProvider.getProducer(this.config)
@@ -196,8 +193,7 @@ protected[core] abstract class EntitlementProvider(
protected[core] def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
logging.debug(this, s"checking user '${user.subject}' has not exceeded activation quota")
- checkSystemOverload(ACTIVATE)
- .flatMap(_ => checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user))
+ checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user)
.flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
}
@@ -257,8 +253,7 @@ protected[core] abstract class EntitlementProvider(
val throttleCheck =
if (noThrottle) Future.successful(())
else
- checkSystemOverload(right)
- .flatMap(_ => checkUserThrottle(user, right, resources))
+ checkUserThrottle(user, right, resources)
.flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
throttleCheck
.flatMap(_ => checkPrivilege(user, right, resources))
@@ -312,22 +307,6 @@ protected[core] abstract class EntitlementProvider(
}
/**
- * Limits activations if the system is overloaded.
- *
- * @param right the privilege, if ACTIVATE then check quota else return None
- * @return future completing successfully if system is not overloaded else failing with a rejection
- */
- protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
- concurrentInvokeThrottler.isOverloaded.flatMap { isOverloaded =>
- val systemOverload = right == ACTIVATE && isOverloaded
- if (systemOverload) {
- logging.error(this, "system is overloaded")
- Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
- } else Future.successful(())
- }
- }
-
- /**
* Limits activations if subject exceeds their own limits.
* If the requested right is an activation, the set of resources must contain an activation of an action or filter to be throttled.
* While it is possible for the set of resources to contain more than one action or trigger, the plurality is ignored and treated
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 15646b8..223b747 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -20,7 +20,7 @@ package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import scala.collection.immutable
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
@@ -45,19 +45,47 @@ case object GetStatus
case object Tick
// States an Invoker can be in
-sealed trait InvokerState { val asString: String }
-case object Offline extends InvokerState { val asString = "down" }
-case object Healthy extends InvokerState { val asString = "up" }
-case object UnHealthy extends InvokerState { val asString = "unhealthy" }
+sealed trait InvokerState {
+ val asString: String
+ val isUsable: Boolean
+}
+
+object InvokerState {
+ // Invokers in this state can be used to schedule workload to
+ sealed trait Usable extends InvokerState { val isUsable = true }
+ // No workload should be scheduled to invokers in this state
+ sealed trait Unusable extends InvokerState { val isUsable = false }
+
+ // A completely healthy invoker, pings arriving fine, no system errors
+ case object Healthy extends Usable { val asString = "up" }
+ // Pings are arriving fine, the invoker returns system errors though
+ case object Unhealthy extends Unusable { val asString = "unhealthy" }
+ // Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
+ case object Unresponsible extends Unusable { val asString = "unresponsible" }
+ // Pings are not arriving for this invoker
+ case object Offline extends Unusable { val asString = "down" }
+}
+
+// Possible answers of an activation
+sealed trait InvocationFinishedResult
+object InvocationFinishedResult {
+ // The activation could be successfully executed from the system's point of view. That includes user- and application
+ // errors
+ case object Success extends InvocationFinishedResult
+ // The activation could not be executed because of a system error
+ case object SystemError extends InvocationFinishedResult
+ // The active-ack did not arrive before it timed out
+ case object Timeout extends InvocationFinishedResult
+}
case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId)
-case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, successful: Boolean)
+case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, result: InvocationFinishedResult)
// Sent to a monitor if the state changed
case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
// Data stored in the Invoker
-final case class InvokerInfo(buffer: RingBuffer[Boolean])
+final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
/**
* Actor representing a pool of invokers
@@ -76,10 +104,12 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
monitor: Option[ActorRef])
extends Actor {
- implicit val transid = TransactionId.invokerHealth
- implicit val logging = new AkkaLogging(context.system.log)
- implicit val timeout = Timeout(5.seconds)
- implicit val ec = context.dispatcher
+ import InvokerState._
+
+ implicit val transid: TransactionId = TransactionId.invokerHealth
+ implicit val logging: Logging = new AkkaLogging(context.system.log)
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = context.dispatcher
// State of the actor. Mutable vars with immutable collections prevents closures or messages
// from leaking the state for external mutation
@@ -87,7 +117,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId]
var status = IndexedSeq[InvokerHealth]()
- def receive = {
+ def receive: Receive = {
case p: PingMessage =>
val invoker = instanceToRef.getOrElse(p.instance, registerInvoker(p.instance))
instanceToRef = instanceToRef.updated(p.instance, invoker)
@@ -116,15 +146,15 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender)
}
- def logStatus() = {
+ def logStatus(): Unit = {
monitor.foreach(_ ! CurrentInvokerPoolState(status))
val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
}
/** Receive Ping messages from invokers. */
- val pingPollDuration = 1.second
- val invokerPingFeed = context.system.actorOf(Props {
+ val pingPollDuration: FiniteDuration = 1.second
+ val invokerPingFeed: ActorRef = context.system.actorOf(Props {
new MessageFeed(
"ping",
logging,
@@ -149,7 +179,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
}
/** Pads a list to a given length using the given function to compute entries */
- def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f)
+ def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A): IndexedSeq[A] = list ++ (list.size until n).map(f)
// Register a new invoker
def registerInvoker(instanceId: InvokerInstanceId): ActorRef = {
@@ -170,9 +200,9 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
object InvokerPool {
private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
- implicit val tid = TransactionId.loadbalancer
- implicit val ec = db.executionContext
- implicit val logging = db.logging
+ implicit val tid: TransactionId = TransactionId.loadbalancer
+ implicit val ec: ExecutionContext = db.executionContext
+ implicit val logging: Logging = db.logging
WhiskAction
.get(db, action.docid)
@@ -214,12 +244,12 @@ object InvokerPool {
def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pc: MessageConsumer,
- m: Option[ActorRef] = None) = {
+ m: Option[ActorRef] = None): Props = {
Props(new InvokerPool(f, p, pc, m))
}
/** A stub identity for invoking the test action. This does not need to be a valid identity. */
- val healthActionIdentity = {
+ val healthActionIdentity: Identity = {
val whiskSystem = "whisk.system"
val uuid = UUID()
Identity(
@@ -230,13 +260,13 @@ object InvokerPool {
}
/** An action to use for monitoring invoker health. */
- def healthAction(i: ControllerInstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map {
- manifest =>
+ def healthAction(i: ControllerInstanceId): Option[WhiskAction] =
+ ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map { manifest =>
new WhiskAction(
namespace = healthActionIdentity.namespace.name.toPath,
name = EntityName(s"invokerHealthTestAction${i.asString}"),
exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
- }
+ }
}
/**
@@ -247,47 +277,47 @@ object InvokerPool {
*/
class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId)
extends FSM[InvokerState, InvokerInfo] {
- implicit val transid = TransactionId.invokerHealth
- implicit val logging = new AkkaLogging(context.system.log)
+
+ import InvokerState._
+
+ implicit val transid: TransactionId = TransactionId.invokerHealth
+ implicit val logging: Logging = new AkkaLogging(context.system.log)
val name = s"invoker${invokerInstance.toInt}"
- val healthyTimeout = 10.seconds
+ val healthyTimeout: FiniteDuration = 10.seconds
// This is done at this point to not intermingle with the state-machine
// especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
}
- override def receive = customReceive.orElse(super.receive)
+ override def receive: Receive = customReceive.orElse(super.receive)
- /**
- * Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy.
- */
- startWith(UnHealthy, InvokerInfo(new RingBuffer[Boolean](InvokerActor.bufferSize)))
+ /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
+ startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
- /**
- * An Offline invoker represents an existing but broken
- * invoker. This means, that it does not send pings anymore.
- */
+ /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
- case Event(_: PingMessage, _) => goto(UnHealthy)
+ case Event(_: PingMessage, _) => goto(Unhealthy)
}
- /**
- * An UnHealthy invoker represents an invoker that was not able to handle actions successfully.
- */
- when(UnHealthy, stateTimeout = healthyTimeout) {
+ // To be used for all states that should send test actions to reverify the invoker
+ val healthPingingState: StateFunction = {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
- case Event(Tick, info) => {
+ case Event(Tick, _) =>
invokeTestAction()
stay
- }
}
+ /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
+ when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
+
+ /** An Unresponsible invoker represents an invoker that is not responding with active acks in a timely manner */
+ when(Unresponsible, stateTimeout = healthyTimeout)(healthPingingState)
+
/**
- * A Healthy invoker is characterized by continuously getting
- * pings. It will go offline if that state is not confirmed
+ * A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
* for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
@@ -295,70 +325,68 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
case Event(StateTimeout, _) => goto(Offline)
}
- /**
- * Handle the completion of an Activation in every state.
- */
+ /** Handle the completion of an Activation in every state. */
whenUnhandled {
- case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.successful, info.buffer)
+ case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
}
/** Logging on Transition change */
onTransition {
- case _ -> Offline =>
- transid.mark(
- this,
- LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE,
- s"$name is offline",
- akka.event.Logging.WarningLevel)
- case _ -> UnHealthy =>
+ case _ -> newState if !newState.isUsable =>
transid.mark(
this,
- LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY,
- s"$name is unhealthy",
+ LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(newState.asString),
+ s"$name is ${newState.asString}",
akka.event.Logging.WarningLevel)
- case _ -> Healthy => logging.info(this, s"$name is healthy")
+ case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
}
- /** Scheduler to send test activations when the invoker is unhealthy. */
- onTransition {
- case _ -> UnHealthy => {
+ // To be used for all states that should send test actions to reverify the invoker
+ def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
+ case _ -> `state` =>
invokeTestAction()
- setTimer(InvokerActor.timerName, Tick, 1.minute, true)
- }
- case UnHealthy -> _ => cancelTimer(InvokerActor.timerName)
+ setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
+ case `state` -> _ => cancelTimer(InvokerActor.timerName)
}
+ onTransition(healthPingingTransitionHandler(Unhealthy))
+ onTransition(healthPingingTransitionHandler(Unresponsible))
+
initialize()
/**
* Handling for active acks. This method saves the result (successful or unsuccessful)
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
*
- * @param wasActivationSuccessful: result of Activation
+ * @param result: result of Activation
* @param buffer to be used
*/
- private def handleCompletionMessage(wasActivationSuccessful: Boolean, buffer: RingBuffer[Boolean]) = {
- buffer.add(wasActivationSuccessful)
+ private def handleCompletionMessage(result: InvocationFinishedResult,
+ buffer: RingBuffer[InvocationFinishedResult]) = {
+ buffer.add(result)
// If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
// a new test action to remove the errors out of the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
- if (wasActivationSuccessful && stateName == UnHealthy) {
+ if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
invokeTestAction()
}
// Stay in online if the activations was successful.
// Stay in offline, if an activeAck reaches the controller.
- if ((stateName == Healthy && wasActivationSuccessful) || stateName == Offline) {
+ if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
stay
} else {
- // Goto UnHealthy if there are more errors than accepted in buffer, else goto Healthy
- if (buffer.toList.count(_ == true) >= InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance) {
- gotoIfNotThere(Healthy)
+ val entries = buffer.toList
+ // Goto Unhealthy or Unresponsible respectively if there are more errors than accepted in buffer, else goto Healthy
+ if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
+ gotoIfNotThere(Unhealthy)
+ } else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
+ gotoIfNotThere(Unresponsible)
} else {
- gotoIfNotThere(UnHealthy)
+ gotoIfNotThere(Healthy)
}
}
}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index dd84af9..3cdce1d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -244,7 +244,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
entry.promise.future
}
}
- .getOrElse(Future.failed(LoadBalancerException("No invokers available")))
+ .getOrElse {
+ // report the state of all invokers
+ val actionType = if (!action.exec.pull) "non-blackbox" else "blackbox"
+ val invokerStates = invokersToUse.foldLeft(Map.empty[InvokerState, Int]) { (agg, curr) =>
+ val count = agg.getOrElse(curr.status, 0) + 1
+ agg + (curr.status -> count)
+ }
+
+ logging.error(this, s"failed to schedule $actionType action, invokers to use: $invokerStates")
+ Future.failed(LoadBalancerException("No invokers available"))
+ }
}
/** 2. Update local state with the to be executed activation */
@@ -353,8 +363,18 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
invoker: InvokerInstanceId): Unit = {
val aid = response.fold(l => l, r => r.activationId)
- // treat left as success (as it is the result of a message exceeding the bus limit)
- val isSuccess = response.fold(_ => true, r => !r.response.isWhiskError)
+ val invocationResult = if (forced) {
+ InvocationFinishedResult.Timeout
+ } else {
+ // If the response contains a system error, report that, otherwise report Success
+ // Left generally is considered a Success, since that could be a message not fitting into Kafka
+ val isSystemError = response.fold(_ => false, _.response.isWhiskError)
+ if (isSystemError) {
+ InvocationFinishedResult.SystemError
+ } else {
+ InvocationFinishedResult.Success
+ }
+ }
activations.remove(aid) match {
case Some(entry) =>
@@ -373,16 +393,21 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
- invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+ invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+ case None if tid == TransactionId.invokerHealth =>
+ // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
+ // is important to pass to the invokerPool because they are used to determine if the invoker can be considered
+ // healthy again.
+ logging.info(this, s"received active ack for health action on $invoker")(tid)
+ invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
- // the entry has already been removed but we receive an active ack for this activation Id.
- // This happens for health actions, because they don't have an entry in Loadbalancerdata or
- // for activations that already timed out.
- invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+ // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
+ // The result is ignored because a timeout has already been reported to the invokerPool per the force.
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
case None =>
- // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
- // As the active ack is already processed we don't have to do anything here.
+ // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
+ // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
+ // ack canceled the timer). As the active ack is already processed we don't have to do anything here.
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
}
}
@@ -446,12 +471,12 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
if (numInvokers > 0) {
val invoker = invokers(index)
// If the current invoker is healthy and we can get a slot
- if (invoker.status == Healthy && dispatched(invoker.id.toInt).tryAcquire()) {
+ if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire()) {
Some(invoker.id)
} else {
// If we've gone through all invokers
if (stepsDone == numInvokers + 1) {
- val healthyInvokers = invokers.filter(_.status == Healthy)
+ val healthyInvokers = invokers.filter(_.status.isUsable)
if (healthyInvokers.nonEmpty) {
// Choose a healthy invoker randomly
val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
diff --git a/tests/performance/preparation/deploy.sh b/tests/performance/preparation/deploy.sh
index 6e01097..ba4377e 100755
--- a/tests/performance/preparation/deploy.sh
+++ b/tests/performance/preparation/deploy.sh
@@ -25,7 +25,7 @@ TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP
# Deploy Openwhisk
cd $ROOTDIR/ansible
-ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e limit_invocations_concurrent=999999 -e limit_invocations_concurrent_system=999999 -e controller_client_auth=false"
+ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e limit_invocations_concurrent=999999 -e controller_client_auth=false"
$ANSIBLE_CMD setup.yml
$ANSIBLE_CMD prereq.yml
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 36d0e42..7537ab0 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -51,13 +51,12 @@ import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity._
import whisk.core.loadBalancer.ActivationRequest
import whisk.core.loadBalancer.GetStatus
-import whisk.core.loadBalancer.Healthy
+import whisk.core.loadBalancer.InvokerState._
+import whisk.core.loadBalancer.InvocationFinishedResult
import whisk.core.loadBalancer.InvocationFinishedMessage
import whisk.core.loadBalancer.InvokerActor
import whisk.core.loadBalancer.InvokerPool
import whisk.core.loadBalancer.InvokerState
-import whisk.core.loadBalancer.Offline
-import whisk.core.loadBalancer.UnHealthy
import whisk.core.loadBalancer.InvokerHealth
import whisk.utils.retry
import whisk.core.connector.test.TestConnector
@@ -163,7 +162,7 @@ class InvokerSupervisionTests
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Healthy))
// Send message and expect receive in invoker
- val msg = InvocationFinishedMessage(invokerInstance, true)
+ val msg = InvocationFinishedMessage(invokerInstance, InvocationFinishedResult.Success)
supervisor ! msg
invoker.expectMsg(msg)
}
@@ -214,34 +213,68 @@ class InvokerSupervisionTests
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
- pool.expectMsg(CurrentState(invoker, UnHealthy))
+ pool.expectMsg(CurrentState(invoker, Unhealthy))
timeout(invoker)
- pool.expectMsg(Transition(invoker, UnHealthy, Offline))
+ pool.expectMsg(Transition(invoker, Unhealthy, Offline))
invoker ! PingMessage(InvokerInstanceId(0))
- pool.expectMsg(Transition(invoker, Offline, UnHealthy))
+ pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
- // unhealthy -> healthy
+ // unhealthy -> healthy -> unhealthy -> healthy
it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
val pool = TestProbe()
val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
- pool.expectMsg(CurrentState(invoker, UnHealthy))
+ pool.expectMsg(CurrentState(invoker, Unhealthy))
+
+ (1 to InvokerActor.bufferSize).foreach { _ =>
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ }
+ pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
// Fill buffer with errors
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), false)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.SystemError)
+ }
+ pool.expectMsg(Transition(invoker, Healthy, Unhealthy))
+
+      // Fill buffer with successes to become healthy again (leaving bufferErrorTolerance errors, not above the tolerance)
+ (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ }
+ pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
+ }
+ }
+
+  // unhealthy -> healthy -> unresponsible (overloaded) -> healthy
+ it should "goto healthy again, if overloaded and error buffer has enough successful invocations" in {
+ val pool = TestProbe()
+ val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+
+ within(timeout.duration) {
+ pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
+ pool.expectMsg(CurrentState(invoker, Unhealthy))
+
+ (1 to InvokerActor.bufferSize).foreach { _ =>
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ }
+ pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
+
+ // Fill buffer with timeouts
+ (1 to InvokerActor.bufferSize).foreach { _ =>
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Timeout)
}
+ pool.expectMsg(Transition(invoker, Healthy, Unresponsible))
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
}
- pool.expectMsg(Transition(invoker, UnHealthy, Healthy))
+ pool.expectMsg(Transition(invoker, Unresponsible, Healthy))
}
}
@@ -253,25 +286,25 @@ class InvokerSupervisionTests
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
- pool.expectMsg(CurrentState(invoker, UnHealthy))
+ pool.expectMsg(CurrentState(invoker, Unhealthy))
timeout(invoker)
- pool.expectMsg(Transition(invoker, UnHealthy, Offline))
+ pool.expectMsg(Transition(invoker, Unhealthy, Offline))
invoker ! PingMessage(InvokerInstanceId(0))
- pool.expectMsg(Transition(invoker, Offline, UnHealthy))
+ pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
it should "start timer to send testactions when unhealthy" in {
val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0")))
- invoker.stateName shouldBe UnHealthy
+ invoker.stateName shouldBe Unhealthy
invoker.isTimerActive(InvokerActor.timerName) shouldBe true
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
}
invoker.stateName shouldBe Healthy
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 96a2cc3..0755b99 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -24,6 +24,7 @@ import org.scalatest.junit.JUnitRunner
import whisk.common.{ForcableSemaphore, TransactionId}
import whisk.core.entity.InvokerInstanceId
import whisk.core.loadBalancer._
+import whisk.core.loadBalancer.InvokerState._
/**
* Unit tests for the ContainerPool object.
@@ -36,7 +37,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
behavior of "ShardingContainerPoolBalancerState"
def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy)
- def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), UnHealthy)
+ def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Unhealthy)
def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline)
def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =