You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/13 20:12:47 UTC

[GitHub] jasonpet closed pull request #3619: Provide an activation store SPI

jasonpet closed pull request #3619: Provide an activation store SPI
URL: https://github.com/apache/incubator-openwhisk/pull/3619
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 35cf4f09a5..a0e3a031f1 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -3,6 +3,7 @@
 
 whisk.spi {
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
+  ActivationStoreProvider = whisk.core.entity.ArtifactActivationStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
   LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
index 85c59beb7b..c6b983cfc0 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
@@ -35,7 +35,7 @@ import scala.util.{Failure, Success, Try}
  *
  * @param asString the activation id
  */
-protected[whisk] class ActivationId private (val asString: String) extends AnyVal {
+protected[whisk] case class ActivationId private (val asString: String) extends AnyVal {
   override def toString: String = asString
   def toJsObject: JsObject = JsObject("activationId" -> asString.toJson)
 }
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationStore.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationStore.scala
new file mode 100644
index 0000000000..ff8ec498da
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationStore.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import spray.json.JsObject
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.CacheChangeNotification
+import whisk.spi.Spi
+
+import scala.concurrent.Future
+
+trait ActivationStore {
+
+  /**
+   * Stores an activation.
+   *
+   * @param activation activation to store
+   * @param transid transaction ID for request
+   * @param notifier cache change notifier
+   * @return Future containing DocInfo related to stored activation
+   */
+  def store(activation: WhiskActivation)(implicit transid: TransactionId,
+                                         notifier: Option[CacheChangeNotification]): Future[DocInfo]
+
+  /**
+   * Retrieves an activation corresponding to the specified activation ID.
+   *
+   * @param activationId ID of activation to retrieve
+   * @param transid transaction ID for request
+   * @return Future containing the retrieved WhiskActivation
+   */
+  def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation]
+
+  /**
+   * Deletes an activation corresponding to the provided activation ID.
+   *
+   * @param activationId ID of activation to delete
+   * @param transid transaction ID for the request
+   * @param notifier cache change notifier
+   * @return Future containing a Boolean value indication whether the activation was deleted
+   */
+  def delete(activationId: ActivationId)(implicit transid: TransactionId,
+                                         notifier: Option[CacheChangeNotification]): Future[Boolean]
+
+  /**
+   * Counts the number of activations in a namespace.
+   *
+   * @param namespace namespace to query
+   * @param name entity name to query
+   * @param skip number of activations to skip
+   * @param since timestamp to retrieve activations after
+   * @param upto timestamp to retrieve activations before
+   * @param transid transaction ID for request
+   * @return Future containing number of activations returned from query in JSON format
+   */
+  def countActivationsInNamespace(namespace: EntityPath,
+                                  name: Option[EntityPath] = None,
+                                  skip: Int,
+                                  since: Option[Instant] = None,
+                                  upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject]
+
+  /**
+   * Returns activations corresponding to provided entity name.
+   *
+   * @param namespace namespace to query
+   * @param name entity name to query
+   * @param skip number of activations to skip
+   * @param limit maximum number of activations to list
+   * @param includeDocs return document with each activation
+   * @param since timestamp to retrieve activations after
+   * @param upto timestamp to retrieve activations before
+   * @param transid transaction ID for request
+   * @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs
+   *         are included, a List of WhiskActivation is returned.
+   */
+  def listActivationsMatchingName(namespace: EntityPath,
+                                  name: EntityPath,
+                                  skip: Int,
+                                  limit: Int,
+                                  includeDocs: Boolean = false,
+                                  since: Option[Instant] = None,
+                                  upto: Option[Instant] = None)(
+    implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
+
+  /**
+   * List all activations in a specified namespace.
+   *
+   * @param namespace namespace to query
+   * @param skip number of activations to skip
+   * @param limit maximum number of activations to list
+   * @param includeDocs return document with each activation
+   * @param since timestamp to retrieve activations after
+   * @param upto timestamp to retrieve activations before
+   * @param transid transaction ID for request
+   * @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs
+   *         are included, a List of WhiskActivation is returned.
+   */
+  def listActivationsInNamespace(namespace: EntityPath,
+                                 skip: Int,
+                                 limit: Int,
+                                 includeDocs: Boolean = false,
+                                 since: Option[Instant] = None,
+                                 upto: Option[Instant] = None)(
+    implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
+}
+
+trait ActivationStoreProvider extends Spi {
+  def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging): ActivationStore
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/ArtifactActivationStore.scala b/common/scala/src/main/scala/whisk/core/entity/ArtifactActivationStore.scala
new file mode 100644
index 0000000000..88bb92ef46
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/ArtifactActivationStore.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+
+import spray.json.JsObject
+
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.{ArtifactStore, CacheChangeNotification, StaleParameter}
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging)
+    extends ActivationStore {
+
+  implicit val executionContext = actorSystem.dispatcher
+
+  private val artifactStore: ArtifactStore[WhiskActivation] =
+    WhiskActivationStore.datastore()(actorSystem, logging, actorMaterializer)
+
+  def store(activation: WhiskActivation)(implicit transid: TransactionId,
+                                         notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+
+    logging.debug(this, s"recording activation '${activation.activationId}'")
+
+    val res = WhiskActivation.put(artifactStore, activation)
+
+    res onComplete {
+      case Success(id) => logging.debug(this, s"recorded activation")
+      case Failure(t) =>
+        logging.error(
+          this,
+          s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
+    }
+
+    res
+  }
+
+  def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation] = {
+    WhiskActivation.get(artifactStore, DocId(activationId.asString))
+  }
+
+  /**
+   * Here there is added overhead of retrieving the specified activation before deleting it, so this method should not
+   * be used in production or performance related code.
+   */
+  def delete(activationId: ActivationId)(implicit transid: TransactionId,
+                                         notifier: Option[CacheChangeNotification]): Future[Boolean] = {
+    WhiskActivation.get(artifactStore, DocId(activationId.asString)) flatMap { doc =>
+      WhiskActivation.del(artifactStore, doc.docinfo)
+    }
+  }
+
+  def countActivationsInNamespace(namespace: EntityPath,
+                                  name: Option[EntityPath] = None,
+                                  skip: Int,
+                                  since: Option[Instant] = None,
+                                  upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject] = {
+    WhiskActivation.countCollectionInNamespace(
+      artifactStore,
+      name.map(p => namespace.addPath(p)).getOrElse(namespace),
+      skip,
+      since,
+      upto,
+      StaleParameter.UpdateAfter,
+      name.map(_ => WhiskActivation.filtersView).getOrElse(WhiskActivation.view))
+  }
+
+  def listActivationsMatchingName(namespace: EntityPath,
+                                  name: EntityPath,
+                                  skip: Int,
+                                  limit: Int,
+                                  includeDocs: Boolean = false,
+                                  since: Option[Instant] = None,
+                                  upto: Option[Instant] = None)(
+    implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    WhiskActivation.listActivationsMatchingName(
+      artifactStore,
+      namespace,
+      name,
+      skip,
+      limit,
+      includeDocs,
+      since,
+      upto,
+      StaleParameter.UpdateAfter)
+  }
+
+  def listActivationsInNamespace(namespace: EntityPath,
+                                 skip: Int,
+                                 limit: Int,
+                                 includeDocs: Boolean = false,
+                                 since: Option[Instant] = None,
+                                 upto: Option[Instant] = None)(
+    implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    WhiskActivation.listCollectionInNamespace(
+      artifactStore,
+      namespace,
+      skip,
+      limit,
+      includeDocs,
+      since,
+      upto,
+      StaleParameter.UpdateAfter)
+  }
+
+}
+
+object ArtifactActivationStoreProvider extends ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+    new ArtifactActivationStore(actorSystem, actorMaterializer, logging)
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index c5c2c32575..90e9070931 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -38,13 +38,11 @@ import whisk.core.database.DocumentSerializer
 import whisk.core.database.StaleParameter
 import whisk.spi.SpiLoader
 import pureconfig._
-
 import scala.reflect.classTag
 
 package object types {
   type AuthStore = ArtifactStore[WhiskAuth]
   type EntityStore = ArtifactStore[WhiskEntity]
-  type ActivationStore = ArtifactStore[WhiskActivation]
 }
 
 case class DBConfig(actionsDdoc: String, activationsDdoc: String, activationsFilterDdoc: String)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index 25be8fa4ee..1d6f4cf2a8 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -40,7 +40,6 @@ import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.entitlement._
 import whisk.core.entity._
-import whisk.core.entity.types.ActivationStore
 import whisk.core.entity.types.EntityStore
 import whisk.http.ErrorResponse.terminate
 import whisk.http.Messages
diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index 4a141fc7ff..cdeab42703 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -31,11 +31,9 @@ import spray.json.DefaultJsonProtocol.RootJsObjectFormat
 import whisk.common.TransactionId
 import whisk.core.containerpool.logging.LogStore
 import whisk.core.controller.RestApiCommons.{ListLimit, ListSkip}
-import whisk.core.database.StaleParameter
 import whisk.core.entitlement.Privilege.READ
 import whisk.core.entitlement.{Collection, Privilege, Resource}
 import whisk.core.entity._
-import whisk.core.entity.types.ActivationStore
 import whisk.http.ErrorResponse.terminate
 import whisk.http.Messages
 
@@ -159,40 +157,16 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
       'upto.as[Instant] ?) { (skip, limit, count, docs, name, since, upto) =>
       if (count && !docs) {
         countEntities {
-          WhiskActivation.countCollectionInNamespace(
-            activationStore,
-            name.flatten.map(p => namespace.addPath(p)).getOrElse(namespace),
-            skip.n,
-            since,
-            upto,
-            StaleParameter.UpdateAfter,
-            viewName = name.flatten.map(_ => WhiskActivation.filtersView).getOrElse(WhiskActivation.view))
+          activationStore.countActivationsInNamespace(namespace, name.flatten, skip.n, since, upto)
         }
       } else if (count && docs) {
         terminate(BadRequest, Messages.docsNotAllowedWithCount)
       } else {
         val activations = name.flatten match {
           case Some(action) =>
-            WhiskActivation.listActivationsMatchingName(
-              activationStore,
-              namespace,
-              action,
-              skip.n,
-              limit.n,
-              docs,
-              since,
-              upto,
-              StaleParameter.UpdateAfter)
+            activationStore.listActivationsMatchingName(namespace, action, skip.n, limit.n, docs, since, upto)
           case None =>
-            WhiskActivation.listCollectionInNamespace(
-              activationStore,
-              namespace,
-              skip.n,
-              limit.n,
-              docs,
-              since,
-              upto,
-              StaleParameter.UpdateAfter)
+            activationStore.listActivationsInNamespace(namespace, skip.n, limit.n, docs, since, upto)
         }
         listEntities(activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson))))
       }
@@ -212,7 +186,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
     val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
     pathEndOrSingleSlash {
       getEntity(
-        WhiskActivation.get(activationStore, docid),
+        activationStore.get(ActivationId(docid.asString)),
         postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson)))
 
     } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
@@ -229,7 +203,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
    */
   private def fetchResponse(docid: DocId)(implicit transid: TransactionId) = {
     getEntityAndProject(
-      WhiskActivation.get(activationStore, docid),
+      activationStore.get(ActivationId(docid.asString)),
       (activation: WhiskActivation) => Future.successful(activation.response.toExtendedJson))
   }
 
@@ -244,7 +218,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
   private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
     extractRequest { request =>
       getEntityAndProject(
-        WhiskActivation.get(activationStore, docid),
+        activationStore.get(ActivationId(docid.asString)),
         (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject))
     }
   }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 8096711b1b..a8b6a4e960 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -105,7 +105,6 @@ class Controller(val instance: InstanceId,
   // initialize datastores
   private implicit val authStore = WhiskAuthStore.datastore()
   private implicit val entityStore = WhiskEntityStore.datastore()
-  private implicit val activationStore = WhiskActivationStore.datastore()
   private implicit val cacheChangeNotification = Some(new CacheChangeNotification {
     val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance)
     override def apply(k: CacheKey) = {
@@ -123,6 +122,8 @@ class Controller(val instance: InstanceId,
     new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
+  private implicit val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
 
   // register collections
   Collection.initialize(entityStore)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index b7f0438a62..67c431c32e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -21,7 +21,7 @@ import java.time.{Clock, Instant}
 
 import scala.collection.immutable.Map
 import scala.concurrent.Future
-import scala.util.{Failure, Try}
+import scala.util.Try
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
@@ -42,7 +42,7 @@ import whisk.core.controller.RestApiCommons.{ListLimit, ListSkip}
 import whisk.core.database.CacheChangeNotification
 import whisk.core.entitlement.Collection
 import whisk.core.entity._
-import whisk.core.entity.types.{ActivationStore, EntityStore}
+import whisk.core.entity.types.EntityStore
 import whisk.http.ErrorResponse
 import whisk.http.Messages
 
@@ -163,14 +163,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
                   triggerActivation
               }
               .map { activation =>
-                logging.debug(
-                  this,
-                  s"[POST] trigger activated, writing activation record to datastore: $triggerActivationId")
-                WhiskActivation.put(activationStore, activation)
-              }
-              .andThen {
-                case Failure(t) =>
-                  logging.error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
+                activationStore.store(activation)
               }
             complete(Accepted, triggerActivationId.toJsObject)
           } else {
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index b7d7737c3b..28dbedfc55 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -29,7 +29,7 @@ import whisk.core.database.NoDocumentException
 import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
-import whisk.core.entity.types.{ActivationStore, EntityStore}
+import whisk.core.entity.types.EntityStore
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory.FutureExtensions
 
@@ -531,14 +531,7 @@ protected[actions] trait PrimitiveActions {
         sequenceLimits,
       duration = Some(session.duration))
 
-    logging.debug(this, s"recording activation '${activation.activationId}'")
-    WhiskActivation.put(activationStore, activation)(transid, notifier = None) onComplete {
-      case Success(id) => logging.debug(this, s"recorded activation")
-      case Failure(t) =>
-        logging.error(
-          this,
-          s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
-    }
+    activationStore.store(activation)(transid, notifier = None)
 
     activation
   }
@@ -596,7 +589,7 @@ protected[actions] trait PrimitiveActions {
                              maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = {
     if (!result.isCompleted && retries < maxRetries) {
       val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
-        WhiskActivation.get(activationStore, docid).onComplete {
+        activationStore.get(ActivationId(docid.asString)).onComplete {
           case Success(activation)             => result.trySuccess(Right(activation))
           case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries)
           case Failure(t: Throwable)           => result.tryFailure(t)
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index ae601f3533..4375ee872a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -162,7 +162,7 @@ protected[actions] trait SequenceActions {
         (Right(seqActivation), accounting.atomicActionCnt)
       }
       .andThen {
-        case Success((Right(seqActivation), _)) => storeSequenceActivation(seqActivation)
+        case Success((Right(seqActivation), _)) => activationStore.store(seqActivation)(transid, notifier = None)
 
         // This should never happen; in this case, there is no activation record created or stored:
         // should there be?
@@ -170,20 +170,6 @@ protected[actions] trait SequenceActions {
       }
   }
 
-  /**
-   * Stores sequence activation to database.
-   */
-  private def storeSequenceActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
-    logging.debug(this, s"recording activation '${activation.activationId}'")
-    WhiskActivation.put(activationStore, activation)(transid, notifier = None) onComplete {
-      case Success(id) => logging.debug(this, s"recorded activation")
-      case Failure(t) =>
-        logging.error(
-          this,
-          s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
-    }
-  }
-
   /**
    * Creates an activation for a sequence.
    */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 20cbbd46cd..4d283820da 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -80,7 +80,9 @@ class InvokerReactive(
 
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore()
-  private val activationStore = WhiskActivationStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
   private val authStore = WhiskAuthStore.datastore()
 
   private val namespaceBlacklist = new NamespaceBlacklist(authStore)
@@ -159,11 +161,7 @@ class InvokerReactive(
   /** Stores an activation in the database. */
   private val store = (tid: TransactionId, activation: WhiskActivation) => {
     implicit val transid: TransactionId = tid
-    logging.debug(this, "recording the activation result to the data store")
-    WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen {
-      case Success(id) => logging.debug(this, s"recorded activation")
-      case Failure(t)  => logging.error(this, s"failed to record activation")
-    }
+    activationStore.store(activation)(tid, notifier = None)
   }
 
   /** Creates a ContainerProxy Actor when being called. */
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 4842a11f01..18fc1250f0 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -1191,7 +1191,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     // storing the activation in the db will allow the db polling to retrieve it
     // the test harness makes sure the activation id observed by the test matches
     // the one generated by the api handler
-    put(activationStore, activation)
+    storeActivation(activation)
     try {
       Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
         status should be(OK)
@@ -1206,7 +1206,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
         response should be(activation.resultAsJson)
       }
     } finally {
-      deleteActivation(activation.docid)
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
@@ -1300,9 +1300,9 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
       response = ActivationResponse.whiskError("test"))
     put(entityStore, action)
     // storing the activation in the db will allow the db polling to retrieve it
-    // the test harness makes sure the activaiton id observed by the test matches
+    // the test harness makes sure the activation id observed by the test matches
     // the one generated by the api handler
-    put(activationStore, activation)
+    storeActivation(activation)
     try {
       Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
         status should be(InternalServerError)
@@ -1310,7 +1310,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
         response should be(activation.withoutLogs.toExtendedJson)
       }
     } finally {
-      deleteActivation(activation.docid)
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index b9a8915296..4b1927e128 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -53,6 +53,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
   val creds = WhiskAuthHelpers.newIdentity()
   val namespace = EntityPath(creds.subject.asString)
   val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"
+
   def aname() = MakeName.next("activations_tests")
 
   def checkCount(filter: String, expected: Int, user: Identity = creds) = {
@@ -72,7 +73,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
     implicit val tid = transid()
     // create two sets of activation records, and check that only one set is served back
     val creds1 = WhiskAuthHelpers.newAuth()
-    (1 to 2).map { i =>
+    val notExpectedActivations = (1 to 2).map { i =>
       WhiskActivation(
         EntityPath(creds1.subject.asString),
         aname(),
@@ -80,8 +81,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    } foreach { put(entityStore, _) }
-
+    }
     val actionName = aname()
     val activations = (1 to 2).map { i =>
       WhiskActivation(
@@ -92,43 +92,50 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         start = Instant.now,
         end = Instant.now)
     }.toList
-    activations foreach { put(activationStore, _) }
-    waitOnView(activationStore, namespace.root, 2, WhiskActivation.view)
-    whisk.utils.retry {
-      Get(s"$collectionPath") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[List[JsObject]]
-        activations.length should be(response.length)
-        response should contain theSameElementsAs activations.map(_.summaryAsJson)
-        response forall { a =>
-          a.getFields("for") match {
-            case Seq(JsString(n)) => n == actionName.asString
-            case _                => false
+    try {
+      (notExpectedActivations ++ activations).foreach(storeActivation)
+      waitOnListActivationsInNamespace(namespace, 2)
+
+      whisk.utils.retry {
+        Get(s"$collectionPath") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[List[JsObject]]
+          activations.length should be(response.length)
+          response should contain theSameElementsAs activations.map(_.summaryAsJson)
+          response forall { a =>
+            a.getFields("for") match {
+              case Seq(JsString(n)) => n == actionName.asString
+              case _                => false
+            }
           }
         }
       }
-    }
 
-    // it should "list activations with explicit namespace owned by subject" in {
-    whisk.utils.retry {
-      Get(s"/$namespace/${collection.path}") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[List[JsObject]]
-        activations.length should be(response.length)
-        response should contain theSameElementsAs activations.map(_.summaryAsJson)
-        response forall { a =>
-          a.getFields("for") match {
-            case Seq(JsString(n)) => n == actionName.asString
-            case _                => false
+      // it should "list activations with explicit namespace owned by subject" in {
+      whisk.utils.retry {
+        Get(s"/$namespace/${collection.path}") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[List[JsObject]]
+          activations.length should be(response.length)
+          response should contain theSameElementsAs activations.map(_.summaryAsJson)
+          response forall { a =>
+            a.getFields("for") match {
+              case Seq(JsString(n)) => n == actionName.asString
+              case _                => false
+            }
           }
         }
       }
-    }
 
-    // it should "reject list activations with explicit namespace not owned by subject" in {
-    val auser = WhiskAuthHelpers.newIdentity()
-    Get(s"/$namespace/${collection.path}") ~> Route.seal(routes(auser)) ~> check {
-      status should be(Forbidden)
+      // it should "reject list activations with explicit namespace not owned by subject" in {
+      val auser = WhiskAuthHelpers.newIdentity()
+      Get(s"/$namespace/${collection.path}") ~> Route.seal(routes(auser)) ~> check {
+        status should be(Forbidden)
+      }
+
+    } finally {
+      (notExpectedActivations ++ activations).foreach(activation =>
+        deleteActivation(ActivationId(activation.docid.asString)))
     }
   }
 
@@ -147,7 +154,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
     implicit val tid = transid()
     // create two sets of activation records, and check that only one set is served back
     val creds1 = WhiskAuthHelpers.newAuth()
-    (1 to 2).map { i =>
+    val notExpectedActivations = (1 to 2).map { i =>
       WhiskActivation(
         EntityPath(creds1.subject.asString),
         aname(),
@@ -155,8 +162,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    } foreach { put(entityStore, _) }
-
+    }
     val actionName = aname()
     val activations = (1 to 2).map { i =>
       WhiskActivation(
@@ -168,18 +174,23 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         end = Instant.now,
         response = ActivationResponse.success(Some(JsNumber(5))))
     }.toList
-    activations foreach { put(activationStore, _) }
-    waitOnView(activationStore, namespace.root, 2, WhiskActivation.view)
 
-    checkCount("", 2)
+    try {
+      (notExpectedActivations ++ activations).foreach(storeActivation)
+      waitOnListActivationsInNamespace(namespace, 2)
+      checkCount("", 2)
 
-    whisk.utils.retry {
-      Get(s"$collectionPath?docs=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[List[JsObject]]
-        activations.length should be(response.length)
-        response should contain theSameElementsAs activations.map(_.toExtendedJson)
+      whisk.utils.retry {
+        Get(s"$collectionPath?docs=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[List[JsObject]]
+          activations.length should be(response.length)
+          response should contain theSameElementsAs activations.map(_.toExtendedJson)
+        }
       }
+    } finally {
+      (notExpectedActivations ++ activations).foreach(activation =>
+        deleteActivation(ActivationId(activation.docid.asString)))
     }
   }
 
@@ -188,7 +199,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
     implicit val tid = transid()
     // create two sets of activation records, and check that only one set is served back
     val creds1 = WhiskAuthHelpers.newAuth()
-    (1 to 2).map { i =>
+    val notExpectedActivations = (1 to 2).map { i =>
       WhiskActivation(
         EntityPath(creds1.subject.asString),
         aname(),
@@ -196,13 +207,13 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    } foreach { put(activationStore, _) }
+    }
 
     val actionName = aname()
     val now = Instant.now(Clock.systemUTC())
     val since = now.plusSeconds(10)
     val upto = now.plusSeconds(30)
-    implicit val activations = Seq(
+    val activations = Seq(
       WhiskActivation(
         namespace,
         actionName,
@@ -238,57 +249,64 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = now.plusSeconds(30),
         end = now.plusSeconds(30))) // should match
-    activations foreach { put(activationStore, _) }
-    waitOnView(activationStore, namespace.root, activations.length, WhiskActivation.view)
 
-    { // get between two time stamps
-      val filter = s"since=${since.toEpochMilli}&upto=${upto.toEpochMilli}"
-      val expected = activations.filter { e =>
-        (e.start.equals(since) || e.start.equals(upto) || (e.start.isAfter(since) && e.start.isBefore(upto)))
-      }
+    try {
+      (notExpectedActivations ++ activations).foreach(storeActivation)
+      waitOnListActivationsInNamespace(namespace, activations.length)
 
-      checkCount(filter, expected.length)
+      { // get between two time stamps
+        val filter = s"since=${since.toEpochMilli}&upto=${upto.toEpochMilli}"
+        val expected = activations.filter { e =>
+          (e.start.equals(since) || e.start.equals(upto) || (e.start.isAfter(since) && e.start.isBefore(upto)))
+        }
 
-      whisk.utils.retry {
-        Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
-          status should be(OK)
-          val response = responseAs[List[JsObject]]
-          expected.length should be(response.length)
-          response should contain theSameElementsAs expected.map(_.toExtendedJson)
+        checkCount(filter, expected.length)
+
+        whisk.utils.retry {
+          Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
+            status should be(OK)
+            val response = responseAs[List[JsObject]]
+            expected.length should be(response.length)
+            response should contain theSameElementsAs expected.map(_.toExtendedJson)
+          }
         }
       }
-    }
 
-    { // get 'upto' with no defined since value should return all activation 'upto'
-      val expected = activations.filter(e => e.start.equals(upto) || e.start.isBefore(upto))
-      val filter = s"upto=${upto.toEpochMilli}"
+      { // get 'upto' with no defined since value should return all activation 'upto'
+        val expected = activations.filter(e => e.start.equals(upto) || e.start.isBefore(upto))
+        val filter = s"upto=${upto.toEpochMilli}"
 
-      checkCount(filter, expected.length)
+        checkCount(filter, expected.length)
 
-      whisk.utils.retry {
-        Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
-          status should be(OK)
-          val response = responseAs[List[JsObject]]
-          expected.length should be(response.length)
-          response should contain theSameElementsAs expected.map(_.toExtendedJson)
+        whisk.utils.retry {
+          Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
+            status should be(OK)
+            val response = responseAs[List[JsObject]]
+            expected.length should be(response.length)
+            response should contain theSameElementsAs expected.map(_.toExtendedJson)
+          }
         }
       }
-    }
 
-    { // get 'since' with no defined upto value should return all activation 'since'
-      whisk.utils.retry {
-        val expected = activations.filter(e => e.start.equals(since) || e.start.isAfter(since))
-        val filter = s"since=${since.toEpochMilli}"
+      { // get 'since' with no defined upto value should return all activation 'since'
+        whisk.utils.retry {
+          val expected = activations.filter(e => e.start.equals(since) || e.start.isAfter(since))
+          val filter = s"since=${since.toEpochMilli}"
 
-        checkCount(filter, expected.length)
+          checkCount(filter, expected.length)
 
-        Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
-          status should be(OK)
-          val response = responseAs[List[JsObject]]
-          expected.length should be(response.length)
-          response should contain theSameElementsAs expected.map(_.toExtendedJson)
+          Get(s"$collectionPath?docs=true&$filter") ~> Route.seal(routes(creds)) ~> check {
+            status should be(OK)
+            val response = responseAs[List[JsObject]]
+            expected.length should be(response.length)
+            response should contain theSameElementsAs expected.map(_.toExtendedJson)
+          }
         }
       }
+
+    } finally {
+      (notExpectedActivations ++ activations).foreach(activation =>
+        deleteActivation(ActivationId(activation.docid.asString)))
     }
   }
 
@@ -312,7 +330,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
 
     // create two sets of activation records, and check that only one set is served back
     val creds1 = WhiskAuthHelpers.newAuth()
-    (1 to 2).map { i =>
+    val notExpectedActivations = (1 to 2).map { i =>
       WhiskActivation(
         EntityPath(creds1.subject.asString),
         aname(),
@@ -320,8 +338,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    } foreach { put(activationStore, _) }
-
+    }
     val activations = (1 to 2).map { i =>
       WhiskActivation(
         namespace,
@@ -331,7 +348,6 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         start = Instant.now,
         end = Instant.now)
     }.toList
-    activations foreach { put(activationStore, _) }
 
     val activationsInPackage = (1 to 2).map { i =>
       WhiskActivation(
@@ -343,36 +359,36 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         end = Instant.now,
         annotations = Parameters("path", s"${namespace.asString}/pkg/xyz"))
     }.toList
-    activationsInPackage foreach { put(activationStore, _) }
-
-    waitOnView(activationStore, namespace.addPath(EntityName("xyz")), activations.length, WhiskActivation.filtersView)
-    waitOnView(
-      activationStore,
-      namespace.addPath(EntityName("pkg")).addPath(EntityName("xyz")),
-      activationsInPackage.length,
-      WhiskActivation.filtersView)
-
-    checkCount("name=xyz", activations.length)
+    try {
+      (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation)
+      waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length)
+      waitOnListActivationsMatchingName(namespace, EntityName("pkg").addPath(EntityName("xyz")), activations.length)
+      checkCount("name=xyz", activations.length)
 
-    whisk.utils.retry {
-      Get(s"$collectionPath?name=xyz") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[List[JsObject]]
-        activations.length should be(response.length)
-        response should contain theSameElementsAs activations.map(_.summaryAsJson)
+      whisk.utils.retry {
+        Get(s"$collectionPath?name=xyz") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[List[JsObject]]
+          activations.length should be(response.length)
+          response should contain theSameElementsAs activations.map(_.summaryAsJson)
+        }
       }
-    }
 
-    checkCount("name=pkg/xyz", activations.length)
+      checkCount("name=pkg/xyz", activations.length)
 
-    whisk.utils.retry {
-      Get(s"$collectionPath?name=pkg/xyz") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[List[JsObject]]
-        activationsInPackage.length should be(response.length)
-        response should contain theSameElementsAs activationsInPackage.map(_.summaryAsJson)
+      whisk.utils.retry {
+        Get(s"$collectionPath?name=pkg/xyz") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[List[JsObject]]
+          activationsInPackage.length should be(response.length)
+          response should contain theSameElementsAs activationsInPackage.map(_.summaryAsJson)
+        }
       }
+    } finally {
+      (notExpectedActivations ++ activations ++ activationsInPackage).foreach(activation =>
+        deleteActivation(ActivationId(activation.docid.asString)))
     }
+
   }
 
   it should "reject invalid query parameter combinations" in {
@@ -457,25 +473,29 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    put(activationStore, activation)
+    try {
+      storeActivation(activation)
 
-    Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
-      status should be(OK)
-      val response = responseAs[JsObject]
-      response should be(activation.toExtendedJson)
-    }
+      Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        val response = responseAs[JsObject]
+        response should be(activation.toExtendedJson)
+      }
 
-    // it should "get activation by name in explicit namespace owned by subject" in
-    Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
-      status should be(OK)
-      val response = responseAs[JsObject]
-      response should be(activation.toExtendedJson)
-    }
+      // it should "get activation by name in explicit namespace owned by subject" in
+      Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        val response = responseAs[JsObject]
+        response should be(activation.toExtendedJson)
+      }
 
-    // it should "reject get activation by name in explicit namespace not owned by subject" in
-    val auser = WhiskAuthHelpers.newIdentity()
-    Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~> Route.seal(routes(auser)) ~> check {
-      status should be(Forbidden)
+      // it should "reject get activation by name in explicit namespace not owned by subject" in
+      val auser = WhiskAuthHelpers.newIdentity()
+      Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~> Route.seal(routes(auser)) ~> check {
+        status should be(Forbidden)
+      }
+    } finally {
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
@@ -490,12 +510,16 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    put(activationStore, activation)
+    try {
+      storeActivation(activation)
 
-    Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
-      status should be(OK)
-      val response = responseAs[JsObject]
-      response should be(activation.response.toExtendedJson)
+      Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        val response = responseAs[JsObject]
+        response should be(activation.response.toExtendedJson)
+      }
+    } finally {
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
@@ -510,12 +534,16 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    put(activationStore, activation)
+    try {
+      storeActivation(activation)
 
-    Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check {
-      status should be(OK)
-      val response = responseAs[JsObject]
-      response should be(activation.logs.toJsonObject)
+      Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        val response = responseAs[JsObject]
+        response should be(activation.logs.toJsonObject)
+      }
+    } finally {
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
@@ -530,10 +558,14 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
         ActivationId.generate(),
         start = Instant.now,
         end = Instant.now)
-    put(entityStore, activation)
+    storeActivation(activation)
+    try {
 
-    Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check {
-      status should be(NotFound)
+      Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check {
+        status should be(NotFound)
+      }
+    } finally {
+      deleteActivation(ActivationId(activation.docid.asString))
     }
   }
 
@@ -599,7 +631,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
 
     val activation =
       new BadActivation(namespace, aname(), creds.subject, ActivationId.generate(), Instant.now, Instant.now)
-    put(activationStore, activation)
+    storeActivation(activation)
 
     Get(s"$collectionPath/${activation.activationId}") ~> Route.seal(routes(creds)) ~> check {
       status should be(InternalServerError)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index fdceb1411e..0d0b96f77c 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -19,7 +19,7 @@ package whisk.core.controller.test
 
 import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
 import scala.language.postfixOps
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.BeforeAndAfterAll
@@ -28,8 +28,7 @@ import org.scalatest.Matchers
 import common.StreamLogging
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import akka.http.scaladsl.testkit.RouteTestTimeout
-import spray.json.DefaultJsonProtocol
-import spray.json.JsString
+import spray.json._
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.connector.ActivationMessage
@@ -87,9 +86,9 @@ protected trait ControllerTestCommon
   }
 
   val entityStore = WhiskEntityStore.datastore()
-  val activationStore = WhiskActivationStore.datastore()
   val authStore = WhiskAuthStore.datastore()
   val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
+  val activationStore = SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
 
   def deleteAction(doc: DocId)(implicit transid: TransactionId) = {
     Await.result(WhiskAction.get(entityStore, doc) flatMap { doc =>
@@ -98,11 +97,64 @@ protected trait ControllerTestCommon
     }, dbOpTimeout)
   }
 
-  def deleteActivation(doc: DocId)(implicit transid: TransactionId) = {
-    Await.result(WhiskActivation.get(activationStore, doc) flatMap { doc =>
-      logging.debug(this, s"deleting ${doc.docinfo}")
-      WhiskActivation.del(activationStore, doc.docinfo)
-    }, dbOpTimeout)
+  def getActivation(activationId: ActivationId)(implicit transid: TransactionId,
+                                                timeout: Duration = 10 seconds): WhiskActivation = {
+    Await.result(activationStore.get(activationId), timeout)
+  }
+
+  def storeActivation(activation: WhiskActivation)(implicit transid: TransactionId,
+                                                   timeout: Duration = 10 seconds): DocInfo = {
+    val docFuture = activationStore.store(activation)
+    val doc = Await.result(docFuture, timeout)
+    assert(doc != null)
+    doc
+  }
+
+  def deleteActivation(activationId: ActivationId)(implicit transid: TransactionId) = {
+    val res = Await.result(activationStore.delete(activationId), dbOpTimeout)
+    assert(res, true)
+    res
+  }
+
+  def waitOnListActivationsInNamespace(namespace: EntityPath, count: Int)(implicit context: ExecutionContext,
+                                                                          transid: TransactionId,
+                                                                          timeout: Duration) = {
+    val success = retry(
+      () => {
+        val activations: Future[Either[List[JsObject], List[WhiskActivation]]] =
+          activationStore.listActivationsInNamespace(namespace, 0, 0)
+        val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson)))
+
+        listFuture map { l =>
+          if (l.length != count) {
+            throw RetryOp()
+          } else true
+        }
+      },
+      timeout)
+
+    assert(success.isSuccess, "wait aborted")
+  }
+
+  def waitOnListActivationsMatchingName(namespace: EntityPath, name: EntityPath, count: Int)(
+    implicit context: ExecutionContext,
+    transid: TransactionId,
+    timeout: Duration) = {
+    val success = retry(
+      () => {
+        val activations: Future[Either[List[JsObject], List[WhiskActivation]]] =
+          activationStore.listActivationsMatchingName(namespace, name, 0, 0)
+        val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson)))
+
+        listFuture map { l =>
+          if (l.length != count) {
+            throw RetryOp()
+          } else true
+        }
+      },
+      timeout)
+
+    assert(success.isSuccess, "wait aborted")
   }
 
   def deleteTrigger(doc: DocId)(implicit transid: TransactionId) = {
@@ -152,7 +204,6 @@ protected trait ControllerTestCommon
   override def afterAll() = {
     println("Shutting down db connections");
     entityStore.shutdown()
-    activationStore.shutdown()
     authStore.shutdown()
   }
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
index aceae91910..cfdda7ebea 100644
--- a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
@@ -372,8 +372,9 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
       val activationDoc = DocId(WhiskEntity.qualifiedName(namespace, activationId))
       whisk.utils.retry({
         println(s"trying to obtain async activation doc: '${activationDoc}'")
-        val activation = get(activationStore, activationDoc, WhiskActivation, garbageCollect = false)
-        del(activationStore, activationDoc, WhiskActivation)
+
+        val activation = getActivation(ActivationId(activationDoc.asString))
+        deleteActivation(ActivationId(activationDoc.asString))
         activation.end should be(Instant.EPOCH)
         activation.response.result should be(Some(content))
       }, 30, Some(1.second))
@@ -395,7 +396,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
       val activationDoc = DocId(WhiskEntity.qualifiedName(namespace, activationId))
       whisk.utils.retry({
         println(s"trying to delete async activation doc: '${activationDoc}'")
-        del(activationStore, activationDoc, WhiskActivation)
+        deleteActivation(ActivationId(activationDoc.asString))
         response.fields("activationId") should not be None
       }, 30, Some(1.second))
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services