You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2019/05/02 03:05:30 UTC
[incubator-openwhisk] branch master updated: Add SPI for invoker
(#4453)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6982f46 Add SPI for invoker (#4453)
6982f46 is described below
commit 6982f460c293214928f4b62b2572d6bd83d61a4e
Author: Dominic Kim <do...@navercorp.com>
AuthorDate: Thu May 2 12:05:20 2019 +0900
Add SPI for invoker (#4453)
---
common/scala/src/main/resources/reference.conf | 2 +
.../apache/openwhisk/core/invoker/Invoker.scala | 57 ++++++++++++++++------
.../openwhisk/core/invoker/InvokerReactive.scala | 30 +++++++++---
3 files changed, 65 insertions(+), 24 deletions(-)
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index ad3bedd..fde5073 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -12,6 +12,8 @@ whisk.spi {
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
+ InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
+ InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
}
dispatchers {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 9e59232..8854801 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -22,23 +22,22 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
-import pureconfig.loadConfigOrThrow
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig._
-import org.apache.openwhisk.core.connector.{MessagingProvider, PingMessage}
+import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
-import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId}
-import org.apache.openwhisk.core.entity.ActivationEntityLimit
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId}
import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
-import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
import scala.concurrent.duration._
-import scala.concurrent.Await
-import scala.util.{Failure, Try}
+import scala.concurrent.{Await, ExecutionContext}
+import scala.util.Try
case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
@@ -71,6 +70,7 @@ object Invoker {
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+ val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
@@ -156,25 +156,50 @@ object Invoker {
.isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topicName")
}
+
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
- new InvokerReactive(config, invokerInstance, producer, poolConfig)
+ SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
- Scheduler.scheduleWaitAtMost(1.seconds)(() => {
- producer.send("health", PingMessage(invokerInstance)).andThen {
- case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
- }
- })
-
val port = config.servicePort.toInt
val httpsConfig =
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
- BasicHttpService.startHttpService(new BasicRasService {}.route, port, httpsConfig)(
+ val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
+ BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(
actorSystem,
ActorMaterializer.create(actorSystem))
}
}
+
+/**
+ * An Spi for providing invoker implementation.
+ */
+trait InvokerProvider extends Spi {
+ def instance(config: WhiskConfig,
+ instance: InvokerInstanceId,
+ producer: MessageProducer,
+ poolConfig: ContainerPoolConfig,
+ limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore
+}
+
+// this trait can be used to add common implementation
+trait InvokerCore {}
+
+/**
+ * An Spi for providing RestAPI implementation for invoker.
+ * The given invoker may require corresponding RestAPI implementation.
+ */
+trait InvokerServerProvider extends Spi {
+ def instance(
+ invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
+}
+
+object DefaultInvokerServer extends InvokerServerProvider {
+ override def instance(
+ invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+ new BasicRasService {}
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 5f682d7..93e168d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -24,26 +24,24 @@ import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
-import pureconfig._
-import spray.json._
-import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.common._
+import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
-import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
-import org.apache.openwhisk.core.database.UserContext
+import pureconfig._
+import spray.json._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
-object InvokerReactive {
+object InvokerReactive extends InvokerProvider {
/**
* An method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
@@ -58,6 +56,15 @@ object InvokerReactive {
* @param Boolean is true this is resource free message and false if this is a result forwarding message
*/
type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]
+
+ override def instance(
+ config: WhiskConfig,
+ instance: InvokerInstanceId,
+ producer: MessageProducer,
+ poolConfig: ContainerPoolConfig,
+ limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore =
+ new InvokerReactive(config, instance, producer, poolConfig, limitsConfig)
+
}
class InvokerReactive(
@@ -67,7 +74,8 @@ class InvokerReactive(
poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))(
implicit actorSystem: ActorSystem,
- logging: Logging) {
+ logging: Logging)
+ extends InvokerCore {
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
@@ -299,4 +307,10 @@ class InvokerReactive(
})
}
+ private val healthProducer = msgProvider.getProducer(config)
+ Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+ healthProducer.send("health", PingMessage(instance)).andThen {
+ case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
+ }
+ })
}