You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2019/05/02 03:05:30 UTC

[incubator-openwhisk] branch master updated: Add SPI for invoker (#4453)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6982f46  Add SPI for invoker (#4453)
6982f46 is described below

commit 6982f460c293214928f4b62b2572d6bd83d61a4e
Author: Dominic Kim <do...@navercorp.com>
AuthorDate: Thu May 2 12:05:20 2019 +0900

    Add SPI for invoker (#4453)
---
 common/scala/src/main/resources/reference.conf     |  2 +
 .../apache/openwhisk/core/invoker/Invoker.scala    | 57 ++++++++++++++++------
 .../openwhisk/core/invoker/InvokerReactive.scala   | 30 +++++++++---
 3 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index ad3bedd..fde5073 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -12,6 +12,8 @@ whisk.spi {
   LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
   EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
   AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
+  InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
+  InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
 }
 
 dispatchers {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 9e59232..8854801 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -22,23 +22,22 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
 import akka.stream.ActorMaterializer
 import com.typesafe.config.ConfigValueFactory
 import kamon.Kamon
-import pureconfig.loadConfigOrThrow
 import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common._
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.WhiskConfig._
-import org.apache.openwhisk.core.connector.{MessagingProvider, PingMessage}
+import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
 import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
-import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId}
-import org.apache.openwhisk.core.entity.ActivationEntityLimit
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId}
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
-import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.spi.{Spi, SpiLoader}
 import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
 
 import scala.concurrent.duration._
-import scala.concurrent.Await
-import scala.util.{Failure, Try}
+import scala.concurrent.{Await, ExecutionContext}
+import scala.util.Try
 
 case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
 
@@ -71,6 +70,7 @@ object Invoker {
       ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
     implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
     val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+    val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
 
     // Prepare Kamon shutdown
     CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
@@ -156,25 +156,50 @@ object Invoker {
           .isFailure) {
       abort(s"failure during msgProvider.ensureTopic for topic $topicName")
     }
+
     val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
     val invoker = try {
-      new InvokerReactive(config, invokerInstance, producer, poolConfig)
+      SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
     } catch {
       case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
     }
 
-    Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-      producer.send("health", PingMessage(invokerInstance)).andThen {
-        case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
-      }
-    })
-
     val port = config.servicePort.toInt
     val httpsConfig =
       if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
 
-    BasicHttpService.startHttpService(new BasicRasService {}.route, port, httpsConfig)(
+    val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
+    BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(
       actorSystem,
       ActorMaterializer.create(actorSystem))
   }
 }
+
+/**
+ * An Spi for providing invoker implementation.
+ */
+trait InvokerProvider extends Spi {
+  def instance(config: WhiskConfig,
+               instance: InvokerInstanceId,
+               producer: MessageProducer,
+               poolConfig: ContainerPoolConfig,
+               limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore
+}
+
+// this trait can be used to add common implementation
+trait InvokerCore {}
+
+/**
+ * An Spi for providing RestAPI implementation for invoker.
+ * The given invoker may require corresponding RestAPI implementation.
+ */
+trait InvokerServerProvider extends Spi {
+  def instance(
+    invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
+}
+
+object DefaultInvokerServer extends InvokerServerProvider {
+  override def instance(
+    invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+    new BasicRasService {}
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 5f682d7..93e168d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -24,26 +24,24 @@ import akka.actor.{ActorRefFactory, ActorSystem, Props}
 import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
-import pureconfig._
-import spray.json._
-import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.common._
+import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
-import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.{UserContext, _}
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.spi.SpiLoader
-import org.apache.openwhisk.core.database.UserContext
+import pureconfig._
+import spray.json._
 
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
-object InvokerReactive {
+object InvokerReactive extends InvokerProvider {
 
   /**
    * An method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
@@ -58,6 +56,15 @@ object InvokerReactive {
    * @param Boolean is true this is resource free message and false if this is a result forwarding message
    */
   type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]
+
+  override def instance(
+    config: WhiskConfig,
+    instance: InvokerInstanceId,
+    producer: MessageProducer,
+    poolConfig: ContainerPoolConfig,
+    limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore =
+    new InvokerReactive(config, instance, producer, poolConfig, limitsConfig)
+
 }
 
 class InvokerReactive(
@@ -67,7 +74,8 @@ class InvokerReactive(
   poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
   limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))(
   implicit actorSystem: ActorSystem,
-  logging: Logging) {
+  logging: Logging)
+    extends InvokerCore {
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
   implicit val ec: ExecutionContext = actorSystem.dispatcher
@@ -299,4 +307,10 @@ class InvokerReactive(
       })
   }
 
+  private val healthProducer = msgProvider.getProducer(config)
+  Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+    healthProducer.send("health", PingMessage(instance)).andThen {
+      case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
+    }
+  })
 }