You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2017/10/30 16:37:50 UTC

[incubator-openwhisk] branch master updated: Use batcher in CouchDbRestStore. (#2835)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ea11ad4  Use batcher in CouchDbRestStore. (#2835)
ea11ad4 is described below

commit ea11ad42523a02130d565d46c44ed0fbb1f1aaee
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Mon Oct 30 17:37:47 2017 +0100

    Use batcher in CouchDbRestStore. (#2835)
---
 .../scala/whisk/core/database/ArtifactStore.scala  | 11 ---
 .../core/database/ArtifactStoreExceptions.scala    |  2 +
 .../core/database/ArtifactStoreProvider.scala      |  8 +-
 .../whisk/core/database/CouchDbRestStore.scala     | 92 ++++++++++++++--------
 .../whisk/core/database/CouchDbStoreProvider.scala |  9 ++-
 .../whisk/core/database/DocumentFactory.scala      |  8 --
 .../src/main/scala/whisk/core/entity/DocInfo.scala | 30 ++++++-
 .../main/scala/whisk/core/entity/WhiskStore.scala  | 11 +--
 .../scala/whisk/core/invoker/InvokerReactive.scala | 24 +++---
 .../core/cli/test/SequenceMigrationTests.scala     |  3 +-
 .../core/controller/test/ActivationsApiTests.scala | 10 +--
 .../whisk/core/entity/test/DatastoreTests.scala    |  2 +
 .../scala/whisk/core/entity/test/ViewTests.scala   |  3 +
 13 files changed, 127 insertions(+), 86 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
index dbae8e5..2a61730 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
@@ -55,17 +55,6 @@ trait ArtifactStore[DocumentAbstraction] {
   protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo]
 
   /**
-   * Puts (saves) documents to database in bulk using a future.
-   * If the operation is successful, the future completes with DocId else an appropriate exception.
-   *
-   * @param ds the documents to put in the database
-   * @param transid the transaction id for logging
-   * @return a future that completes either with DocId
-   */
-  protected[database] def put(ds: Seq[DocumentAbstraction])(
-    implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]]
-
-  /**
    * Deletes document from database using a future.
    * If the operation is successful, the future completes with true.
    *
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
index f949328..f8e7ec5 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala
@@ -26,3 +26,5 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep
 case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message)
 
 case class DocumentUnreadable(message: String) extends ArtifactStoreException(message)
+
+case class PutException(message: String) extends ArtifactStoreException(message)
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
index ba471aa..2080d47 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -18,6 +18,7 @@
 package whisk.core.database
 
 import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
 import spray.json.RootJsonFormat
 import whisk.common.Logging
 import whisk.core.WhiskConfig
@@ -27,8 +28,11 @@ import whisk.spi.Spi
  * An Spi for providing ArtifactStore implementations
  */
 trait ArtifactStoreProvider extends Spi {
-  def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+  def makeStore[D <: DocumentSerializer](config: WhiskConfig,
+                                         name: WhiskConfig => String,
+                                         useBatching: Boolean = false)(
     implicit jsonFormat: RootJsonFormat[D],
     actorSystem: ActorSystem,
-    logging: Logging): ArtifactStore[D]
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D]
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 90ede7e..0e66aee 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -20,22 +20,23 @@ package whisk.core.database
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.event.Logging.ErrorLevel
 import akka.http.scaladsl.model._
+import akka.stream.ActorMaterializer
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import spray.json._
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
+import whisk.core.entity.BulkEntityResult
 import whisk.core.entity.DocInfo
 import whisk.core.entity.DocRevision
 import whisk.core.entity.WhiskDocument
 import whisk.http.Messages
 
-import scala.util.{Failure, Success, Try}
-
 /**
  * Basic client to put and delete artifacts in a data store.
  *
@@ -47,13 +48,17 @@ import scala.util.{Failure, Success, Try}
  * @param dbName the name of the database to operate on
  * @param serializerEvidence confirms the document abstraction is serializable to a Document with an id
  */
-class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
-  dbProtocol: String,
-  dbHost: String,
-  dbPort: Int,
-  dbUsername: String,
-  dbPassword: String,
-  dbName: String)(implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction])
+class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: String,
+                                                                  dbHost: String,
+                                                                  dbPort: Int,
+                                                                  dbUsername: String,
+                                                                  dbPassword: String,
+                                                                  dbName: String,
+                                                                  useBatching: Boolean = false)(
+  implicit system: ActorSystem,
+  val logging: Logging,
+  jsonFormat: RootJsonFormat[DocumentAbstraction],
+  materializer: ActorMaterializer)
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol {
 
@@ -62,6 +67,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
   private val client: CouchDbRestClient =
     new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
 
+  // This the the amount of allowed parallel requests for each entity, before batching starts. If there are already maxOpenDbRequests
+  // and more documents need to be stored, then all arriving documents will be put into batches (if enabled) to avoid a long queue.
+  private val maxOpenDbRequests = system.settings.config.getInt("akka.http.host-connection-pool.max-open-requests") / 2
+
+  private val batcher: Batcher[JsObject, Either[ArtifactStoreException, DocInfo]] =
+    new Batcher(500, maxOpenDbRequests)(put(_)(TransactionId.unknown))
+
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
 
@@ -70,29 +82,42 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
     require(!id.isEmpty, "document id must be defined")
 
     val docinfoStr = s"id: $id, rev: ${rev.getOrElse("null")}"
-
     val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$dbName' saving document: '${docinfoStr}'")
 
-    val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match {
-      case Some(r) =>
-        client =>
-          client.putDoc(id, r, asJson)
-      case None =>
-        client =>
-          client.putDoc(id, asJson)
-    }
+    val f = if (useBatching) {
+      batcher.put(asJson).map { e =>
+        e match {
+          case Right(response) =>
+            transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
+            response
 
-    val f = request(client).map { e =>
-      e match {
+          case Left(e: DocumentConflictException) =>
+            transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
+            // For compatibility.
+            throw DocumentConflictException("conflict on 'put'")
+
+          case Left(e: ArtifactStoreException) =>
+            transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; ${e.getMessage}.")
+            throw PutException("error on 'put'")
+        }
+      }
+    } else {
+      val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match {
+        case Some(r) =>
+          client =>
+            client.putDoc(id, r, asJson)
+        case None =>
+          client =>
+            client.putDoc(id, asJson)
+      }
+      request(client).map {
         case Right(response) =>
           transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
           response.convertTo[DocInfo]
-
         case Left(StatusCodes.Conflict) =>
           transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
           // For compatibility.
           throw DocumentConflictException("conflict on 'put'")
-
         case Left(code) =>
           transid.failed(
             this,
@@ -109,22 +134,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](
         transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
   }
 
-  override protected[database] def put(ds: Seq[DocumentAbstraction])(
-    implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = {
+  private def put(ds: Seq[JsObject])(
+    implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
     val count = ds.size
     val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
 
-    val request: Future[Either[StatusCode, JsArray]] = client.putDocs(ds.map(_.toDocumentRecord))
-
-    val f = request.map {
+    val f = client.putDocs(ds).map {
       _ match {
         case Right(response) =>
           transid.finished(this, start, s"'$dbName' completed $count documents")
-          response.convertTo[Seq[JsValue]].map { result =>
-            Try(result.convertTo[DocInfo]) match {
-              case Success(info) => Right(info)
-              case Failure(_)    => Left(DocumentConflictException("conflict on 'bulk_put'"))
-            }
+
+          response.convertTo[Seq[BulkEntityResult]].map { singleResult =>
+            singleResult.error
+              .map {
+                case "conflict" => Left(DocumentConflictException("conflict on 'bulk_put'"))
+                case e          => Left(PutException(s"Unexpected $e: ${singleResult.reason.getOrElse("")} on 'bulk_put'"))
+              }
+              .getOrElse {
+                Right(singleResult.toDocInfo)
+              }
           }
 
         case Left(code) =>
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index e3713d8..d2c08dd 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -18,16 +18,18 @@
 package whisk.core.database
 
 import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
 import spray.json.RootJsonFormat
 import whisk.common.Logging
 import whisk.core.WhiskConfig
 
 object CouchDbStoreProvider extends ArtifactStoreProvider {
 
-  def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)(
+  def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String, useBatching: Boolean)(
     implicit jsonFormat: RootJsonFormat[D],
     actorSystem: ActorSystem,
-    logging: Logging): ArtifactStore[D] = {
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
     require(config != null && config.isValid, "config is undefined or not valid")
     require(
       config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB",
@@ -43,6 +45,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       config.dbPort.toInt,
       config.dbUsername,
       config.dbPassword,
-      name(config))
+      name(config),
+      useBatching)
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 45d6d28..6aa9515 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -164,14 +164,6 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
     }
   }
 
-  def put[Wsuper >: W](db: ArtifactStore[Wsuper], docs: Seq[W])(
-    implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = {
-    implicit val logger = db.logging
-    implicit val ec = db.executionContext
-
-    db.put(docs)
-  }
-
   def attach[Wsuper >: W](
     db: ArtifactStore[Wsuper],
     doc: DocInfo,
diff --git a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
index d03d3c1..b6e635a 100644
--- a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala
@@ -17,14 +17,16 @@
 
 package whisk.core.entity
 
-import whisk.core.entity.ArgNormalizer.trim
 import scala.util.Try
-import spray.json.JsValue
-import spray.json.RootJsonFormat
+
+import spray.json.DefaultJsonProtocol
 import spray.json.JsNull
 import spray.json.JsString
+import spray.json.JsValue
+import spray.json.RootJsonFormat
 import spray.json.deserializationError
-import spray.json.DefaultJsonProtocol
+
+import whisk.core.entity.ArgNormalizer.trim
 
 /**
  * A DocId is the document id === primary key in the datastore.
@@ -86,6 +88,22 @@ protected[core] case class DocInfo protected[entity] (id: DocId, rev: DocRevisio
   }
 }
 
+/**
+ * A BulkEntityResult is wrapping the fields that are returned for a single document on a bulk-put of several documents.
+ * http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#post--db-_bulk_docs
+ *
+ * @param id the document id
+ * @param rev the document revision, optional; this is an opaque value determined by the datastore
+ * @param error the error, that occured on trying to put this document into CouchDB
+ * @param reason the error message that correspands to the error
+ */
+case class BulkEntityResult(id: String,
+                            rev: DocRevision = DocRevision.empty,
+                            error: Option[String],
+                            reason: Option[String]) {
+  def toDocInfo = DocInfo(DocId(id), rev)
+}
+
 protected[core] object DocId extends ArgNormalizer[DocId] {
 
   /**
@@ -151,3 +169,7 @@ protected[core] object DocInfo extends DefaultJsonProtocol {
 
   implicit val serdes = jsonFormat2(DocInfo.apply)
 }
+
+object BulkEntityResult extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat4(BulkEntityResult.apply)
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index e766d61..df8d1fe 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -24,6 +24,7 @@ import scala.language.postfixOps
 import scala.util.Try
 
 import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
 import spray.json.JsObject
 import spray.json.JsString
 import spray.json.RootJsonFormat
@@ -97,7 +98,7 @@ object WhiskAuthStore {
       dbPort -> null,
       dbAuths -> null)
 
-  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
+  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
     SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskAuth](config, _.dbAuths)
 }
 
@@ -112,10 +113,10 @@ object WhiskEntityStore {
       dbPort -> null,
       dbWhisk -> null)
 
-  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
+  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
     SpiLoader
       .get[ArtifactStoreProvider]
-      .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging)
+      .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging, materializer)
 
 }
 
@@ -130,8 +131,8 @@ object WhiskActivationStore {
       dbPort -> null,
       dbActivations -> null)
 
-  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
-    SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations)
+  def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
+    SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations, true)
 }
 
 /**
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 2308913..76b9c3e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -22,8 +22,11 @@ import java.time.Instant
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.util.{Failure, Success}
+import scala.util.Failure
+import scala.util.Success
+
 import org.apache.kafka.common.errors.RecordTooLargeException
+
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.actor.Props
@@ -44,7 +47,7 @@ import whisk.core.containerpool.ContainerPool
 import whisk.core.containerpool.ContainerProxy
 import whisk.core.containerpool.PrewarmingConfig
 import whisk.core.containerpool.Run
-import whisk.core.database.{Batcher, DocumentConflictException, NoDocumentException}
+import whisk.core.database.NoDocumentException
 import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.Messages
@@ -54,6 +57,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   implicit actorSystem: ActorSystem,
   logging: Logging) {
 
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
   implicit val ec = actorSystem.dispatcher
   implicit val cfg = config
 
@@ -123,23 +127,13 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     }
   }
 
-  implicit val materializer: ActorMaterializer = ActorMaterializer()
-  // In worst case we need "maximumContainers" connections to load actions from the database. The remaining
-  // connections can safely be used for writing activations.
-  val maxOpenDbRequests = actorSystem.settings.config.getInt("akka.http.host-connection-pool.max-open-requests")
-  val maxOpenActivationRequests = (maxOpenDbRequests - maximumContainers) max (maxOpenDbRequests / 2)
-  val batcher: Batcher[WhiskActivation, Either[DocumentConflictException, DocInfo]] =
-    new Batcher(500, maxOpenActivationRequests)(WhiskActivation.put(activationStore, _)(TransactionId.invoker))
-
   /** Stores an activation in the database. */
   val store = (tid: TransactionId, activation: WhiskActivation) => {
     implicit val transid = tid
     logging.info(this, "recording the activation result to the data store")
-
-    batcher.put(activation).andThen {
-      case Success(Right(_)) => logging.info(this, s"recorded activation")
-      case Success(Left(t))  => logging.error(this, s"failed to record activation, $t")
-      case Failure(t)        => logging.error(this, s"failed to record activation, $t")
+    WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen {
+      case Success(id) => logging.info(this, s"recorded activation")
+      case Failure(t)  => logging.error(this, s"failed to record activation")
     }
   }
 
diff --git a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
index b3fd9ce..977505f 100644
--- a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
+++ b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala
@@ -26,6 +26,7 @@ import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfter
 import org.scalatest.junit.JUnitRunner
 
+import akka.stream.ActorMaterializer
 import common.TestHelpers
 import common.TestUtils
 import common.Wsk
@@ -33,7 +34,6 @@ import common.WskProps
 import common.WskTestHelpers
 import spray.json._
 import spray.json.DefaultJsonProtocol.StringJsonFormat
-
 import whisk.core.WhiskConfig
 import whisk.core.database.test.DbUtils
 import whisk.core.entity._
@@ -45,6 +45,7 @@ import whisk.core.entity.test.ExecHelpers
 @RunWith(classOf[JUnitRunner])
 class SequenceMigrationTests extends TestHelpers with BeforeAndAfter with DbUtils with ExecHelpers with WskTestHelpers {
 
+  implicit val matzerializer = ActorMaterializer()
   implicit val wskprops = WskProps()
   val wsk = new Wsk
   val whiskConfig = new WhiskConfig(WhiskEntityStore.requiredProperties)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index 2af35df..548c612 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -19,16 +19,16 @@ package whisk.core.controller.test
 
 import java.time.Clock
 import java.time.Instant
+
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
-import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.server.Route
-
+import akka.stream.ActorMaterializer
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-
 import whisk.core.controller.WhiskActivationsApi
 import whisk.core.database.ArtifactStoreProvider
 import whisk.core.entity._
@@ -445,10 +445,10 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
   }
 
   it should "report proper error when record is corrupted on get" in {
-
+    implicit val materializer = ActorMaterializer()
     val activationStore = SpiLoader
       .get[ArtifactStoreProvider]
-      .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging)
+      .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging, materializer)
     implicit val tid = transid()
     val entity = BadEntity(namespace, EntityName(ActivationId().toString))
     put(activationStore, entity)
diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
index 3a289d7..f850f6b 100644
--- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
+import akka.stream.ActorMaterializer
 import common.StreamLogging
 import common.WskActorSystem
 import whisk.core.WhiskConfig
@@ -47,6 +48,7 @@ class DatastoreTests
     with ExecHelpers
     with StreamLogging {
 
+  implicit val materializer = ActorMaterializer()
   val namespace = EntityPath("test namespace")
   val config = new WhiskConfig(WhiskAuthStore.requiredProperties ++ WhiskEntityStore.requiredProperties)
   val datastore = WhiskEntityStore.datastore(config)
diff --git a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
index 5be4ac4..2347170 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala
@@ -30,6 +30,7 @@ import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
 
+import akka.stream.ActorMaterializer
 import common.StreamLogging
 import common.WskActorSystem
 import whisk.core.WhiskConfig
@@ -68,6 +69,8 @@ class ViewTests
   val creds2 = WhiskAuthHelpers.newAuth(Subject("t12345"))
   val namespace2 = EntityPath(creds2.subject.asString)
 
+  implicit val materializer = ActorMaterializer()
+
   val config = new WhiskConfig(WhiskEntityStore.requiredProperties ++ WhiskActivationStore.requiredProperties)
   val entityStore = WhiskEntityStore.datastore(config)
   val activationStore = WhiskActivationStore.datastore(config)

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].