You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2017/10/30 16:37:50 UTC
[incubator-openwhisk] branch master updated: Use batcher in
CouchDbRestStore. (#2835)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ea11ad4 Use batcher in CouchDbRestStore. (#2835)
ea11ad4 is described below
commit ea11ad42523a02130d565d46c44ed0fbb1f1aaee
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Mon Oct 30 17:37:47 2017 +0100
Use batcher in CouchDbRestStore. (#2835)
---
.../scala/whisk/core/database/ArtifactStore.scala | 11 ---
.../core/database/ArtifactStoreExceptions.scala | 2 +
.../core/database/ArtifactStoreProvider.scala | 8 +-
.../whisk/core/database/CouchDbRestStore.scala | 92 ++++++++++++++--------
.../whisk/core/database/CouchDbStoreProvider.scala | 9 ++-
.../whisk/core/database/DocumentFactory.scala | 8 --
.../src/main/scala/whisk/core/entity/DocInfo.scala | 30 ++++++-
.../main/scala/whisk/core/entity/WhiskStore.scala | 11 +--
.../scala/whisk/core/invoker/InvokerReactive.scala | 24 +++---
.../core/cli/test/SequenceMigrationTests.scala | 3 +-
.../core/controller/test/ActivationsApiTests.scala | 10 +--
.../whisk/core/entity/test/DatastoreTests.scala | 2 +
.../scala/whisk/core/entity/test/ViewTests.scala | 3 +
13 files changed, 127 insertions(+), 86 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
index dbae8e5..2a61730 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
@@ -55,17 +55,6 @@ trait ArtifactStore[DocumentAbstraction] {
protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo]
/**
- * Puts (saves) documents to database in bulk using a future.
- * If the operation is successful, the future completes with DocId else an appropriate exception.
- *
- * @param ds the documents to put in the database
- * @param transid the transaction id for logging
- * @return a future that completes either with DocId
- */
- protected[database] def put(ds: Seq[DocumentAbstraction])(
- implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]]
-
- /**
* Deletes document from database using a future.
* If the operation is successful, the future completes with true.
*
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
index f949328..f8e7ec5 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
@@ -26,3 +26,5 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep
case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message)
case class DocumentUnreadable(message: String) extends ArtifactStoreException(message)
+
+case class PutException(message: String) extends ArtifactStoreException(message)
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
index ba471aa..2080d47 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -18,6 +18,7 @@
package whisk.core.database
import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import spray.json.RootJsonFormat
import whisk.common.Logging
import whisk.core.WhiskConfig
@@ -27,8 +28,11 @@ import whisk.spi.Spi
* An Spi for providing ArtifactStore implementations
*/
trait ArtifactStoreProvider extends Spi {
- def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+ def makeStore[D <: DocumentSerializer](config: WhiskConfig,
+ name: WhiskConfig => String,
+ useBatching: Boolean = false)(
implicit jsonFormat: RootJsonFormat[D],
actorSystem: ActorSystem,
- logging: Logging): ArtifactStore[D]
+ logging: Logging,
+ materializer: ActorMaterializer): ArtifactStore[D]
}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 90ede7e..0e66aee 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -20,22 +20,23 @@ package whisk.core.database
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
+
import akka.actor.ActorSystem
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
+import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import spray.json._
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
+import whisk.core.entity.BulkEntityResult
import whisk.core.entity.DocInfo
import whisk.core.entity.DocRevision
import whisk.core.entity.WhiskDocument
import whisk.http.Messages
-import scala.util.{Failure, Success, Try}
-
/**
* Basic client to put and delete artifacts in a data store.
*
@@ -47,13 +48,17 @@ import scala.util.{Failure, Success, Try}
* @param dbName the name of the database to operate on
* @param serializerEvidence confirms the document abstraction is serializable to a Document with an id
*/
-class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
- dbProtocol: String,
- dbHost: String,
- dbPort: Int,
- dbUsername: String,
- dbPassword: String,
- dbName: String)(implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction])
+class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: String,
+ dbHost: String,
+ dbPort: Int,
+ dbUsername: String,
+ dbPassword: String,
+ dbName: String,
+ useBatching: Boolean = false)(
+ implicit system: ActorSystem,
+ val logging: Logging,
+ jsonFormat: RootJsonFormat[DocumentAbstraction],
+ materializer: ActorMaterializer)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol {
@@ -62,6 +67,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
private val client: CouchDbRestClient =
new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
+ // This is the amount of allowed parallel requests for each entity, before batching starts. If there are already maxOpenDbRequests
+ // and more documents need to be stored, then all arriving documents will be put into batches (if enabled) to avoid a long queue.
+ private val maxOpenDbRequests = system.settings.config.getInt("akka.http.host-connection-pool.max-open-requests") / 2
+
+ private val batcher: Batcher[JsObject, Either[ArtifactStoreException, DocInfo]] =
+ new Batcher(500, maxOpenDbRequests)(put(_)(TransactionId.unknown))
+
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
val asJson = d.toDocumentRecord
@@ -70,29 +82,42 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
require(!id.isEmpty, "document id must be defined")
val docinfoStr = s"id: $id, rev: ${rev.getOrElse("null")}"
-
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$dbName' saving document: '${docinfoStr}'")
- val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match {
- case Some(r) =>
- client =>
- client.putDoc(id, r, asJson)
- case None =>
- client =>
- client.putDoc(id, asJson)
- }
+ val f = if (useBatching) {
+ batcher.put(asJson).map { e =>
+ e match {
+ case Right(response) =>
+ transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
+ response
- val f = request(client).map { e =>
- e match {
+ case Left(e: DocumentConflictException) =>
+ transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
+ // For compatibility.
+ throw DocumentConflictException("conflict on 'put'")
+
+ case Left(e: ArtifactStoreException) =>
+ transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; ${e.getMessage}.")
+ throw PutException("error on 'put'")
+ }
+ }
+ } else {
+ val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match {
+ case Some(r) =>
+ client =>
+ client.putDoc(id, r, asJson)
+ case None =>
+ client =>
+ client.putDoc(id, asJson)
+ }
+ request(client).map {
case Right(response) =>
transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
response.convertTo[DocInfo]
-
case Left(StatusCodes.Conflict) =>
transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
// For compatibility.
throw DocumentConflictException("conflict on 'put'")
-
case Left(code) =>
transid.failed(
this,
@@ -109,22 +134,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
}
- override protected[database] def put(ds: Seq[DocumentAbstraction])(
- implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = {
+ private def put(ds: Seq[JsObject])(
+ implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
val count = ds.size
val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
- val request: Future[Either[StatusCode, JsArray]] = client.putDocs(ds.map(_.toDocumentRecord))
-
- val f = request.map {
+ val f = client.putDocs(ds).map {
_ match {
case Right(response) =>
transid.finished(this, start, s"'$dbName' completed $count documents")
- response.convertTo[Seq[JsValue]].map { result =>
- Try(result.convertTo[DocInfo]) match {
- case Success(info) => Right(info)
- case Failure(_) => Left(DocumentConflictException("conflict on 'bulk_put'"))
- }
+
+ response.convertTo[Seq[BulkEntityResult]].map { singleResult =>
+ singleResult.error
+ .map {
+ case "conflict" => Left(DocumentConflictException("conflict on 'bulk_put'"))
+ case e => Left(PutException(s"Unexpected $e: ${singleResult.reason.getOrElse("")} on 'bulk_put'"))
+ }
+ .getOrElse {
+ Right(singleResult.toDocInfo)
+ }
}
case Left(code) =>
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index e3713d8..d2c08dd 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -18,16 +18,18 @@
package whisk.core.database
import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import spray.json.RootJsonFormat
import whisk.common.Logging
import whisk.core.WhiskConfig
object CouchDbStoreProvider extends ArtifactStoreProvider {
- def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+ def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String, useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
actorSystem: ActorSystem,
- logging: Logging): ArtifactStore[D] = {
+ logging: Logging,
+ materializer: ActorMaterializer): ArtifactStore[D] = {
require(config != null && config.isValid, "config is undefined or not valid")
require(
config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB",
@@ -43,6 +45,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
config.dbPort.toInt,
config.dbUsername,
config.dbPassword,
- name(config))
+ name(config),
+ useBatching)
}
}
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 45d6d28..6aa9515 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -164,14 +164,6 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
}
}
- def put[Wsuper >: W](db: ArtifactStore[Wsuper], docs: Seq[W])(
- implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = {
- implicit val logger = db.logging
- implicit val ec = db.executionContext
-
- db.put(docs)
- }
-
def attach[Wsuper >: W](
db: ArtifactStore[Wsuper],
doc: DocInfo,
diff --git a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
index d03d3c1..b6e635a 100644
--- a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
@@ -17,14 +17,16 @@
package whisk.core.entity
-import whisk.core.entity.ArgNormalizer.trim
import scala.util.Try
-import spray.json.JsValue
-import spray.json.RootJsonFormat
+
+import spray.json.DefaultJsonProtocol
import spray.json.JsNull
import spray.json.JsString
+import spray.json.JsValue
+import spray.json.RootJsonFormat
import spray.json.deserializationError
-import spray.json.DefaultJsonProtocol
+
+import whisk.core.entity.ArgNormalizer.trim
/**
* A DocId is the document id === primary key in the datastore.
@@ -86,6 +88,22 @@ protected[core] case class DocInfo protected[entity] (id: DocId, rev: DocRevisio
}
}
+/**
+ * A BulkEntityResult is wrapping the fields that are returned for a single document on a bulk-put of several documents.
+ * http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#post--db-_bulk_docs
+ *
+ * @param id the document id
+ * @param rev the document revision, optional; this is an opaque value determined by the datastore
+ * @param error the error, that occurred on trying to put this document into CouchDB
+ * @param reason the error message that corresponds to the error
+ */
+case class BulkEntityResult(id: String,
+ rev: DocRevision = DocRevision.empty,
+ error: Option[String],
+ reason: Option[String]) {
+ def toDocInfo = DocInfo(DocId(id), rev)
+}
+
protected[core] object DocId extends ArgNormalizer[DocId] {
/**
@@ -151,3 +169,7 @@ protected[core] object DocInfo extends DefaultJsonProtocol {
implicit val serdes = jsonFormat2(DocInfo.apply)
}
+
+object BulkEntityResult extends DefaultJsonProtocol {
+ implicit val serdes = jsonFormat4(BulkEntityResult.apply)
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index e766d61..df8d1fe 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -24,6 +24,7 @@ import scala.language.postfixOps
import scala.util.Try
import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import spray.json.JsObject
import spray.json.JsString
import spray.json.RootJsonFormat
@@ -97,7 +98,7 @@ object WhiskAuthStore {
dbPort -> null,
dbAuths -> null)
- def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
+ def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskAuth](config, _.dbAuths)
}
@@ -112,10 +113,10 @@ object WhiskEntityStore {
dbPort -> null,
dbWhisk -> null)
- def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
+ def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
SpiLoader
.get[ArtifactStoreProvider]
- .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging)
+ .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging, materializer)
}
@@ -130,8 +131,8 @@ object WhiskActivationStore {
dbPort -> null,
dbActivations -> null)
- def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
- SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations)
+ def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
+ SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations, true)
}
/**
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 2308913..76b9c3e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -22,8 +22,11 @@ import java.time.Instant
import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.util.{Failure, Success}
+import scala.util.Failure
+import scala.util.Success
+
import org.apache.kafka.common.errors.RecordTooLargeException
+
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.Props
@@ -44,7 +47,7 @@ import whisk.core.containerpool.ContainerPool
import whisk.core.containerpool.ContainerProxy
import whisk.core.containerpool.PrewarmingConfig
import whisk.core.containerpool.Run
-import whisk.core.database.{Batcher, DocumentConflictException, NoDocumentException}
+import whisk.core.database.NoDocumentException
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.http.Messages
@@ -54,6 +57,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
implicit actorSystem: ActorSystem,
logging: Logging) {
+ implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec = actorSystem.dispatcher
implicit val cfg = config
@@ -123,23 +127,13 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
}
}
- implicit val materializer: ActorMaterializer = ActorMaterializer()
- // In worst case we need "maximumContainers" connections to load actions from the database. The remaining
- // connections can safely be used for writing activations.
- val maxOpenDbRequests = actorSystem.settings.config.getInt("akka.http.host-connection-pool.max-open-requests")
- val maxOpenActivationRequests = (maxOpenDbRequests - maximumContainers) max (maxOpenDbRequests / 2)
- val batcher: Batcher[WhiskActivation, Either[DocumentConflictException, DocInfo]] =
- new Batcher(500, maxOpenActivationRequests)(WhiskActivation.put(activationStore, _)(TransactionId.invoker))
-
/** Stores an activation in the database. */
val store = (tid: TransactionId, activation: WhiskActivation) => {
implicit val transid = tid
logging.info(this, "recording the activation result to the data store")
-
- batcher.put(activation).andThen {
- case Success(Right(_)) => logging.info(this, s"recorded activation")
- case Success(Left(t)) => logging.error(this, s"failed to record activation, $t")
- case Failure(t) => logging.error(this, s"failed to record activation, $t")
+ WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen {
+ case Success(id) => logging.info(this, s"recorded activation")
+ case Failure(t) => logging.error(this, s"failed to record activation")
}
}
diff --git a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
index b3fd9ce..977505f 100644
--- a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
+++ b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
@@ -26,6 +26,7 @@ import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfter
import org.scalatest.junit.JUnitRunner
+import akka.stream.ActorMaterializer
import common.TestHelpers
import common.TestUtils
import common.Wsk
@@ -33,7 +34,6 @@ import common.WskProps
import common.WskTestHelpers
import spray.json._
import spray.json.DefaultJsonProtocol.StringJsonFormat
-
import whisk.core.WhiskConfig
import whisk.core.database.test.DbUtils
import whisk.core.entity._
@@ -45,6 +45,7 @@ import whisk.core.entity.test.ExecHelpers
@RunWith(classOf[JUnitRunner])
class SequenceMigrationTests extends TestHelpers with BeforeAndAfter with DbUtils with ExecHelpers with WskTestHelpers {
+ implicit val matzerializer = ActorMaterializer()
implicit val wskprops = WskProps()
val wsk = new Wsk
val whiskConfig = new WhiskConfig(WhiskEntityStore.requiredProperties)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index 2af35df..548c612 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -19,16 +19,16 @@ package whisk.core.controller.test
import java.time.Clock
import java.time.Instant
+
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Route
-
+import akka.stream.ActorMaterializer
import spray.json._
import spray.json.DefaultJsonProtocol._
-
import whisk.core.controller.WhiskActivationsApi
import whisk.core.database.ArtifactStoreProvider
import whisk.core.entity._
@@ -445,10 +445,10 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}
it should "report proper error when record is corrupted on get" in {
-
+ implicit val materializer = ActorMaterializer()
val activationStore = SpiLoader
.get[ArtifactStoreProvider]
- .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging)
+ .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging, materializer)
implicit val tid = transid()
val entity = BadEntity(namespace, EntityName(ActivationId().toString))
put(activationStore, entity)
diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
index 3a289d7..f850f6b 100644
--- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
+import akka.stream.ActorMaterializer
import common.StreamLogging
import common.WskActorSystem
import whisk.core.WhiskConfig
@@ -47,6 +48,7 @@ class DatastoreTests
with ExecHelpers
with StreamLogging {
+ implicit val materializer = ActorMaterializer()
val namespace = EntityPath("test namespace")
val config = new WhiskConfig(WhiskAuthStore.requiredProperties ++ WhiskEntityStore.requiredProperties)
val datastore = WhiskEntityStore.datastore(config)
diff --git a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
index 5be4ac4..2347170 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
@@ -30,6 +30,7 @@ import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
+import akka.stream.ActorMaterializer
import common.StreamLogging
import common.WskActorSystem
import whisk.core.WhiskConfig
@@ -68,6 +69,8 @@ class ViewTests
val creds2 = WhiskAuthHelpers.newAuth(Subject("t12345"))
val namespace2 = EntityPath(creds2.subject.asString)
+ implicit val materializer = ActorMaterializer()
+
val config = new WhiskConfig(WhiskEntityStore.requiredProperties ++ WhiskActivationStore.requiredProperties)
val entityStore = WhiskEntityStore.datastore(config)
val activationStore = WhiskActivationStore.datastore(config)
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].