You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/01 07:43:17 UTC

[GitHub] cbickel closed pull request #3238: Externalize InvokerPool initialization logic.

cbickel closed pull request #3238: Externalize InvokerPool initialization logic.
URL: https://github.com/apache/incubator-openwhisk/pull/3238
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index aed332e95b..786a94a08e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,7 +19,7 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
 
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorSystem, Props}
 import akka.cluster.Cluster
 import akka.pattern.ask
 import akka.stream.ActorMaterializer
@@ -29,16 +29,14 @@ import pureconfig._
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
-import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.types.EntityStore
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 import akka.event.Logging.InfoLevel
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
 
 case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
@@ -50,9 +48,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
 
   private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
 
-  /** Used to manage an action for testing invoker health */ /** Used to manage an action for testing invoker health */
-  private val entityStore = WhiskEntityStore.datastore(config)
-
   /** The execution context for futures */
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
@@ -168,28 +163,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
       })
   }
 
-  /**
-   * Creates or updates a health test action by updating the entity store.
-   * This method is intended for use on startup.
-   * @return Future that completes successfully iff the action is added to the database
-   */
-  private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
-    implicit val tid = TransactionId.loadbalancer
-    WhiskAction
-      .get(db, action.docid)
-      .flatMap { oldAction =>
-        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
-      }
-      .recover {
-        case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
-      }
-      .map(_ => {})
-      .andThen {
-        case Success(_) => logging.info(this, "test action for invoker health now exists")
-        case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
-      }
-  }
-
   /** Gets a producer which can publish messages to the kafka bus. */
   private val messagingProvider = SpiLoader.get[MessagingProvider]
   private val messageProducer = messagingProvider.getProducer(config, executionContext)
@@ -216,29 +189,15 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
       case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
     }
   }
-  private val invokerPool = {
-    // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers.
-    InvokerPool
-      .healthAction(controllerInstance)
-      .map {
-        // Await the creation of the test action; on failure, this will abort the constructor which should
-        // in turn abort the startup of the controller.
-        a =>
-          Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
-      }
-      .orElse {
-        throw new IllegalStateException(
-          "cannot create test action for invoker health because runtime manifest is not valid")
-      }
 
-    val maxPingsPerPoll = 128
-    val pingConsumer =
-      messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = maxPingsPerPoll)
-    val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) =>
-      f.actorOf(InvokerActor.props(invokerInstance, controllerInstance))
+  private val invokerPool = {
+    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config))
 
     actorSystem.actorOf(
-      InvokerPool.props(invokerFactory, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
+      InvokerPool.props(
+        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
+        (m, i) => sendActivationToInvoker(messageProducer, m, i),
+        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
   }
 
   /**
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 13c3a70b95..0c75176208 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -20,29 +20,24 @@ package whisk.core.loadBalancer
 import java.nio.charset.StandardCharsets
 
 import scala.collection.immutable
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.FSM
+import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
 import akka.actor.FSM.CurrentState
 import akka.actor.FSM.SubscribeTransitionCallBack
 import akka.actor.FSM.Transition
-import akka.actor.Props
 import akka.pattern.pipe
 import akka.util.Timeout
-import whisk.common.AkkaLogging
-import whisk.common.LoggingMarkers
-import whisk.common.RingBuffer
-import whisk.common.TransactionId
+import whisk.common._
 import whisk.core.connector._
+import whisk.core.database.NoDocumentException
 import whisk.core.entitlement.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity._
+import whisk.core.entity.types.EntityStore
 
 // Received events
 case object GetStatus
@@ -169,6 +164,48 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
 }
 
 object InvokerPool {
+  private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
+    implicit val tid = TransactionId.loadbalancer
+    implicit val ec = db.executionContext
+    implicit val logging = db.logging
+
+    WhiskAction
+      .get(db, action.docid)
+      .flatMap { oldAction =>
+        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
+      }
+      .recover {
+        case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
+      }
+      .map(_ => {})
+      .andThen {
+        case Success(_) => logging.info(this, "test action for invoker health now exists")
+        case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
+      }
+  }
+
+  /**
+   * Prepares everything for the health protocol to work (i.e. creates a testaction)
+   *
+   * @param controllerInstance instance of the controller we run in
+   * @param entityStore store to write the action to
+   * @return throws an exception on failure to prepare
+   */
+  def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit = {
+    InvokerPool
+      .healthAction(controllerInstance)
+      .map {
+        // Await the creation of the test action; on failure, this will abort the constructor which should
+        // in turn abort the startup of the controller.
+        a =>
+          Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
+      }
+      .orElse {
+        throw new IllegalStateException(
+          "cannot create test action for invoker health because runtime manifest is not valid")
+      }
+  }
+
   def props(f: (ActorRefFactory, InstanceId) => ActorRef,
             p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
             pc: MessageConsumer) = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services