You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/08/04 06:54:30 UTC
[incubator-openwhisk] branch master updated: SPI approach for
pluggable implementations. (#2414)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 50fb60e SPI approach for pluggable implementations. (#2414)
50fb60e is described below
commit 50fb60e221b47dd82026aa10a889a00047b75b0c
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Thu Aug 3 23:54:28 2017 -0700
SPI approach for pluggable implementations. (#2414)
Adds the ability to add pluggable implementations for defined Service Provider Interfaces (SPI). The implementation to load is chosen via configuration.
First set of plug-points are:
- ArtifactStoreProvider
- MessagingProvider
---
common/scala/src/main/resources/reference.conf | 4 +
.../connector/kafka/KafkaMessagingProvider.scala | 43 ++++++
.../whisk/core/connector/MessagingProvider.scala | 33 +++++
.../core/database/ArtifactStoreProvider.scala | 35 +++++
.../whisk/core/database/CouchDbStoreProvider.scala | 45 ++++++
.../main/scala/whisk/core/entity/WhiskStore.scala | 33 ++---
.../scala/src/main/scala/whisk/spi/SpiLoader.scala | 100 +++++++++++++
.../scala/whisk/core/controller/Controller.scala | 4 +-
.../core/loadBalancer/LoadBalancerService.scala | 20 ++-
.../main/scala/whisk/core/invoker/Invoker.scala | 29 ++--
docs/spi.md | 77 ++++++++++
tests/src/test/resources/application.conf | 8 ++
.../core/controller/test/ActivationsApiTests.scala | 7 +-
tests/src/test/scala/whisk/spi/SpiTests.scala | 159 +++++++++++++++++++++
14 files changed, 542 insertions(+), 55 deletions(-)
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
new file mode 100644
index 0000000..52f30c3
--- /dev/null
+++ b/common/scala/src/main/resources/reference.conf
@@ -0,0 +1,4 @@
+whisk.spi{
+ ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
+ MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
+}
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
new file mode 100644
index 0000000..2d6bfe6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.connector.kafka
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.core.connector.MessageConsumer
+import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
+import whisk.spi.Dependencies
+import whisk.spi.SingletonSpiFactory
+
+/**
+ * A Kafka based implementation of MessagingProvider
+ */
+class KafkaMessagingProvider() extends MessagingProvider {
+ def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(implicit logging: Logging): MessageConsumer =
+ new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+
+ def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
+ new KafkaProducerConnector(config.kafkaHost, ec)
+}
+
+object KafkaMessagingProvider extends SingletonSpiFactory[MessagingProvider] {
+ override def apply(dependencies: Dependencies): MessagingProvider = new KafkaMessagingProvider
+}
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
new file mode 100644
index 0000000..b88e8d9
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Spi
+
+/**
+ * An Spi for providing Messaging implementations.
+ */
+trait MessagingProvider extends Spi {
+ def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int = Int.MaxValue, maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
+ def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
new file mode 100644
index 0000000..5234fc8
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import spray.json.RootJsonFormat
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Spi
+
+/**
+ * An Spi for providing ArtifactStore implementations
+ */
+
+trait ArtifactStoreProvider extends Spi {
+ def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+ implicit jsonFormat: RootJsonFormat[D],
+ actorSystem: ActorSystem,
+ logging: Logging): ArtifactStore[D]
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
new file mode 100644
index 0000000..8b26f93
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import spray.json.RootJsonFormat
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Dependencies
+import whisk.spi.SpiFactory
+
+/**
+ * A CouchDB implementation of ArtifactStoreProvider
+ */
+class CouchDbStoreProvider extends ArtifactStoreProvider {
+ def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+ implicit jsonFormat: RootJsonFormat[D],
+ actorSystem: ActorSystem,
+ logging: Logging): ArtifactStore[D] = {
+ require(config != null && config.isValid, "config is undefined or not valid")
+ require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported db.provider: " + config.dbProvider)
+ assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword, name(config)).forall(_.nonEmpty), "At least one expected property is missing")
+
+ new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername, config.dbPassword, name(config))
+ }
+}
+
+object CouchDbStoreProvider extends SpiFactory[ArtifactStoreProvider] {
+ override def apply(deps: Dependencies): ArtifactStoreProvider = new CouchDbStoreProvider
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index aa48419..c25ff2e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -18,11 +18,6 @@
package whisk.core.entity
import java.time.Instant
-
-import scala.concurrent.Future
-import scala.language.postfixOps
-import scala.util.Try
-
import akka.actor.ActorSystem
import spray.json.JsObject
import spray.json.JsString
@@ -40,16 +35,19 @@ import whisk.core.WhiskConfig.dbProvider
import whisk.core.WhiskConfig.dbUsername
import whisk.core.WhiskConfig.dbWhisk
import whisk.core.database.ArtifactStore
-import whisk.core.database.CouchDbRestStore
+import whisk.core.database.ArtifactStoreProvider
import whisk.core.database.DocumentRevisionProvider
import whisk.core.database.DocumentSerializer
+import scala.concurrent.Future
+import scala.language.postfixOps
+import scala.util.Try
+import whisk.spi.SpiLoader
package object types {
type AuthStore = ArtifactStore[WhiskAuth]
type EntityStore = ArtifactStore[WhiskEntity]
type ActivationStore = ArtifactStore[WhiskActivation]
}
-
protected[core] trait WhiskDocument
extends DocumentSerializer
with DocumentRevisionProvider {
@@ -86,19 +84,6 @@ protected[core] trait WhiskDocument
}
}
-protected[core] object Util {
- def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
- implicit jsonFormat: RootJsonFormat[D],
- actorSystem: ActorSystem,
- logging: Logging): ArtifactStore[D] = {
- require(config != null && config.isValid, "config is undefined or not valid")
- require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported db.provider: " + config.dbProvider)
- assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword, name(config)).forall(_.nonEmpty), "At least one expected property is missing")
-
- new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername, config.dbPassword, name(config))
- }
-}
-
object WhiskAuthStore {
def requiredProperties =
Map(dbProvider -> null,
@@ -110,7 +95,7 @@ object WhiskAuthStore {
dbAuths -> null)
def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
- Util.makeStore[WhiskAuth](config, _.dbAuths)
+ SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskAuth](config, _.dbAuths)
}
object WhiskEntityStore {
@@ -124,7 +109,8 @@ object WhiskEntityStore {
dbWhisk -> null)
def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
- Util.makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging)
+ SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging)
+
}
object WhiskActivationStore {
@@ -138,9 +124,10 @@ object WhiskActivationStore {
dbActivations -> null)
def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
- Util.makeStore[WhiskActivation](config, _.dbActivations)
+ SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskActivation](config, _.dbActivations)
}
+
/**
* This object provides some utilities that query the whisk datastore.
* The datastore is assumed to have views (pre-computed joins or indexes)
diff --git a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
new file mode 100644
index 0000000..849a1b6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.spi
+
+import com.typesafe.config.ConfigFactory
+import java.util.concurrent.atomic.AtomicReference
+
+/** Marker trait to mark an Spi */
+trait Spi
+
+/** Trait to be extended by factory objects creating Spi implementations */
+trait SpiFactory[T <: Spi] {
+ def apply(dependencies: Dependencies): T
+
+ /**
+ * Proxy method only called by the SpiLoader so the apply method
+ * is overridable even with custom logic implemented.
+ *
+ * @param dependencies Dependencies to pass to the Spi
+ */
+ def buildInstance(dependencies: Dependencies): T = apply(dependencies)
+}
+
+/**
+ * SpiFactory which is guaranteed to always return the same reference for
+ * the same type of Spi.
+ */
+trait SingletonSpiFactory[T <: Spi] extends SpiFactory[T] {
+ private val ref = new AtomicReference[T]()
+
+ override def buildInstance(dependencies: Dependencies): T = {
+ val oldValue = ref.get()
+ if (oldValue != null.asInstanceOf[T]) {
+ oldValue
+ } else {
+ val newValue = apply(dependencies)
+ if (ref.compareAndSet(null.asInstanceOf[T], newValue)) {
+ newValue
+ } else {
+ ref.get()
+ }
+ }
+ }
+}
+
+trait SpiClassResolver {
+ /** Resolves the implementation for a given type */
+ def getClassNameForType[T](implicit man: Manifest[T]): String
+}
+
+object SpiLoader {
+ /**
+ * Instantiates an object of the given type.
+ *
+ * The ClassName to load is resolved via the SpiClassResolver in scode, which defaults to
+ * a TypesafeConfig based resolver.
+ */
+ def get[A <: Spi](deps: Dependencies = Dependencies())(implicit resolver: SpiClassResolver = TypesafeConfigClassResolver, man: Manifest[A]): A = {
+ val clazz = Class.forName(resolver.getClassNameForType[A] + "$")
+ clazz.getField("MODULE$").get(clazz).asInstanceOf[SpiFactory[A]].buildInstance(deps)
+ }
+}
+
+/** Lookup the classname for the SPI impl based on a key in the provided Config */
+object TypesafeConfigClassResolver extends SpiClassResolver {
+ private val config = ConfigFactory.load()
+
+ override def getClassNameForType[T](implicit man: Manifest[T]): String = config.getString("whisk.spi." + man.runtimeClass.getSimpleName)
+}
+
+/**
+ * Object containing arbitrary objects acting as dependencies.
+ *
+ * This is solely a helper type to cross the border between possibly heterogeneous Spi
+ * interfaces and the production code.
+ */
+case class Dependencies(private val deps: Any*) {
+ require(deps.map(_.getClass).distinct.size == deps.size, "A type can only occur once as a dependency")
+
+ def get[T](implicit man: Manifest[T]): T =
+ deps.find(d => man.runtimeClass.isAssignableFrom(d.getClass)) match {
+ case Some(d: T) => d
+ case _ => throw new IllegalArgumentException(s"missing dependency of type ${man.runtimeClass.getName}")
+ }
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 359e78c..0f5236b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -26,8 +26,8 @@ import akka.japi.Creator
import spray.http.StatusCodes._
import spray.http.Uri
import spray.httpx.SprayJsonSupport._
-import spray.json._
import spray.json.DefaultJsonProtocol._
+import spray.json._
import spray.routing.Directive.pimpApply
import spray.routing.Route
import whisk.common.AkkaLogging
@@ -43,8 +43,8 @@ import whisk.core.entity.ExecManifest.Runtimes
import whisk.core.loadBalancer.LoadBalancerService
import whisk.http.BasicHttpService
import whisk.http.BasicRasService
+import scala.util.{Failure, Success}
-import scala.util.{ Failure, Success }
/**
* The Controller is the service that provides the REST API for OpenWhisk.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 3418951..b50eebc 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -18,7 +18,6 @@
package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
-
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -26,22 +25,18 @@ import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
-
import org.apache.kafka.clients.producer.RecordMetadata
-
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
-
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
+import whisk.core.connector.MessagingProvider
import whisk.core.connector.{ ActivationMessage, CompletionMessage }
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
@@ -55,6 +50,7 @@ import whisk.core.entity.types.EntityStore
import scala.annotation.tailrec
import whisk.core.entity.EntityName
import whisk.core.entity.Identity
+import whisk.spi.SpiLoader
trait LoadBalancer {
@@ -175,7 +171,8 @@ class LoadBalancerService(
}
/** Gets a producer which can publish messages to the kafka bus. */
- private val messageProducer = new KafkaProducerConnector(config.kafkaHost, executionContext)
+ private val messasgingProvider = SpiLoader.get[MessagingProvider]()
+ private val messageProducer = messasgingProvider.getProducer(config, executionContext)
private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, invoker: InstanceId): Future[RecordMetadata] = {
implicit val transid = msg.transid
@@ -200,8 +197,7 @@ class LoadBalancerService(
}
val maxPingsPerPoll = 128
- // Each controller gets its own Group Id, to receive all messages
- val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll)
+ val pingConsumer = messasgingProvider.getConsumer(config, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll)
val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => f.actorOf(InvokerActor.props(invokerInstance, instance))
actorSystem.actorOf(InvokerPool.props(
@@ -215,10 +211,10 @@ class LoadBalancerService(
*/
val maxActiveAcksPerPoll = 128
val activeAckPollDuration = 1.second
-
- private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
+ private val activeAckConsumer = messasgingProvider.getConsumer(config, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
val activationFeed = actorSystem.actorOf(Props {
- new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck)
+ new MessageFeed("activeack", logging,
+ activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck)
})
def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index dc2f09d..e5dc5dc 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -18,36 +18,34 @@
package whisk.core.invoker
import java.nio.charset.StandardCharsets
-import java.time.{ Clock, Instant }
-
-import scala.concurrent.{ Await, ExecutionContext, Future }
+import java.time.{Clock, Instant}
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.Promise
-import scala.concurrent.duration.{ Duration, DurationInt }
+import scala.concurrent.duration.{Duration, DurationInt}
import scala.language.postfixOps
-import scala.util.{ Failure, Success }
+import scala.util.{Failure, Success}
import scala.util.Try
-
import org.apache.kafka.common.errors.RecordTooLargeException
-
-import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala }
+import akka.actor.{ActorRef, ActorSystem, actorRef2Scala}
import akka.japi.Creator
import spray.json._
import spray.json.DefaultJsonProtocol._
-import whisk.common.{ Counter, Logging, LoggingMarkers, TransactionId }
+import whisk.common.{Counter, Logging, LoggingMarkers, TransactionId}
import whisk.common.AkkaLogging
import whisk.common.Scheduler
-import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector }
import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.{ dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, invokerUseReactivePool }
-import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.WhiskConfig.{dockerImagePrefix, dockerRegistry, invokerUseReactivePool, kafkaHost, logsDir, servicePort}
+import whisk.core.connector.{ActivationMessage, CompletionMessage}
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
import whisk.core.connector.PingMessage
import whisk.core.container._
-import whisk.core.dispatcher.{ Dispatcher, MessageHandler }
+import whisk.core.dispatcher.{Dispatcher, MessageHandler}
import whisk.core.entity._
import whisk.http.BasicHttpService
import whisk.http.Messages
+import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
/**
@@ -477,8 +475,9 @@ object Invoker {
val topic = s"invoker${invokerInstance.toInt}"
val maxdepth = ContainerPool.getDefaultMaxActive(config)
- val consumer = new KafkaConsumerConnector(config.kafkaHost, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
- val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+ val msgProvider = SpiLoader.get[MessagingProvider]()
+ val consumer = msgProvider.getConsumer(config, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+ val producer = msgProvider.getProducer(config, ec)
val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth, actorSystem)
val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
diff --git a/docs/spi.md b/docs/spi.md
new file mode 100644
index 0000000..b6da0de
--- /dev/null
+++ b/docs/spi.md
@@ -0,0 +1,77 @@
+# SPI extensions in OpenWhisk
+
+Alternate implementations of various components follow an SPI (Service Provider Interface) pattern:
+* The pluggable component is defined as an Spi trait:
+```scala
+import whisk.spi.Spi
+trait ThisIsPluggable extends Spi { ... }
+```
+* Implementations implement the Spi trait
+```scala
+class TheImpl extends ThisIsPluggable { ... }
+class TheOtherImpl extends ThisIsPluggable { ... }
+```
+
+Runtime resolution of an Spi trait to a specific impl is provided by:
+* SpiLoader - a utility for loading the impl of a specific Spi, using a resolver to determine the impls factory classname, and reflection to load the factory object
+* SpiFactory - a way to define a factory for each impl, all of which are loaded via reflection
+* application.conf - each SpiFactory is resolved to a classname based on the config key provided to SpiLoader
+
+A single SpiFactory per unique Spi is usable at runtime, since the key will have a single string value.
+
+# Example
+
+The process to create and use an SPI is as follows:
+
+## Define the Spi and impl(s)
+
+* create your Spi trait `YourSpi` as an class that is an extension of `whisk.spi.Spi`
+* create you SpiFactory impl `YourSpiFactory` as an object that is an extension of `whisk.spi.SpiFactory` (or `whisk.spi.SingletonSpiFactory`)
+* create your impls as classes that extend `YourSpi`
+
+## Define the SpiFactory to load the impl
+
+```scala
+class YourImplFactory extends SpiFactory[YourSpi]{
+ def apply(dependencies: Dependencies): { ...construct the impl...}
+}
+```
+for singleton behavior you can use
+```scala
+class YourImplFactory extends SingletonSpiFactory[YourSpi]{
+ def apply(dependencies: Dependencies): { ...construct the impl...}
+}
+```
+
+## Invoke SpiLoader.get to acquire an instance of the SPI
+
+SpiLoader uses a TypesafeConfig key to use for resolving which impl should be loaded.
+
+The config key used to find the impl classname is `whisk.spi.<SpiInterface>`
+
+For example, the SPI interface `whisk.core.database.ArtifactStoreProvider` would load a specific impl indicated by the `whisk.spi.ArtifactStoreProvider` config key.
+
+(so you cannot use multiple SPI interfaces with the same class name in different packages)
+
+
+Invoke the loader using `SpiLoader.get[<the SPI interface>]()(<implicit resolver>)`
+
+```scala
+val messagingProvider = SpiLoader.get[MessagingProvider]()
+```
+
+## Defaults
+
+Default impls resolution is dependent on the config values in order of priority from:
+1. application.conf
+2. reference.conf
+
+So use `reference.conf` to specify defaults.
+
+# Runtime
+
+Since SPI impls are loaded from the classpath, and a specific impl is used only if explicitly configured it is possible to optimize the classpath based on your preference of:
+* include only default impls, and only use default impls
+* include all impls, and only use the specified impls
+* include some combination of defaults and alternate impls, and use the specified impls for the alternates, and default impls for the rest
+
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
new file mode 100644
index 0000000..3dbdb36
--- /dev/null
+++ b/tests/src/test/resources/application.conf
@@ -0,0 +1,8 @@
+
+whisk.spi {
+ DependentSpi = whisk.spi.DepSpiImpl
+ TestSpi = whisk.spi.TestSpiImpl
+ SimpleSpi = whisk.spi.SimpleSpiImpl
+ MissingSpi = whisk.spi.MissingImpl
+ MissingModule = missing.module
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index a2a9f4e..66209d2 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -19,19 +19,19 @@ package whisk.core.controller.test
import java.time.Clock
import java.time.Instant
-
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-
import spray.http.StatusCodes._
import spray.httpx.SprayJsonSupport._
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.core.controller.WhiskActivationsApi
+import whisk.core.database.ArtifactStoreProvider
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.http.ErrorResponse
import whisk.http.Messages
+import whisk.spi.SpiLoader
/**
* Tests Activations API.
@@ -349,7 +349,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}
it should "report proper error when record is corrupted on get" in {
- val activationStore = Util.makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging)
+
+ val activationStore = SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging)
implicit val tid = transid()
val entity = BadEntity(namespace, EntityName(ActivationId().toString))
put(activationStore, entity)
diff --git a/tests/src/test/scala/whisk/spi/SpiTests.scala b/tests/src/test/scala/whisk/spi/SpiTests.scala
new file mode 100644
index 0000000..dc309b0
--- /dev/null
+++ b/tests/src/test/scala/whisk/spi/SpiTests.scala
@@ -0,0 +1,159 @@
+package whisk.spi
+
+import com.typesafe.config.ConfigException
+import common.StreamLogging
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import whisk.core.WhiskConfig
+
+@RunWith(classOf[JUnitRunner])
+class SpiTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
+
+ behavior of "SpiProvider"
+
+ it should "load an Spi from SpiLoader via typesafe config" in {
+ val simpleSpi = SpiLoader.get[SimpleSpi]()
+ simpleSpi shouldBe a[SimpleSpi]
+ }
+
+ it should "throw an exception if the impl defined in application.conf is missing" in {
+ a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingSpi]() // MissingSpi(actorSystem)
+ }
+
+ it should "throw an exception if the module is missing" in {
+ a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingModule]() // MissingModule(actorSystem)
+ }
+
+ it should "throw an exception if the config key is missing" in {
+ a[ConfigException] should be thrownBy SpiLoader.get[MissingKey]() // MissingModule(actorSystem)
+ }
+
+ it should "load an Spi with injected WhiskConfig" in {
+ val whiskConfig = new WhiskConfig(Map())
+ val deps = Dependencies("some name", whiskConfig)
+ val dependentSpi = SpiLoader.get[DependentSpi](deps)
+ dependentSpi.config shouldBe whiskConfig
+ }
+
+ it should "load an Spi with injected Spi" in {
+ val whiskConfig = new WhiskConfig(Map())
+ val deps = Dependencies("some name", whiskConfig)
+ val dependentSpi = SpiLoader.get[DependentSpi](deps)
+
+ val deps2 = Dependencies("dep2", dependentSpi)
+ val testSpi = SpiLoader.get[TestSpi](deps2)
+
+ testSpi.dep shouldBe dependentSpi
+ }
+
+ it should "not allow duplicate-type dependencies" in {
+ a[IllegalArgumentException] should be thrownBy Dependencies("some string", "some other string")
+ }
+
+ it should "load SPI impls as singletons via SingletonSpiFactory" in {
+ val instance1 = SpiLoader.get[DependentSpi]()
+ val instance2 = SpiLoader.get[DependentSpi]()
+ val instance3 = SpiLoader.get[DependentSpi]()
+
+ instance1 shouldBe instance2
+ instance2 shouldBe instance3
+ }
+
+ it should "load SPI impls as singletons via lazy val init" in {
+ val instance1 = SpiLoader.get[SimpleSpi]()
+ val instance2 = SpiLoader.get[SimpleSpi]()
+ val instance3 = SpiLoader.get[SimpleSpi]()
+
+ instance1 shouldBe instance2
+ instance2 shouldBe instance3
+ }
+}
+
+trait TestSpi extends Spi {
+ val name: String
+ val dep: DependentSpi
+}
+
+trait DependentSpi extends Spi {
+ val name: String
+ val config: WhiskConfig
+}
+
+trait TestSpiFactory extends Spi {
+ def getTestSpi(name: String, dep: DependentSpi): TestSpi
+}
+
+trait DependentSpiFactory extends Spi {
+ def getDependentSpi(name: String, config: WhiskConfig): DependentSpi
+}
+
+abstract class Key(key: String) {
+
+}
+
+trait SimpleSpi extends Spi {
+ val name: String
+}
+
+trait MissingSpi extends Spi {
+ val name: String
+}
+
+trait MissingModule extends Spi {
+ val name: String
+}
+trait MissingKey extends Spi
+
+//SPI impls
+//a singleton enforced by SingletonSpiFactory
+class DepSpiImpl(val name: String, val config: WhiskConfig) extends DependentSpi
+object DepSpiImpl extends SingletonSpiFactory[DependentSpi] {
+ override def apply(deps: Dependencies): DependentSpi = {
+ new DepSpiImpl(deps.get[String], deps.get[WhiskConfig])
+ }
+}
+
+class TestSpiImpl(val name: String, val dep: DependentSpi) extends TestSpi
+//an alternative to extending SingletonSpiFactory is using lazy val:
+object TestSpiImpl extends SpiFactory[TestSpi] {
+ var name: String = null
+ var conf: DependentSpi = null
+ lazy val instance = new TestSpiImpl(name, conf)
+ override def apply(dependencies: Dependencies): TestSpi = {
+ name = dependencies.get[String]
+ conf = dependencies.get[DependentSpi]
+ instance
+ }
+
+}
+
+class TestSpiFactoryImpl extends TestSpiFactory {
+ def getTestSpi(name: String, dep: DependentSpi) = new TestSpiImpl(name, dep)
+}
+
+object TestSpiFactoryImpl extends SpiFactory[TestSpiFactory] {
+ override def apply(deps: Dependencies): TestSpiFactory = new TestSpiFactoryImpl()
+}
+
+class DependentSpiFactoryImpl extends DependentSpiFactory {
+ override def getDependentSpi(name: String, config: WhiskConfig): DependentSpi = new DepSpiImpl(name, config)
+}
+
+object DependentSpiFactoryImpl extends SpiFactory[DependentSpiFactory] {
+ override def apply(deps: Dependencies): DependentSpiFactory = new DependentSpiFactoryImpl()
+}
+
+class SimpleSpiImpl(val name: String) extends SimpleSpi
+
+object SimpleSpiImpl extends SingletonSpiFactory[SimpleSpi] {
+ override def apply(dependencies: Dependencies): SimpleSpi = new SimpleSpiImpl("some val ")
+}
+
+class MissingSpiImpl(val name: String) extends MissingSpi
+
+object MissingSpiImpl extends SpiFactory[MissingSpi] {
+ override def apply(deps: Dependencies): MissingSpi = new MissingSpiImpl("some val ")
+}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].