You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/08/20 16:33:39 UTC
[incubator-openwhisk] branch master updated: Allow for activation
store to accept user and request information (#3798)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8327cd0 Allow for activation store to accept user and request information (#3798)
8327cd0 is described below
commit 8327cd06224a43b5cb91c8cbb13703d64f58026f
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Mon Aug 20 12:33:35 2018 -0400
Allow for activation store to accept user and request information (#3798)
* Allow for activation store to accept user and request information
* Make user and request non-optional parameters
* Introduce UserContext to activation store
* Update doc for new parameters
---
.../logging/DockerToActivationLogStore.scala | 4 +-
.../logging/ElasticSearchLogStore.scala | 7 +-
.../containerpool/logging/LogDriverLogStore.scala | 5 +-
.../core/containerpool/logging/LogStore.scala | 4 +-
.../containerpool/logging/SplunkLogStore.scala | 4 +-
.../whisk/core/database/ActivationStore.scala | 56 +++++++++-----
.../core/database/ArtifactActivationStore.scala | 48 +++++++-----
.../scala/whisk/core/controller/Activations.scala | 59 +++++++-------
.../scala/whisk/core/controller/Triggers.scala | 89 ++++++++++++----------
.../core/controller/actions/PrimitiveActions.scala | 16 ++--
.../core/controller/actions/SequenceActions.scala | 6 +-
.../whisk/core/containerpool/ContainerProxy.scala | 12 ++-
.../scala/whisk/core/invoker/InvokerReactive.scala | 8 +-
.../logging/ElasticSearchLogStoreTests.scala | 15 ++--
.../logging/SplunkLogStoreTests.scala | 9 ++-
.../containerpool/test/ContainerProxyTests.scala | 3 +-
.../core/controller/test/ActionsApiTests.scala | 10 ++-
.../core/controller/test/ActivationsApiTests.scala | 50 ++++++------
.../controller/test/ControllerTestCommon.scala | 33 ++++----
.../core/controller/test/TriggersApiTests.scala | 9 ++-
20 files changed, 253 insertions(+), 194 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 520e51e..f0ab49f 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -19,7 +19,6 @@ package whisk.core.containerpool.logging
import akka.NotUsed
import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
@@ -29,6 +28,7 @@ import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import whisk.http.Messages
+import whisk.core.database.UserContext
import scala.concurrent.{ExecutionContext, Future}
@@ -67,7 +67,7 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
override val containerParameters = Map("--log-driver" -> Set("json-file"))
/* As logs are already part of the activation record, just return that bit of it */
- override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+ override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] =
Future.successful(activation.logs)
override def collectLogs(transid: TransactionId,
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
index b404a24..58da7fa 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -26,6 +26,7 @@ import akka.http.scaladsl.model._
import whisk.core.entity.{ActivationLogs, Identity, WhiskActivation}
import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
import whisk.core.ConfigKeys
+import whisk.core.database.UserContext
import scala.concurrent.{Future, Promise}
import scala.util.Try
@@ -99,12 +100,12 @@ class ElasticSearchLogStore(
private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.namespace.uuid.asString)
- override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
- val headers = extractRequiredHeaders(request.headers)
+ override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = {
+ val headers = extractRequiredHeaders(context.request.headers)
// Return logs from ElasticSearch, or return logs from activation if required headers are not present
if (headers.length == elasticSearchConfig.requiredHeaders.length) {
- esClient.search[EsSearchResult](generatePath(user), generatePayload(activation), headers).flatMap {
+ esClient.search[EsSearchResult](generatePath(context.user), generatePayload(activation), headers).flatMap {
case Right(queryResult) =>
Future.successful(transcribeLogs(queryResult))
case Left(code) =>
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index efbbb80..daa3acb 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -18,12 +18,11 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpRequest
-import whisk.core.entity.Identity
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import whisk.core.database.UserContext
import scala.concurrent.Future
@@ -49,7 +48,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
/** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version,
* e.g. the SplunkLogStore to expose logs from some external source */
- def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+ def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] =
Future.successful(ActivationLogs(Vector("Logs are not available.")))
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 2bc3f98..29433d9 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -18,12 +18,12 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpRequest
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import whisk.spi.Spi
+import whisk.core.database.UserContext
import scala.concurrent.Future
@@ -76,7 +76,7 @@ trait LogStore {
* @param activation activation to fetch the logs for
* @return the relevant logs
*/
- def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs]
+ def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs]
}
trait LogStoreProvider extends Spi {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
index 5baef46..1cd93b5 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -53,7 +53,7 @@ import whisk.common.AkkaLogging
import whisk.core.ConfigKeys
import whisk.core.entity.ActivationLogs
import whisk.core.entity.WhiskActivation
-import whisk.core.entity.Identity
+import whisk.core.database.UserContext
case class SplunkLogStoreConfig(host: String,
port: Int,
@@ -98,7 +98,7 @@ class SplunkLogStore(
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
else Http().defaultClientHttpsContext)
- override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
+ override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = {
//example curl request:
// curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
diff --git a/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala
index 106f57f..9a821c9 100644
--- a/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala
@@ -21,6 +21,7 @@ import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
+import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
import whisk.common.{Logging, TransactionId}
import whisk.core.entity._
@@ -28,38 +29,45 @@ import whisk.spi.Spi
import scala.concurrent.Future
+case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
+
trait ActivationStore {
/**
* Stores an activation.
*
* @param activation activation to store
+ * @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
* @return Future containing DocInfo related to stored activation
*/
- def store(activation: WhiskActivation)(implicit transid: TransactionId,
- notifier: Option[CacheChangeNotification]): Future[DocInfo]
+ def store(activation: WhiskActivation, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[DocInfo]
/**
* Retrieves an activation corresponding to the specified activation ID.
*
* @param activationId ID of activation to retrieve
+ * @param context user and request context
* @param transid transaction ID for request
* @return Future containing the retrieved WhiskActivation
*/
- def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation]
+ def get(activationId: ActivationId, context: UserContext)(implicit transid: TransactionId): Future[WhiskActivation]
/**
* Deletes an activation corresponding to the provided activation ID.
*
* @param activationId ID of activation to delete
+ * @param context user and request context
* @param transid transaction ID for the request
* @param notifier cache change notifier
* @return Future containing a Boolean value indication whether the activation was deleted
*/
- def delete(activationId: ActivationId)(implicit transid: TransactionId,
- notifier: Option[CacheChangeNotification]): Future[Boolean]
+ def delete(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[Boolean]
/**
* Counts the number of activations in a namespace.
@@ -69,6 +77,7 @@ trait ActivationStore {
* @param skip number of activations to skip
* @param since timestamp to retrieve activations after
* @param upto timestamp to retrieve activations before
+ * @param context user and request context
* @param transid transaction ID for request
* @return Future containing number of activations returned from query in JSON format
*/
@@ -76,7 +85,8 @@ trait ActivationStore {
name: Option[EntityPath] = None,
skip: Int,
since: Option[Instant] = None,
- upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject]
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[JsObject]
/**
* Returns activations corresponding to provided entity name.
@@ -88,18 +98,20 @@ trait ActivationStore {
* @param includeDocs return document with each activation
* @param since timestamp to retrieve activations after
* @param upto timestamp to retrieve activations before
+ * @param context user and request context
* @param transid transaction ID for request
* @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs
* are included, a List of WhiskActivation is returned.
*/
- def listActivationsMatchingName(namespace: EntityPath,
- name: EntityPath,
- skip: Int,
- limit: Int,
- includeDocs: Boolean = false,
- since: Option[Instant] = None,
- upto: Option[Instant] = None)(
- implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
+ def listActivationsMatchingName(
+ namespace: EntityPath,
+ name: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
/**
* List all activations in a specified namespace.
@@ -110,17 +122,19 @@ trait ActivationStore {
* @param includeDocs return document with each activation
* @param since timestamp to retrieve activations after
* @param upto timestamp to retrieve activations before
+ * @param context user and request context
* @param transid transaction ID for request
* @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs
* are included, a List of WhiskActivation is returned.
*/
- def listActivationsInNamespace(namespace: EntityPath,
- skip: Int,
- limit: Int,
- includeDocs: Boolean = false,
- since: Option[Instant] = None,
- upto: Option[Instant] = None)(
- implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
+ def listActivationsInNamespace(
+ namespace: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
}
trait ActivationStoreProvider extends Spi {
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala
index 15da34b..d6f5ad5 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala
@@ -36,8 +36,9 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
private val artifactStore: ArtifactStore[WhiskActivation] =
WhiskActivationStore.datastore()(actorSystem, logging, actorMaterializer)
- def store(activation: WhiskActivation)(implicit transid: TransactionId,
- notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+ def store(activation: WhiskActivation, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
logging.debug(this, s"recording activation '${activation.activationId}'")
@@ -54,7 +55,8 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
res
}
- def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation] = {
+ def get(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId): Future[WhiskActivation] = {
WhiskActivation.get(artifactStore, DocId(activationId.asString))
}
@@ -62,8 +64,9 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
* Here there is added overhead of retrieving the specified activation before deleting it, so this method should not
* be used in production or performance related code.
*/
- def delete(activationId: ActivationId)(implicit transid: TransactionId,
- notifier: Option[CacheChangeNotification]): Future[Boolean] = {
+ def delete(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[Boolean] = {
WhiskActivation.get(artifactStore, DocId(activationId.asString)) flatMap { doc =>
WhiskActivation.del(artifactStore, doc.docinfo)
}
@@ -73,7 +76,8 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
name: Option[EntityPath] = None,
skip: Int,
since: Option[Instant] = None,
- upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject] = {
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[JsObject] = {
WhiskActivation.countCollectionInNamespace(
artifactStore,
name.map(p => namespace.addPath(p)).getOrElse(namespace),
@@ -84,14 +88,15 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
name.map(_ => WhiskActivation.filtersView).getOrElse(WhiskActivation.view))
}
- def listActivationsMatchingName(namespace: EntityPath,
- name: EntityPath,
- skip: Int,
- limit: Int,
- includeDocs: Boolean = false,
- since: Option[Instant] = None,
- upto: Option[Instant] = None)(
- implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+ def listActivationsMatchingName(
+ namespace: EntityPath,
+ name: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
WhiskActivation.listActivationsMatchingName(
artifactStore,
namespace,
@@ -104,13 +109,14 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
StaleParameter.UpdateAfter)
}
- def listActivationsInNamespace(namespace: EntityPath,
- skip: Int,
- limit: Int,
- includeDocs: Boolean = false,
- since: Option[Instant] = None,
- upto: Option[Instant] = None)(
- implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+ def listActivationsInNamespace(
+ namespace: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
WhiskActivation.listCollectionInNamespace(
artifactStore,
namespace,
diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index fb4ab54..82ec062 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -36,6 +36,7 @@ import whisk.core.entitlement.{Collection, Privilege, Resource}
import whisk.core.entity._
import whisk.http.ErrorResponse.terminate
import whisk.http.Messages
+import whisk.core.database.UserContext
object WhiskActivationsApi {
@@ -119,18 +120,21 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
/** Dispatches resource to the proper handler depending on context. */
protected override def dispatchOp(user: Identity, op: Privilege, resource: Resource)(
implicit transid: TransactionId) = {
-
- resource.entity.flatMap(e => ActivationId.parse(e).toOption) match {
- case Some(aid) =>
- op match {
- case READ => fetch(user, resource.namespace, aid)
- case _ => reject // should not get here
- }
- case None =>
- op match {
- case READ => list(resource.namespace)
- case _ => reject // should not get here
- }
+ extractRequest { request =>
+ val context = UserContext(user, request)
+
+ resource.entity.flatMap(e => ActivationId.parse(e).toOption) match {
+ case Some(aid) =>
+ op match {
+ case READ => fetch(context, resource.namespace, aid)
+ case _ => reject // should not get here
+ }
+ case None =>
+ op match {
+ case READ => list(context, resource.namespace)
+ case _ => reject // should not get here
+ }
+ }
}
}
@@ -141,7 +145,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 200 [] or [WhiskActivation as JSON]
* - 500 Internal Server Error
*/
- private def list(namespace: EntityPath)(implicit transid: TransactionId) = {
+ private def list(context: UserContext, namespace: EntityPath)(implicit transid: TransactionId) = {
import WhiskActivationsApi.stringToRestrictedEntityPath
import WhiskActivationsApi.stringToInstantDeserializer
import WhiskActivationsApi.stringToListLimit
@@ -157,16 +161,16 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
'upto.as[Instant] ?) { (skip, limit, count, docs, name, since, upto) =>
if (count && !docs) {
countEntities {
- activationStore.countActivationsInNamespace(namespace, name.flatten, skip.n, since, upto)
+ activationStore.countActivationsInNamespace(namespace, name.flatten, skip.n, since, upto, context)
}
} else if (count && docs) {
terminate(BadRequest, Messages.docsNotAllowedWithCount)
} else {
val activations = name.flatten match {
case Some(action) =>
- activationStore.listActivationsMatchingName(namespace, action, skip.n, limit.n, docs, since, upto)
+ activationStore.listActivationsMatchingName(namespace, action, skip.n, limit.n, docs, since, upto, context)
case None =>
- activationStore.listActivationsInNamespace(namespace, skip.n, limit.n, docs, since, upto)
+ activationStore.listActivationsInNamespace(namespace, skip.n, limit.n, docs, since, upto, context)
}
listEntities(activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson))))
}
@@ -181,16 +185,15 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)(
+ private def fetch(context: UserContext, namespace: EntityPath, activationId: ActivationId)(
implicit transid: TransactionId) = {
val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
pathEndOrSingleSlash {
getEntity(
- activationStore.get(ActivationId(docid.asString)),
+ activationStore.get(ActivationId(docid.asString), context),
postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson)))
-
- } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
- (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
+ } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(context, docid) } ~
+ (pathPrefix(logsPath) & pathEnd) { fetchLogs(context, docid) }
}
/**
@@ -201,9 +204,9 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetchResponse(docid: DocId)(implicit transid: TransactionId) = {
+ private def fetchResponse(context: UserContext, docid: DocId)(implicit transid: TransactionId) = {
getEntityAndProject(
- activationStore.get(ActivationId(docid.asString)),
+ activationStore.get(ActivationId(docid.asString), context),
(activation: WhiskActivation) => Future.successful(activation.response.toExtendedJson))
}
@@ -215,11 +218,9 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
- extractRequest { request =>
- getEntityAndProject(
- activationStore.get(ActivationId(docid.asString)),
- (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject))
- }
+ private def fetchLogs(context: UserContext, docid: DocId)(implicit transid: TransactionId) = {
+ getEntityAndProject(
+ activationStore.get(ActivationId(docid.asString), context),
+ (activation: WhiskActivation) => logStore.fetchLogs(activation, context).map(_.toJsonObject))
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index d293aba..a4d2ecf 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -45,6 +45,7 @@ import whisk.core.entity._
import whisk.core.entity.types.EntityStore
import whisk.http.ErrorResponse
import whisk.http.Messages
+import whisk.core.database.UserContext
/** A trait implementing the triggers API. */
trait WhiskTriggersApi extends WhiskCollectionAPI {
@@ -132,48 +133,52 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
*/
override def activate(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])(
implicit transid: TransactionId) = {
- entity(as[Option[JsObject]]) { payload =>
- getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some {
- trigger: WhiskTrigger =>
- val triggerActivationId = activationIdFactory.make()
- logging.info(this, s"[POST] trigger activation id: ${triggerActivationId}")
- val triggerActivation = WhiskActivation(
- namespace = user.namespace.name.toPath, // all activations should end up in the one space regardless trigger.namespace,
- entityName.name,
- user.subject,
- triggerActivationId,
- Instant.now(Clock.systemUTC()),
- Instant.EPOCH,
- response = ActivationResponse.success(payload orElse Some(JsObject.empty)),
- version = trigger.version,
- duration = None)
-
- // List of active rules associated with the trigger
- val activeRules: Map[FullyQualifiedEntityName, ReducedRule] =
- trigger.rules.map(_.filter(_._2.status == Status.ACTIVE)).getOrElse(Map.empty)
-
- if (activeRules.nonEmpty) {
- val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject.empty)
-
- activateRules(user, args, trigger.rules.getOrElse(Map.empty))
- .map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector)))
- .recover {
- case e =>
- logging.error(this, s"Failed to write action activation results to trigger activation: $e")
- triggerActivation
- }
- .map { activation =>
- activationStore.store(activation)
- }
- complete(Accepted, triggerActivationId.toJsObject)
- } else {
- logging
- .debug(
- this,
- s"[POST] trigger without an active rule was activated; no trigger activation record created for $entityName")
- complete(NoContent)
- }
- })
+ extractRequest { request =>
+ val context = UserContext(user, request)
+
+ entity(as[Option[JsObject]]) { payload =>
+ getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some {
+ trigger: WhiskTrigger =>
+ val triggerActivationId = activationIdFactory.make()
+ logging.info(this, s"[POST] trigger activation id: ${triggerActivationId}")
+ val triggerActivation = WhiskActivation(
+ namespace = user.namespace.name.toPath, // all activations should end up in the one space regardless trigger.namespace,
+ entityName.name,
+ user.subject,
+ triggerActivationId,
+ Instant.now(Clock.systemUTC()),
+ Instant.EPOCH,
+ response = ActivationResponse.success(payload orElse Some(JsObject.empty)),
+ version = trigger.version,
+ duration = None)
+
+ // List of active rules associated with the trigger
+ val activeRules: Map[FullyQualifiedEntityName, ReducedRule] =
+ trigger.rules.map(_.filter(_._2.status == Status.ACTIVE)).getOrElse(Map.empty)
+
+ if (activeRules.nonEmpty) {
+ val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject.empty)
+
+ activateRules(user, args, trigger.rules.getOrElse(Map.empty))
+ .map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector)))
+ .recover {
+ case e =>
+ logging.error(this, s"Failed to write action activation results to trigger activation: $e")
+ triggerActivation
+ }
+ .map { activation =>
+ activationStore.store(activation, context)
+ }
+ complete(Accepted, triggerActivationId.toJsObject)
+ } else {
+ logging
+ .debug(
+ this,
+ s"[POST] trigger without an active rule was activated; no trigger activation record created for $entityName")
+ complete(NoContent)
+ }
+ })
+ }
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index c2ab52d..4a0b0ec 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -34,6 +34,7 @@ import whisk.core.entity.size.SizeInt
import whisk.core.entity.types.EntityStore
import whisk.http.Messages._
import whisk.utils.ExecutionContextFactory.FutureExtensions
+import whisk.core.database.UserContext
import scala.collection.mutable.Buffer
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -517,6 +518,8 @@ protected[actions] trait PrimitiveActions {
private def completeActivation(user: Identity, session: Session, response: ActivationResponse)(
implicit transid: TransactionId): WhiskActivation = {
+ val context = UserContext(user)
+
// compute max memory
val sequenceLimits = Parameters(
WhiskActivation.limitsAnnotation,
@@ -550,7 +553,7 @@ protected[actions] trait PrimitiveActions {
sequenceLimits,
duration = Some(session.duration))
- activationStore.store(activation)(transid, notifier = None)
+ activationStore.store(activation, context)(transid, notifier = None)
activation
}
@@ -569,8 +572,8 @@ protected[actions] trait PrimitiveActions {
totalWaitTime: FiniteDuration,
activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+ val context = UserContext(user)
val result = Promise[Either[ActivationId, WhiskActivation]]
-
val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.name.toPath, activationId))
logging.debug(this, s"action activation will block for result upto $totalWaitTime")
@@ -578,11 +581,11 @@ protected[actions] trait PrimitiveActions {
// in case of an incomplete active-ack (record too large for example).
activeAckResponse.foreach {
case Right(activation) => result.trySuccess(Right(activation))
- case _ => pollActivation(docid, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+ case _ => pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
}
// 2. Poll the database slowly in case the active-ack never arrives
- pollActivation(docid, result, _ => 15.seconds)
+ pollActivation(docid, context, result, _ => 15.seconds)
// 3. Timeout forces a fallback to activationId
val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
@@ -602,15 +605,16 @@ protected[actions] trait PrimitiveActions {
* @param result promise to resolve on result. Is also used to abort polling once completed.
*/
private def pollActivation(docid: DocId,
+ context: UserContext,
result: Promise[Either[ActivationId, WhiskActivation]],
wait: Int => FiniteDuration,
retries: Int = 0,
maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = {
if (!result.isCompleted && retries < maxRetries) {
val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
- activationStore.get(ActivationId(docid.asString)).onComplete {
+ activationStore.get(ActivationId(docid.asString), context).onComplete {
case Success(activation) => result.trySuccess(Right(activation))
- case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries)
+ case Failure(_: NoDocumentException) => pollActivation(docid, context, result, wait, retries + 1, maxRetries)
case Failure(t: Throwable) => result.tryFailure(t)
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index cd6c1ea..d38a91a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -40,6 +40,7 @@ import whisk.core.entity.size.SizeInt
import whisk.core.entity.types._
import whisk.http.Messages._
import whisk.utils.ExecutionContextFactory.FutureExtensions
+import whisk.core.database.UserContext
protected[actions] trait SequenceActions {
/** The core collections require backend services to be injected in this trait. */
@@ -150,6 +151,8 @@ protected[actions] trait SequenceActions {
start: Instant,
cause: Option[ActivationId])(
implicit transid: TransactionId): Future[(Right[ActivationId, WhiskActivation], Int)] = {
+ val context = UserContext(user)
+
// not topmost, no need to worry about terminating incoming request
// Note: the future for the sequence result recovers from all throwable failures
futureSeqResult
@@ -161,7 +164,8 @@ protected[actions] trait SequenceActions {
(Right(seqActivation), accounting.atomicActionCnt)
}
.andThen {
- case Success((Right(seqActivation), _)) => activationStore.store(seqActivation)(transid, notifier = None)
+ case Success((Right(seqActivation), _)) =>
+ activationStore.store(seqActivation, context)(transid, notifier = None)
// This should never happen; in this case, there is no activation record created or stored:
// should there be?
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index d04dfb3..92eda45 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -40,6 +40,7 @@ import whisk.http.Messages
import akka.event.Logging.InfoLevel
import pureconfig.loadConfigOrThrow
import whisk.core.ConfigKeys
+import whisk.core.database.UserContext
// States
sealed trait ContainerState
@@ -96,7 +97,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
- storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+ storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
@@ -156,6 +157,7 @@ class ContainerProxy(
case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg)
case _ => ActivationResponse.whiskError(Messages.resourceProvisionError)
}
+ val context = UserContext(job.msg.user)
// construct an appropriate activation and record it in the datastore,
// also update the feed and active ack; the container cleanup is queued
// implicitly via a FailureMessage which will be processed later when the state
@@ -167,7 +169,7 @@ class ContainerProxy(
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid)
- storeActivation(transid, activation)
+ storeActivation(transid, activation, context)
}
.flatMap { container =>
// now attempt to inject the user code and run the action
@@ -391,6 +393,8 @@ class ContainerProxy(
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
+ val context = UserContext(job.msg.user)
+
// Adds logs to the raw activation.
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
.flatMap { activation =>
@@ -415,7 +419,7 @@ class ContainerProxy(
}
// Storing the record. Entirely asynchronous and not waited upon.
- activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _))
+ activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context))
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
@@ -432,7 +436,7 @@ object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
- store: (TransactionId, WhiskActivation) => Future[Any],
+ store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index de9b4bc..634e416 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -36,6 +36,7 @@ import whisk.core.database._
import whisk.core.entity._
import whisk.http.Messages
import whisk.spi.SpiLoader
+import whisk.core.database.UserContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
@@ -160,9 +161,9 @@ class InvokerReactive(
}
/** Stores an activation in the database. */
- private val store = (tid: TransactionId, activation: WhiskActivation) => {
+ private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
implicit val transid: TransactionId = tid
- activationStore.store(activation)(tid, notifier = None)
+ activationStore.store(activation, context)(tid, notifier = None)
}
/** Creates a ContainerProxy Actor when being called. */
@@ -236,10 +237,11 @@ class InvokerReactive(
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}
+ val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid)
- store(msg.transid, activation)
+ store(msg.transid, activation, context)
Future.successful(())
}
} else {
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
index 8930a43..0d32b32 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -36,6 +36,7 @@ import pureconfig.error.ConfigReaderException
import spray.json._
import whisk.core.entity._
import whisk.core.entity.size._
+import whisk.core.database.UserContext
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
@@ -88,6 +89,7 @@ class ElasticSearchLogStoreTests
HttpEntity(ContentTypes.`application/json`, defaultPayload))
private val defaultLogStoreHttpRequest =
HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty)
+ private val defaultContext = UserContext(user, defaultLogStoreHttpRequest)
private val expectedLogs = ActivationLogs(
Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))
@@ -127,7 +129,7 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfig)
- await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs
}
it should "get logs from supplied activation record when required headers are not present" in {
@@ -137,7 +139,7 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfigRequiredHeaders)
- await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ await(esLogStore.fetchLogs(activation, defaultContext)) shouldBe expectedLogs
}
it should "get user logs from ElasticSearch when required headers are needed" in {
@@ -160,8 +162,9 @@ class ElasticSearchLogStoreTests
uri = "https://some.url",
headers = List(RawHeader("x-auth-token", authToken), RawHeader("x-auth-project-id", authProjectId)),
entity = HttpEntity.Empty)
+ val context = UserContext(user, requiredHeadersHttpRequest)
- await(esLogStore.fetchLogs(user, activation.withoutLogs, requiredHeadersHttpRequest)) shouldBe expectedLogs
+ await(esLogStore.fetchLogs(activation.withoutLogs, context)) shouldBe expectedLogs
}
it should "dynamically replace $UUID in request path" in {
@@ -177,13 +180,13 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, httpRequest)),
elasticSearchConfig = dynamicPathConfig)
- await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs
}
it should "fail to connect to invalid host" in {
val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)
- a[Throwable] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+ a[Throwable] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext))
}
it should "forward errors from ElasticSearch" in {
@@ -194,7 +197,7 @@ class ElasticSearchLogStoreTests
Some(testFlow(httpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfig)
- a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+ a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext))
}
it should "error when configuration protocol is invalid" in {
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
index e2bd8db..dae47e6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -38,6 +38,7 @@ import pureconfig.error.ConfigReaderException
import spray.json._
import whisk.core.entity._
import whisk.core.entity.size._
+import whisk.core.database.UserContext
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
@@ -88,6 +89,8 @@ class SplunkLogStoreTests
annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
duration = Some(123))
+ val context = UserContext(user, request)
+
implicit val ec: ExecutionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
@@ -144,20 +147,20 @@ class SplunkLogStoreTests
it should "find logs based on activation timestamps" in {
//use the a flow that asserts the request structure and provides a response in the expected format
val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
- val result = await(splunkStore.fetchLogs(user, activation, request))
+ val result = await(splunkStore.fetchLogs(activation, context))
result shouldBe ActivationLogs(Vector("some log message", "some other log message"))
}
it should "fail to connect to bogus host" in {
//use the default http flow with the default bogus-host config
val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
- a[Throwable] should be thrownBy await(splunkStore.fetchLogs(user, activation, request))
+ a[Throwable] should be thrownBy await(splunkStore.fetchLogs(activation, context))
}
it should "display an error if API cannot be reached" in {
//use a flow that generates a 500 response
val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
- a[RuntimeException] should be thrownBy await(splunkStore.fetchLogs(user, activation, request))
+ a[RuntimeException] should be thrownBy await(splunkStore.fetchLogs(activation, context))
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index e635b5f..a46647a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -38,6 +38,7 @@ import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.http.Messages
+import whisk.core.database.UserContext
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -163,7 +164,7 @@ class ContainerProxyTests
response
}
- def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation) =>
+ def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation, context: UserContext) =>
Future.successful(())
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 9b54b0a..7bbf820 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -35,6 +35,7 @@ import whisk.core.entity.size._
import whisk.core.entitlement.Collection
import whisk.http.ErrorResponse
import whisk.http.Messages
+import whisk.core.database.UserContext
import java.io.ByteArrayInputStream
import java.util.Base64
@@ -59,6 +60,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
behavior of "Actions API"
val creds = WhiskAuthHelpers.newIdentity()
+ val context = UserContext(creds)
val namespace = EntityPath(creds.subject.asString)
val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"
def aname() = MakeName.next("action_tests")
@@ -1202,7 +1204,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
- storeActivation(activation)
+ storeActivation(activation, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -1217,7 +1219,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
response should be(activation.resultAsJson)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -1313,7 +1315,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
- storeActivation(activation)
+ storeActivation(activation, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
@@ -1321,7 +1323,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
response should be(activation.withoutLogs.toExtendedJson)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index 4b1927e..6810ef2 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -31,6 +31,7 @@ import whisk.core.entitlement.Collection
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.http.{ErrorResponse, Messages}
+import whisk.core.database.UserContext
/**
* Tests Activations API.
@@ -51,6 +52,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
behavior of "Activations API"
val creds = WhiskAuthHelpers.newIdentity()
+ val context = UserContext(creds)
val namespace = EntityPath(creds.subject.asString)
val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"
@@ -93,8 +95,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
end = Instant.now)
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation)
- waitOnListActivationsInNamespace(namespace, 2)
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ waitOnListActivationsInNamespace(namespace, 2, context)
whisk.utils.retry {
Get(s"$collectionPath") ~> Route.seal(routes(creds)) ~> check {
@@ -135,7 +137,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
} finally {
(notExpectedActivations ++ activations).foreach(activation =>
- deleteActivation(ActivationId(activation.docid.asString)))
+ deleteActivation(ActivationId(activation.docid.asString), context))
}
}
@@ -176,8 +178,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation)
- waitOnListActivationsInNamespace(namespace, 2)
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ waitOnListActivationsInNamespace(namespace, 2, context)
checkCount("", 2)
whisk.utils.retry {
@@ -190,7 +192,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}
} finally {
(notExpectedActivations ++ activations).foreach(activation =>
- deleteActivation(ActivationId(activation.docid.asString)))
+ deleteActivation(ActivationId(activation.docid.asString), context))
}
}
@@ -251,8 +253,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
end = now.plusSeconds(30))) // should match
try {
- (notExpectedActivations ++ activations).foreach(storeActivation)
- waitOnListActivationsInNamespace(namespace, activations.length)
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ waitOnListActivationsInNamespace(namespace, activations.length, context)
{ // get between two time stamps
val filter = s"since=${since.toEpochMilli}&upto=${upto.toEpochMilli}"
@@ -306,7 +308,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
} finally {
(notExpectedActivations ++ activations).foreach(activation =>
- deleteActivation(ActivationId(activation.docid.asString)))
+ deleteActivation(ActivationId(activation.docid.asString), context))
}
}
@@ -360,9 +362,13 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
annotations = Parameters("path", s"${namespace.asString}/pkg/xyz"))
}.toList
try {
- (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation)
- waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length)
- waitOnListActivationsMatchingName(namespace, EntityName("pkg").addPath(EntityName("xyz")), activations.length)
+ (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation(_, context))
+ waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length, context)
+ waitOnListActivationsMatchingName(
+ namespace,
+ EntityName("pkg").addPath(EntityName("xyz")),
+ activations.length,
+ context)
checkCount("name=xyz", activations.length)
whisk.utils.retry {
@@ -386,7 +392,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}
} finally {
(notExpectedActivations ++ activations ++ activationsInPackage).foreach(activation =>
- deleteActivation(ActivationId(activation.docid.asString)))
+ deleteActivation(ActivationId(activation.docid.asString), context))
}
}
@@ -474,7 +480,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation)
+ storeActivation(activation, context)
Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -495,7 +501,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
status should be(Forbidden)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -511,7 +517,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation)
+ storeActivation(activation, context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -519,7 +525,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
response should be(activation.response.toExtendedJson)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -535,7 +541,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation)
+ storeActivation(activation, context)
Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -543,7 +549,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
response should be(activation.logs.toJsonObject)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -558,14 +564,14 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
ActivationId.generate(),
start = Instant.now,
end = Instant.now)
- storeActivation(activation)
+ storeActivation(activation, context)
try {
Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check {
status should be(NotFound)
}
} finally {
- deleteActivation(ActivationId(activation.docid.asString))
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -631,7 +637,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
val activation =
new BadActivation(namespace, aname(), creds.subject, ActivationId.generate(), Instant.now, Instant.now)
- storeActivation(activation)
+ storeActivation(activation, context)
Get(s"$collectionPath/${activation.activationId}") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index b09d1b1..925c47f 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -42,6 +42,7 @@ import whisk.core.entity._
import whisk.core.entity.test.ExecHelpers
import whisk.core.loadBalancer.LoadBalancer
import whisk.spi.SpiLoader
+import whisk.core.database.UserContext
protected trait ControllerTestCommon
extends FlatSpec
@@ -96,32 +97,34 @@ protected trait ControllerTestCommon
}, dbOpTimeout)
}
- def getActivation(activationId: ActivationId)(implicit transid: TransactionId,
- timeout: Duration = 10 seconds): WhiskActivation = {
- Await.result(activationStore.get(activationId), timeout)
+ def getActivation(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId,
+ timeout: Duration = 10 seconds): WhiskActivation = {
+ Await.result(activationStore.get(activationId, context), timeout)
}
- def storeActivation(activation: WhiskActivation)(implicit transid: TransactionId,
- timeout: Duration = 10 seconds): DocInfo = {
- val docFuture = activationStore.store(activation)
+ def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId,
+ timeout: Duration = 10 seconds): DocInfo = {
+ val docFuture = activationStore.store(activation, context)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
doc
}
- def deleteActivation(activationId: ActivationId)(implicit transid: TransactionId) = {
- val res = Await.result(activationStore.delete(activationId), dbOpTimeout)
+ def deleteActivation(activationId: ActivationId, context: UserContext)(implicit transid: TransactionId) = {
+ val res = Await.result(activationStore.delete(activationId, context), dbOpTimeout)
assert(res, true)
res
}
- def waitOnListActivationsInNamespace(namespace: EntityPath, count: Int)(implicit context: ExecutionContext,
- transid: TransactionId,
- timeout: Duration) = {
+ def waitOnListActivationsInNamespace(namespace: EntityPath, count: Int, context: UserContext)(
+ implicit ec: ExecutionContext,
+ transid: TransactionId,
+ timeout: Duration) = {
val success = retry(
() => {
val activations: Future[Either[List[JsObject], List[WhiskActivation]]] =
- activationStore.listActivationsInNamespace(namespace, 0, 0)
+ activationStore.listActivationsInNamespace(namespace, 0, 0, context = context)
val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson)))
listFuture map { l =>
@@ -135,14 +138,14 @@ protected trait ControllerTestCommon
assert(success.isSuccess, "wait aborted")
}
- def waitOnListActivationsMatchingName(namespace: EntityPath, name: EntityPath, count: Int)(
- implicit context: ExecutionContext,
+ def waitOnListActivationsMatchingName(namespace: EntityPath, name: EntityPath, count: Int, context: UserContext)(
+ implicit ex: ExecutionContext,
transid: TransactionId,
timeout: Duration) = {
val success = retry(
() => {
val activations: Future[Either[List[JsObject], List[WhiskActivation]]] =
- activationStore.listActivationsMatchingName(namespace, name, 0, 0)
+ activationStore.listActivationsMatchingName(namespace, name, 0, 0, context = context)
val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson)))
listFuture map { l =>
diff --git a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
index cfdda7e..415d224 100644
--- a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala
@@ -25,7 +25,6 @@ import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.StatusCodes._
@@ -41,6 +40,7 @@ import whisk.core.entity.size._
import whisk.core.entity.test.OldWhiskTrigger
import whisk.http.ErrorResponse
import whisk.http.Messages
+import whisk.core.database.UserContext
/**
* Tests Trigger API.
@@ -61,6 +61,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
behavior of "Triggers API"
val creds = WhiskAuthHelpers.newIdentity()
+ val context = UserContext(creds)
val namespace = EntityPath(creds.subject.asString)
val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"
def aname() = MakeName.next("triggers_tests")
@@ -373,8 +374,8 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
whisk.utils.retry({
println(s"trying to obtain async activation doc: '${activationDoc}'")
- val activation = getActivation(ActivationId(activationDoc.asString))
- deleteActivation(ActivationId(activationDoc.asString))
+ val activation = getActivation(ActivationId(activationDoc.asString), context)
+ deleteActivation(ActivationId(activationDoc.asString), context)
activation.end should be(Instant.EPOCH)
activation.response.result should be(Some(content))
}, 30, Some(1.second))
@@ -396,7 +397,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
val activationDoc = DocId(WhiskEntity.qualifiedName(namespace, activationId))
whisk.utils.retry({
println(s"trying to delete async activation doc: '${activationDoc}'")
- deleteActivation(ActivationId(activationDoc.asString))
+ deleteActivation(ActivationId(activationDoc.asString), context)
response.fields("activationId") should not be None
}, 30, Some(1.second))
}