You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/02/01 07:43:16 UTC
[incubator-openwhisk] branch master updated: Externalize
InvokerPool initialization logic. (#3238)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 27c3e10 Externalize InvokerPool initialization logic. (#3238)
27c3e10 is described below
commit 27c3e10266bbd9e1a0a0e64aa35054c965f3d4bf
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Feb 1 08:43:12 2018 +0100
Externalize InvokerPool initialization logic. (#3238)
This piece of logic clutters the loadbalancer's code for no good reason. We should externalize it.
---
.../core/loadBalancer/ContainerPoolBalancer.scala | 57 +++-------------------
.../core/loadBalancer/InvokerSupervision.scala | 57 ++++++++++++++++++----
2 files changed, 55 insertions(+), 59 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index aed332e..786a94a 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,7 +19,7 @@ package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import akka.pattern.ask
import akka.stream.ActorMaterializer
@@ -29,16 +29,14 @@ import pureconfig._
import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.core.WhiskConfig._
import whisk.core.connector._
-import whisk.core.database.NoDocumentException
import whisk.core.entity._
-import whisk.core.entity.types.EntityStore
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.spi.SpiLoader
import akka.event.Logging.InfoLevel
import scala.annotation.tailrec
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
@@ -50,9 +48,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
- /** Used to manage an action for testing invoker health */ /** Used to manage an action for testing invoker health */
- private val entityStore = WhiskEntityStore.datastore(config)
-
/** The execution context for futures */
private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -168,28 +163,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
})
}
- /**
- * Creates or updates a health test action by updating the entity store.
- * This method is intended for use on startup.
- * @return Future that completes successfully iff the action is added to the database
- */
- private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
- implicit val tid = TransactionId.loadbalancer
- WhiskAction
- .get(db, action.docid)
- .flatMap { oldAction =>
- WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
- }
- .recover {
- case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
- }
- .map(_ => {})
- .andThen {
- case Success(_) => logging.info(this, "test action for invoker health now exists")
- case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
- }
- }
-
/** Gets a producer which can publish messages to the kafka bus. */
private val messagingProvider = SpiLoader.get[MessagingProvider]
private val messageProducer = messagingProvider.getProducer(config, executionContext)
@@ -216,29 +189,15 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
}
}
- private val invokerPool = {
- // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers.
- InvokerPool
- .healthAction(controllerInstance)
- .map {
- // Await the creation of the test action; on failure, this will abort the constructor which should
- // in turn abort the startup of the controller.
- a =>
- Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
- }
- .orElse {
- throw new IllegalStateException(
- "cannot create test action for invoker health because runtime manifest is not valid")
- }
- val maxPingsPerPoll = 128
- val pingConsumer =
- messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = maxPingsPerPoll)
- val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) =>
- f.actorOf(InvokerActor.props(invokerInstance, controllerInstance))
+ private val invokerPool = {
+ InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config))
actorSystem.actorOf(
- InvokerPool.props(invokerFactory, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
+ InvokerPool.props(
+ (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
+ (m, i) => sendActivationToInvoker(messageProducer, m, i),
+ messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
}
/**
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 13c3a70..0c75176 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -20,29 +20,24 @@ package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import scala.collection.immutable
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.FSM
+import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
import akka.actor.FSM.CurrentState
import akka.actor.FSM.SubscribeTransitionCallBack
import akka.actor.FSM.Transition
-import akka.actor.Props
import akka.pattern.pipe
import akka.util.Timeout
-import whisk.common.AkkaLogging
-import whisk.common.LoggingMarkers
-import whisk.common.RingBuffer
-import whisk.common.TransactionId
+import whisk.common._
import whisk.core.connector._
+import whisk.core.database.NoDocumentException
import whisk.core.entitlement.Privilege
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity._
+import whisk.core.entity.types.EntityStore
// Received events
case object GetStatus
@@ -169,6 +164,48 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
}
object InvokerPool {
+ private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
+ implicit val tid = TransactionId.loadbalancer
+ implicit val ec = db.executionContext
+ implicit val logging = db.logging
+
+ WhiskAction
+ .get(db, action.docid)
+ .flatMap { oldAction =>
+ WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
+ }
+ .recover {
+ case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
+ }
+ .map(_ => {})
+ .andThen {
+ case Success(_) => logging.info(this, "test action for invoker health now exists")
+ case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
+ }
+ }
+
+ /**
+ * Prepares everything for the health protocol to work (i.e. creates a testaction)
+ *
+ * @param controllerInstance instance of the controller we run in
+ * @param entityStore store to write the action to
+ * @return throws an exception on failure to prepare
+ */
+ def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit = {
+ InvokerPool
+ .healthAction(controllerInstance)
+ .map {
+ // Await the creation of the test action; on failure, this will abort the constructor which should
+ // in turn abort the startup of the controller.
+ a =>
+ Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
+ }
+ .orElse {
+ throw new IllegalStateException(
+ "cannot create test action for invoker health because runtime manifest is not valid")
+ }
+ }
+
def props(f: (ActorRefFactory, InstanceId) => ActorRef,
p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
pc: MessageConsumer) = {
--
To stop receiving notification emails like this one, please contact
cbickel@apache.org.