You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2020/04/30 05:18:40 UTC
[openwhisk] branch master updated: Disable db record store for
successful blocking activations (#4885)
This is an automated email from the ASF dual-hosted git repository.
stanciu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0f3ef78 Disable db record store for successful blocking activations (#4885)
0f3ef78 is described below
commit 0f3ef780124622e1983e9bc693109bca94b5c7d4
Author: Cosmin Stanciu <se...@users.noreply.github.com>
AuthorDate: Wed Apr 29 22:18:27 2020 -0700
Disable db record store for successful blocking activations (#4885)
* Make container proxy activation store configurable
* Format code
* Add tests
* Don't copy message in case of blocking action
* Don't store sequences and compositions when disable store config is true
* Make splunk logs available for activations with no db entry
* Make elasticsearch logs available for activations with no db entry
* Format code
* Add comment for disable-store-result configuration
* Update configuration description
* Move logic for activation store disable to a central location
* Update common/scala/src/main/resources/application.conf
Co-Authored-By: rodric rabbah <ro...@gmail.com>
* Update common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
Co-Authored-By: rodric rabbah <ro...@gmail.com>
* Update common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
Co-Authored-By: rodric rabbah <ro...@gmail.com>
* Update tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
Co-Authored-By: rodric rabbah <ro...@gmail.com>
* Remove creation of bogus WhiskActivation record
* Wrap start, end and logs parameters in Options
* Read disableStore configuration only from ActivationStore
* Add logging for activations that have not been stored
* Including action name in the log message
* Make splunk api max-time and max earliest_time configurable
* Pass disableStore as explicit not implicit parameter
* Define variables as FiniteDuration
* Use ActivationId instead of String
* Update common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
Co-Authored-By: rodric rabbah <ro...@gmail.com>
* Fix tests
Co-authored-by: rodric rabbah <ro...@gmail.com>
---
common/scala/src/main/resources/application.conf | 14 +++-
.../org/apache/openwhisk/core/WhiskConfig.scala | 3 +
.../logging/DockerToActivationLogStore.scala | 12 ++-
.../logging/ElasticSearchLogStore.scala | 38 +++++----
.../containerpool/logging/LogDriverLogStore.scala | 12 ++-
.../core/containerpool/logging/LogStore.scala | 20 ++++-
.../containerpool/logging/SplunkLogStore.scala | 42 ++++++----
.../openwhisk/core/database/ActivationStore.scala | 50 +++++++++++-
.../openwhisk/core/controller/Activations.scala | 15 +++-
.../openwhisk/core/controller/ApiUtils.scala | 78 +++++++++++++++++--
.../openwhisk/core/controller/Triggers.scala | 2 +-
.../core/controller/actions/PrimitiveActions.scala | 12 ++-
.../core/controller/actions/SequenceActions.scala | 11 ++-
.../core/containerpool/ContainerProxy.scala | 14 ++--
.../openwhisk/core/invoker/InvokerReactive.scala | 7 +-
.../logging/ElasticSearchLogStoreTests.scala | 68 ++++++++++++----
.../logging/SplunkLogStoreTests.scala | 24 ++++--
.../containerpool/test/ContainerProxyTests.scala | 7 +-
.../test/ActionsApiWithDbPollingTests.scala | 4 +-
.../core/controller/test/ActivationsApiTests.scala | 90 +++++++++++++++++++---
.../controller/test/ControllerTestCommon.scala | 60 ++++++++++++++-
.../core/controller/test/SequenceApiTests.scala | 65 +++-------------
22 files changed, 487 insertions(+), 161 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 3a3699a..994f2cd 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -378,6 +378,13 @@ whisk {
# instance 64
# uniqueName + displayName 253 (max pod name length in Kube)
serdes-overhead = 6068 // 3034 bytes of metadata * 2 for extra headroom
+
+ # Disables the database store for activations that are both blocking and successful.
+ # Invocations made with the `X-OW-EXTRA-LOGGING: on` header force the activation to be stored.
+ disable-store-result = false
+
+ # Enable metadata logging of activations not stored in the database
+ unstored-logs-enabled = false
}
# action timelimit configuration
@@ -467,10 +474,13 @@ whisk {
# log-timestamp-field = "log_timestamp" #splunk field where timestamp is stored (to reflect log event generated time, not splunk's _time)
# log-stream-field = "log_stream" #splunk field where stream is stored (stdout/stderr)
# log-message-field = "log_message" #splunk field where log message is stored
+ # namespace-field = "namespace" #splunk field where namespace is stored
# activation-id-field = "activation_id" #splunk field where activation id is stored
# query-constraints = "" #additional constraints for splunk queries
- # query-timestamp-offset-seconds = "" #splunk query will be broadened by this 2*<offset value>; e.g. "earliest_time=activation.start - offset" and "latest_time=activation.end + offset"
- # disableSNI = false #if true, disables hostname validation and cert validation (in case splunk api endpoint is using a self signed cert)
+ # finalize-max-time = 10.seconds #splunk api max_time: the number of seconds to run the search before finalizing; specify 0 to never finalize
+ # earliest-time-offset = 7.days #splunk query will search for records no older than the offset defined; e.g. "earliest_time=now() - offset"
+ # query-timestamp-offset = 2.seconds #splunk query will be broadened by this 2*<offset value>; e.g. "earliest_time=activation.start - offset" and "latest_time=activation.end + offset"
+ # disable-sni = false #if true, disables hostname validation and cert validation (in case splunk api endpoint is using a self signed cert)
#}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8bb5bdb..6a54f63 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -266,6 +266,9 @@ object ConfigKeys {
val whiskConfig = "whisk.config"
val swaggerUi = "whisk.swagger-ui"
+ val disableStoreResult = s"$activation.disable-store-result"
+ val unstoredLogsEnabled = s"$activation.unstored-logs-enabled"
+
val apacheClientConfig = "whisk.apache-client"
val parameterStorage = "whisk.parameter-storage"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
index c11b8c5..0fa8892 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -68,8 +68,16 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
override val containerParameters = Map("--log-driver" -> Set("json-file"))
/* As logs are already part of the activation record, just return that bit of it */
- override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] =
- Future.successful(activation.logs)
+ override def fetchLogs(namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ activationLogs: Option[ActivationLogs],
+ context: UserContext): Future[ActivationLogs] =
+ activationLogs match {
+ case Some(logs) => Future.successful(logs)
+ case None => Future.failed(new RuntimeException(s"Activation logs not available for activation ${activationId}"))
+ }
/**
* Obtains the container's stdout and stderr output.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
index c3054c5..fcc5bb3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -18,26 +18,25 @@
package org.apache.openwhisk.core.containerpool.logging
import java.nio.file.{Path, Paths}
+import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model._
-
-import org.apache.openwhisk.core.entity.{ActivationLogs, Identity, WhiskActivation}
+import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs, Identity}
import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.database.UserContext
import scala.concurrent.{Future, Promise}
import scala.util.Try
-
import spray.json._
-
import pureconfig._
import pureconfig.generic.auto._
case class ElasticSearchLogFieldConfig(userLogs: String,
message: String,
+ tenantId: String,
activationId: String,
stream: String,
time: String)
@@ -90,9 +89,9 @@ class ElasticSearchLogStore(
private def extractRequiredHeaders(headers: Seq[HttpHeader]) =
headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
- private def generatePayload(activation: WhiskActivation) = {
+ private def generatePayload(namespace: String, activationId: ActivationId) = {
val logQuery =
- s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.activationId}: ${activation.activationId}"
+ s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.tenantId}: ${namespace} AND ${elasticSearchConfig.logSchema.activationId}: ${activationId}"
val queryString = EsQueryString(logQuery)
val queryOrder = EsQueryOrder(elasticSearchConfig.logSchema.time, EsOrderAsc)
@@ -101,19 +100,30 @@ class ElasticSearchLogStore(
private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.namespace.uuid.asString)
- override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = {
+ override def fetchLogs(namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ activationLogs: Option[ActivationLogs],
+ context: UserContext): Future[ActivationLogs] = {
val headers = extractRequiredHeaders(context.request.headers)
// Return logs from ElasticSearch, or return logs from activation if required headers are not present
if (headers.length == elasticSearchConfig.requiredHeaders.length) {
- esClient.search[EsSearchResult](generatePath(context.user), generatePayload(activation), headers).flatMap {
- case Right(queryResult) =>
- Future.successful(transcribeLogs(queryResult))
- case Left(code) =>
- Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
- }
+ esClient
+ .search[EsSearchResult](generatePath(context.user), generatePayload(namespace, activationId), headers)
+ .flatMap {
+ case Right(queryResult) =>
+ Future.successful(transcribeLogs(queryResult))
+ case Left(code) =>
+ Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
+ }
} else {
- Future.successful(activation.logs)
+ activationLogs match {
+ case Some(logs) => Future.successful(logs)
+ case None =>
+ Future.failed(new RuntimeException(s"Activation logs not available for activation ${activationId}"))
+ }
}
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogDriverLogStore.scala
index 5dff55f..e4a5876 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogDriverLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -17,11 +17,12 @@
package org.apache.openwhisk.core.containerpool.logging
-import akka.actor.ActorSystem
+import java.time.Instant
+import akka.actor.ActorSystem
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.Container
-import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import org.apache.openwhisk.core.database.UserContext
import scala.concurrent.Future
@@ -50,7 +51,12 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
/** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version,
* e.g. the SplunkLogStore to expose logs from some external source */
- def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] =
+ def fetchLogs(namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ activationLogs: Option[ActivationLogs],
+ context: UserContext): Future[ActivationLogs] =
Future.successful(ActivationLogs(Vector("Logs are not available.")))
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogStore.scala
index c6cc295..9d4838f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/LogStore.scala
@@ -17,11 +17,12 @@
package org.apache.openwhisk.core.containerpool.logging
-import akka.actor.ActorSystem
+import java.time.Instant
+import akka.actor.ActorSystem
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.Container
-import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import org.apache.openwhisk.spi.Spi
import org.apache.openwhisk.core.database.UserContext
@@ -78,11 +79,22 @@ trait LogStore {
* Fetch relevant logs for the given activation from the store.
*
* This method is called when a user requests logs via the API.
+ * The "logs" parameter is not empty if:
+ * - the activation record exists
+ * - the logs are stored embedded in the activation record
*
- * @param activation activation to fetch the logs for
+ * @param namespace namespace to fetch the logs for
+ * @param activationId activation to fetch the logs for
+ * @param start activation start
+ * @param end activation end
* @return the relevant logs
*/
- def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs]
+ def fetchLogs(namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ logs: Option[ActivationLogs],
+ content: UserContext): Future[ActivationLogs]
}
trait LogStoreProvider extends Spi {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
index b778d9e..c83309d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
@@ -17,6 +17,9 @@
package org.apache.openwhisk.core.containerpool.logging
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Post
@@ -36,9 +39,7 @@ import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
-
import com.typesafe.sslconfig.akka.AkkaSSLConfig
-
import pureconfig._
import pureconfig.generic.auto._
@@ -47,15 +48,14 @@ import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import spray.json._
-
import org.apache.openwhisk.common.AkkaLogging
import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.entity.ActivationLogs
-import org.apache.openwhisk.core.entity.WhiskActivation
+import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs}
import org.apache.openwhisk.core.database.UserContext
+import scala.concurrent.duration.FiniteDuration
+
case class SplunkLogStoreConfig(host: String,
port: Int,
username: String,
@@ -64,9 +64,12 @@ case class SplunkLogStoreConfig(host: String,
logTimestampField: String,
logStreamField: String,
logMessageField: String,
+ namespaceField: String,
activationIdField: String,
queryConstraints: String,
- queryTimestampOffsetSeconds: Int,
+ finalizeMaxTime: FiniteDuration,
+ earliestTimeOffset: FiniteDuration,
+ queryTimestampOffset: FiniteDuration,
disableSNI: Boolean)
case class SplunkResponse(results: Vector[JsObject])
object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
@@ -103,27 +106,36 @@ class SplunkLogStore(
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
else Http().defaultClientHttpsContext)
- override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = {
+ override def fetchLogs(namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ logs: Option[ActivationLogs],
+ context: UserContext): Future[ActivationLogs] = {
//example curl request:
- // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=log_message | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
+ // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=someindex | search namespace=guest | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | spath=log_message | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
//example response:
// {"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"}], "highlighted":{}}
//note: splunk returns results in reverse-chronological order, therefore we include "| reverse" to cause results to arrive in chronological order
val search =
- s"""search index="${splunkConfig.index}"| spath ${splunkConfig.logMessageField}| search ${splunkConfig.queryConstraints} ${splunkConfig.activationIdField}=${activation.activationId.toString}| table ${splunkConfig.logTimestampField}, ${splunkConfig.logStreamField}, ${splunkConfig.logMessageField}| reverse"""
+ s"""search index="${splunkConfig.index}" | search ${splunkConfig.queryConstraints} | search ${splunkConfig.namespaceField}=${namespace} | search ${splunkConfig.activationIdField}=${activationId} | spath ${splunkConfig.logMessageField} | table ${splunkConfig.logTimestampField}, ${splunkConfig.logStreamField}, ${splunkConfig.logMessageField} | reverse"""
val entity = FormData(
Map(
"exec_mode" -> "oneshot",
"search" -> search,
"output_mode" -> "json",
- "earliest_time" -> activation.start
- .minusSeconds(splunkConfig.queryTimestampOffsetSeconds)
+ "earliest_time" -> start
+ .getOrElse(Instant.now().minus(splunkConfig.earliestTimeOffset.toSeconds, ChronoUnit.SECONDS))
+ .minusSeconds(splunkConfig.queryTimestampOffset.toSeconds)
.toString, //assume that activation start/end are UTC zone, and splunk events are the same
- "latest_time" -> activation.end
- .plusSeconds(splunkConfig.queryTimestampOffsetSeconds) //add 5s to avoid a timerange of 0 on short-lived activations
- .toString)).toEntity
+ "latest_time" -> end
+ .getOrElse(Instant.now())
+ .plusSeconds(splunkConfig.queryTimestampOffset.toSeconds) //add the configured offset to avoid a timerange of 0 on short-lived activations
+ .toString,
+ "max_time" -> splunkConfig.finalizeMaxTime.toSeconds.toString //max time for the search query to run in seconds
+ )).toEntity
logging.debug(this, "sending request")
queueRequest(
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
index e4f974a..99783fe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
@@ -24,8 +24,10 @@ import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi
+import pureconfig.loadConfigOrThrow
import scala.concurrent.Future
@@ -33,21 +35,40 @@ case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
trait ActivationStore {
+ protected val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
+ protected val unstoredLogsEnabledConfig = loadConfigOrThrow[Boolean](ConfigKeys.unstoredLogsEnabled)
+
/**
* Checks if an activation should be stored in database and stores it.
*
* @param activation activation to store
+ * @param isBlockingActivation is activation blocking
* @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
* @return Future containing DocInfo related to stored activation
*/
- def storeAfterCheck(activation: WhiskActivation, context: UserContext)(
- implicit transid: TransactionId,
- notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
- if (context.user.limits.storeActivations.getOrElse(true)) {
+ def storeAfterCheck(activation: WhiskActivation,
+ isBlockingActivation: Boolean,
+ disableStore: Option[Boolean],
+ context: UserContext)(implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification],
+ logging: Logging): Future[DocInfo] = {
+ if (context.user.limits.storeActivations.getOrElse(true) &&
+ shouldStoreActivation(
+ activation.response.isSuccess,
+ isBlockingActivation,
+ transid.meta.extraLogging,
+ disableStore.getOrElse(disableStoreResultConfig))) {
+
store(activation, context)
} else {
+ if (unstoredLogsEnabledConfig) {
+ logging.info(
+ this,
+ s"Explicitly NOT storing activation ${activation.activationId.asString} for action ${activation.name} from namespace ${activation.namespace.asString} with response_size=${activation.response.size
+ .getOrElse("0")}B")
+ }
Future.successful(DocInfo(activation.docid))
}
}
@@ -154,6 +175,27 @@ trait ActivationStore {
since: Option[Instant] = None,
upto: Option[Instant] = None,
context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]]
+
+ /**
+ * Checks if the system is configured to not store the activation in the database.
+ * Stores the activation only if at least one of these is true:
+ * - the result is an error
+ * - the activation is non-blocking
+ * - the activation is in debug mode
+ * - the activation store is not disabled via a configuration parameter
+ *
+ * @param isSuccess is successful activation
+ * @param isBlocking is blocking activation
+ * @param debugMode is logging header set to "on" for the invocation
+ * @param disableStore is disable store configured
+ * @return Should the activation be stored to the database
+ */
+ private def shouldStoreActivation(isSuccess: Boolean,
+ isBlocking: Boolean,
+ debugMode: Boolean,
+ disableStore: Boolean): Boolean = {
+ !isSuccess || !isBlocking || debugMode || !disableStore
+ }
}
trait ActivationStoreProvider extends Spi {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
index 9100ba4..eb1c0f6 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
@@ -28,6 +28,7 @@ import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.unmarshalling._
import spray.json.DefaultJsonProtocol.RootJsObjectFormat
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.containerpool.logging.LogStore
import org.apache.openwhisk.core.controller.RestApiCommons.{ListLimit, ListSkip}
import org.apache.openwhisk.core.database.ActivationStore
@@ -37,6 +38,7 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.ErrorResponse.terminate
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
+import pureconfig.loadConfigOrThrow
object WhiskActivationsApi {
@@ -72,6 +74,8 @@ object WhiskActivationsApi {
/** A trait implementing the activations API. */
trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider with AuthorizedRouteProvider with ReadOps {
+ protected val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
+
protected override val collection = Collection(Collection.ACTIVATIONS)
/** JSON response formatter. */
@@ -219,8 +223,15 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 500 Internal Server Error
*/
private def fetchLogs(context: UserContext, docid: DocId)(implicit transid: TransactionId) = {
- getEntityAndProject(
+ getEntityAndProjectLog(
activationStore.get(ActivationId(docid.asString), context),
- (activation: WhiskActivation) => logStore.fetchLogs(activation, context).map(_.toJsonObject))
+ docid,
+ disableStoreResultConfig,
+ (namespace: String,
+ activationId: ActivationId,
+ start: Option[Instant],
+ end: Option[Instant],
+ logs: Option[ActivationLogs]) =>
+ logStore.fetchLogs(namespace, activationId, start, end, logs, context).map(_.toJsonObject))
}
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
index 6ef01c7..42b39cd 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
@@ -17,6 +17,8 @@
package org.apache.openwhisk.core.controller
+import java.time.Instant
+
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
@@ -29,15 +31,12 @@ import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.{Directives, RequestContext, RouteResult}
import spray.json.DefaultJsonProtocol._
-import spray.json.JsObject
-import spray.json.JsValue
-import spray.json.RootJsonFormat
+import spray.json.{JsObject, JsValue, RootJsonFormat}
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.controller.PostProcess.PostProcessEntity
import org.apache.openwhisk.core.database._
-import org.apache.openwhisk.core.entity.DocId
-import org.apache.openwhisk.core.entity.WhiskDocument
+import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs, DocId, WhiskActivation, WhiskDocument}
import org.apache.openwhisk.http.ErrorResponse
import org.apache.openwhisk.http.ErrorResponse.terminate
import org.apache.openwhisk.http.Messages._
@@ -195,6 +194,75 @@ trait ReadOps extends Directives {
terminate(InternalServerError)
}
}
+
+ /**
+ * Waits on the specified Future that returns an entity of type A from the datastore.
+ * If the entity is not found and disableStoreResultConfig is true, the docId is used to query the log store instead.
+ * Terminates the HTTP request.
+ *
+ * @param entity future that returns an entity of type A fetched from datastore
+ * @param docId activation DocId
+ * @param disableStoreResultConfig whether the disable-store-result configuration is active
+ * @param project a function (namespace, activation id, start, end, logs) => Future[JSON] which produces the response body
+ *
+ * Responses are one of (Code, Message)
+ * - 200 project(A) as JSON
+ * - 404 Not Found
+ * - 500 Internal Server Error
+ */
+ protected def getEntityAndProjectLog[A <: DocumentRevisionProvider, Au >: A](
+ entity: Future[A],
+ docId: DocId,
+ disableStoreResultConfig: Boolean,
+ project: (String, ActivationId, Option[Instant], Option[Instant], Option[ActivationLogs]) => Future[JsObject])(
+ implicit transid: TransactionId,
+ format: RootJsonFormat[A],
+ ma: Manifest[A]) = {
+ onComplete(entity) {
+ case Success(entity) =>
+ logging.debug(this, s"[PROJECT] entity success")
+ val activation = entity.asInstanceOf[WhiskActivation]
+ onComplete(
+ project(
+ activation.namespace.asString,
+ activation.activationId,
+ Some(activation.start),
+ Some(activation.end),
+ Some(activation.logs))) {
+ case Success(response: JsObject) =>
+ complete(OK, response)
+ case Failure(t: Throwable) =>
+ logging.error(this, s"[PROJECT] projection failed: ${t.getMessage}")
+ terminate(InternalServerError, t.getMessage)
+ }
+ case Failure(t: NoDocumentException) =>
+ // If the disableStoreResult configuration is active, keep going:
+ // the logs might still be available even though the entity was not found
+ if (disableStoreResultConfig) {
+ val namespace = docId.asString.split("/")(0)
+ val id = docId.asString.split("/")(1)
+ onComplete(project(namespace, ActivationId(id), None, None, None)) {
+ case Success(response: JsObject) =>
+ logging.debug(this, s"[PROJECTLOG] entity success")
+ complete(OK, response)
+ case Failure(t: Throwable) =>
+ logging.error(this, s"[PROJECTLOG] projection failed: ${t.getMessage}")
+ terminate(InternalServerError, t.getMessage)
+ }
+ } else {
+ terminate(NotFound)
+ }
+ case Failure(t: DocumentTypeMismatchException) =>
+ logging.debug(this, s"[PROJECT] entity conformance check failed: ${t.getMessage}")
+ terminate(Conflict, conformanceMessage)
+ case Failure(t: ArtifactStoreException) =>
+ logging.debug(this, s"[PROJECT] entity unreadable")
+ terminate(InternalServerError, t.getMessage)
+ case Failure(t: Throwable) =>
+ logging.error(this, s"[PROJECT] entity failed: ${t.getMessage}")
+ terminate(InternalServerError)
+ }
+ }
}
/** A trait for REST APIs that write entities to a datastore */
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
index c2b4272..86b793e 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
@@ -169,7 +169,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
- activationStore.storeAfterCheck(activation, context)
+ activationStore.storeAfterCheck(activation, false, None, context)
}
respondWithActivationIdHeader(triggerActivationId) {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index 1505ce9..f376d77 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -290,7 +290,8 @@ protected[actions] trait PrimitiveActions {
logging.info(this, s"invoking composition $action topmost ${cause.isEmpty} activationid '${session.activationId}'")
val response: Future[Either[ActivationId, WhiskActivation]] =
- invokeConductor(user, payload, session).map(response => Right(completeActivation(user, session, response)))
+ invokeConductor(user, payload, session).map(response =>
+ Right(completeActivation(user, session, response, waitForResponse.isDefined)))
// is caller waiting for the result of the activation?
cause
@@ -526,8 +527,10 @@ protected[actions] trait PrimitiveActions {
* Creates an activation for a composition and writes it back to the datastore.
* Returns the activation.
*/
- private def completeActivation(user: Identity, session: Session, response: ActivationResponse)(
- implicit transid: TransactionId): WhiskActivation = {
+ private def completeActivation(user: Identity,
+ session: Session,
+ response: ActivationResponse,
+ blockingComposition: Boolean)(implicit transid: TransactionId): WhiskActivation = {
val context = UserContext(user)
@@ -575,7 +578,7 @@ protected[actions] trait PrimitiveActions {
}
}
- activationStore.storeAfterCheck(activation, context)(transid, notifier = None)
+ activationStore.storeAfterCheck(activation, blockingComposition, None, context)(transid, notifier = None, logging)
activation
}
@@ -662,6 +665,7 @@ protected[actions] trait PrimitiveActions {
protected val controllerActivationConfig =
loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
+
}
case class ControllerActivationConfig(pollingFromDb: Boolean, maxWaitForBlockingActivation: FiniteDuration)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
index 4c95590..54272dd 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
@@ -21,7 +21,6 @@ import java.time.{Clock, Instant}
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
-import spray.json._
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
import org.apache.openwhisk.core.connector.{EventMessage, MessagingProvider}
import org.apache.openwhisk.core.controller.WhiskServices
@@ -32,6 +31,7 @@ import org.apache.openwhisk.core.entity.types._
import org.apache.openwhisk.http.Messages._
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
+import spray.json._
import scala.collection._
import scala.concurrent.duration._
@@ -120,6 +120,7 @@ protected[actions] trait SequenceActions {
user,
action,
topmost,
+ waitForOutermostResponse.isDefined,
start,
cause)
}
@@ -152,6 +153,7 @@ protected[actions] trait SequenceActions {
user: Identity,
action: WhiskActionMetaData,
topmost: Boolean,
+ blockingSequence: Boolean,
start: Instant,
cause: Option[ActivationId])(
implicit transid: TransactionId): Future[(Right[ActivationId, WhiskActivation], Int)] = {
@@ -175,7 +177,11 @@ protected[actions] trait SequenceActions {
case Failure(t) => logging.warn(this, s"activation event was not sent: $t")
}
}
- activationStore.storeAfterCheck(seqActivation, context)(transid, notifier = None)
+
+ activationStore.storeAfterCheck(seqActivation, blockingSequence, None, context)(
+ transid,
+ notifier = None,
+ logging)
// This should never happen; in this case, there is no activation record created or stored:
// should there be?
@@ -388,6 +394,7 @@ protected[actions] trait SequenceActions {
/** Max atomic action count allowed for sequences */
private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
+
}
/**
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 71acec5..b7e7cf6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -32,7 +32,7 @@ import akka.io.Tcp.CommandFailed
import akka.io.Tcp.Connect
import akka.io.Tcp.Connected
import akka.pattern.pipe
-import pureconfig._
+import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._
import akka.stream.ActorMaterializer
import java.net.InetSocketAddress
@@ -251,7 +251,7 @@ class ContainerProxy(factory: (TransactionId,
Int,
Option[ExecutableWhiskAction]) => Future[Container],
sendActiveAck: ActiveAck,
- storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+ storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
collectLogs: LogsCollector,
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
@@ -274,6 +274,7 @@ class ContainerProxy(factory: (TransactionId,
var activeCount = 0;
var healthPingActor: Option[ActorRef] = None //setup after prewarm starts
val tcp: ActorRef = testTcp.getOrElse(IO(Tcp)) //allows to testing interaction with Tcp extension
+
startWith(Uninitialized, NoData())
when(Uninitialized) {
@@ -338,7 +339,7 @@ class ContainerProxy(factory: (TransactionId,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))
- storeActivation(transid, activation, context)
+ storeActivation(transid, activation, job.msg.blocking, context)
}
.flatMap { container =>
// now attempt to inject the user code and run the action
@@ -835,8 +836,7 @@ class ContainerProxy(factory: (TransactionId,
job.msg.user.namespace.uuid,
CompletionMessage(tid, activation, instance)))
}
- // Storing the record. Entirely asynchronous and not waited upon.
- storeActivation(tid, activation, context)
+ storeActivation(tid, activation, job.msg.blocking, context)
}
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
@@ -861,7 +861,7 @@ object ContainerProxy {
Int,
Option[ExecutableWhiskAction]) => Future[Container],
ack: ActiveAck,
- store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+ store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
collectLogs: LogsCollector,
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
@@ -908,7 +908,7 @@ object ContainerProxy {
* Creates a WhiskActivation ready to be sent via active ack.
*
* @param job the job that was executed
- * @param interval the time it took to execute the job
+ * @param totalInterval the time it took to execute the job
* @param response the response to return to the user
* @return a WhiskActivation to be sent to the user
*/
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 544ca1a..48b647b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -141,9 +141,9 @@ class InvokerReactive(
private val collectLogs = new LogStoreCollector(logsProvider)
/** Stores an activation in the database. */
- private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+ private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
- activationStore.storeAfterCheck(activation, context)(tid, notifier = None)
+ activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
}
/** Creates a ContainerProxy Actor when being called. */
@@ -228,7 +228,7 @@ class InvokerReactive(
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))
- store(msg.transid, activation, UserContext(msg.user))
+ store(msg.transid, activation, msg.blocking, UserContext(msg.user))
Future.successful(())
}
} else {
@@ -294,4 +294,5 @@ class InvokerReactive(
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
})
+
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
index 742755a..6d14ad9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -55,12 +55,13 @@ class ElasticSearchLogStoreTests
implicit val materializer: ActorMaterializer = ActorMaterializer()
private val uuid = UUID()
+ private val tenantId = s"testSpace_${uuid}"
private val user =
- Identity(Subject(), Namespace(EntityName("testSpace"), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
+ Identity(Subject(), Namespace(EntityName(tenantId), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
private val activationId = ActivationId.generate()
private val defaultLogSchema =
- ElasticSearchLogFieldConfig("user_logs", "message", "activationId_str", "stream_str", "time_date")
+ ElasticSearchLogFieldConfig("user_logs", "message", "tenantId", "activationId_str", "stream_str", "time_date")
private val defaultConfig =
ElasticSearchLogStoreConfig("https", "host", 443, "/whisk_user_logs/_search", defaultLogSchema)
private val defaultConfigRequiredHeaders =
@@ -76,11 +77,10 @@ class ElasticSearchLogStoreTests
StatusCodes.OK,
entity = HttpEntity(
ContentTypes.`application/json`,
- s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs" [...]
+ s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs" [...]
private val defaultPayload = JsObject(
- "query" -> JsObject(
- "query_string" -> JsObject("query" -> JsString(
- s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.activationId}: $activationId"))),
+ "query" -> JsObject("query_string" -> JsObject("query" -> JsString(
+ s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.tenantId}: $tenantId AND ${defaultConfig.logSchema.activationId}: $activationId"))),
"sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc")))),
"from" -> JsNumber(0)).compactPrint
private val defaultHttpRequest = HttpRequest(
@@ -96,7 +96,7 @@ class ElasticSearchLogStoreTests
Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))
private val activation = WhiskActivation(
- namespace = EntityPath("namespace"),
+ namespace = EntityPath(tenantId),
name = EntityName("name"),
Subject(),
activationId = activationId,
@@ -135,7 +135,14 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfig)
- await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs
+ await(
+ esLogStore.fetchLogs(
+ activation.withoutLogs.namespace.asString,
+ activation.withoutLogs.activationId,
+ None,
+ None,
+ Some(activation.withoutLogs.logs),
+ defaultContext)) shouldBe expectedLogs
}
it should "get logs from supplied activation record when required headers are not present" in {
@@ -145,7 +152,14 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfigRequiredHeaders)
- await(esLogStore.fetchLogs(activation, defaultContext)) shouldBe expectedLogs
+ await(
+ esLogStore.fetchLogs(
+ activation.namespace.asString,
+ activation.activationId,
+ None,
+ None,
+ Some(activation.logs),
+ defaultContext)) shouldBe expectedLogs
}
it should "get user logs from ElasticSearch when required headers are needed" in {
@@ -170,7 +184,14 @@ class ElasticSearchLogStoreTests
entity = HttpEntity.Empty)
val context = UserContext(user, requiredHeadersHttpRequest)
- await(esLogStore.fetchLogs(activation.withoutLogs, context)) shouldBe expectedLogs
+ await(
+ esLogStore.fetchLogs(
+ activation.withoutLogs.namespace.asString,
+ activation.withoutLogs.activationId,
+ None,
+ None,
+ Some(activation.withoutLogs.logs),
+ context)) shouldBe expectedLogs
}
it should "dynamically replace $UUID in request path" in {
@@ -186,13 +207,27 @@ class ElasticSearchLogStoreTests
Some(testFlow(defaultHttpResponse, httpRequest)),
elasticSearchConfig = dynamicPathConfig)
- await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs
+ await(
+ esLogStore.fetchLogs(
+ activation.withoutLogs.namespace.asString,
+ activation.withoutLogs.activationId,
+ None,
+ None,
+ Some(activation.withoutLogs.logs),
+ defaultContext)) shouldBe expectedLogs
}
it should "fail to connect to invalid host" in {
val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)
- a[Throwable] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext))
+ a[Throwable] should be thrownBy await(
+ esLogStore.fetchLogs(
+ activation.namespace.asString,
+ activation.activationId,
+ None,
+ None,
+ Some(activation.logs),
+ defaultContext))
}
it should "forward errors from ElasticSearch" in {
@@ -203,7 +238,14 @@ class ElasticSearchLogStoreTests
Some(testFlow(httpResponse, defaultHttpRequest)),
elasticSearchConfig = defaultConfig)
- a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext))
+ a[RuntimeException] should be thrownBy await(
+ esLogStore.fetchLogs(
+ activation.namespace.asString,
+ activation.activationId,
+ None,
+ None,
+ Some(activation.logs),
+ defaultContext))
}
it should "error when configuration protocol is invalid" in {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
index 1b185dd..d8816e0 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -69,9 +69,12 @@ class SplunkLogStoreTests
"log_timestamp",
"log_stream",
"log_message",
+ "namespace",
"activation_id",
"somefield::somevalue",
- 22,
+ 10.seconds,
+ 7.days,
+ 22.seconds,
disableSNI = false)
behavior of "Splunk LogStore"
@@ -118,6 +121,7 @@ class SplunkLogStoreTests
val outputMode = form.fields.get("output_mode")
val search = form.fields.get("search")
val execMode = form.fields.get("exec_mode")
+ val maxTime = form.fields.get("max_time")
request.uri.path.toString() shouldBe "/services/search/jobs"
request.headers shouldBe List(Authorization.basic(testConfig.username, testConfig.password))
@@ -125,8 +129,9 @@ class SplunkLogStoreTests
latestTime shouldBe Some(endTimePlusOffset)
outputMode shouldBe Some("json")
execMode shouldBe Some("oneshot")
+ maxTime shouldBe Some("10")
search shouldBe Some(
- s"""search index="${testConfig.index}"| spath ${testConfig.logMessageField}| search ${testConfig.queryConstraints} ${testConfig.activationIdField}=${activation.activationId.toString}| table ${testConfig.logTimestampField}, ${testConfig.logStreamField}, ${testConfig.logMessageField}| reverse""")
+ s"""search index="${testConfig.index}" | search ${testConfig.queryConstraints} | search ${testConfig.namespaceField}=${activation.namespace.asString} | search ${testConfig.activationIdField}=${activation.activationId.toString} | spath ${testConfig.logMessageField} | table ${testConfig.logTimestampField}, ${testConfig.logStreamField}, ${testConfig.logMessageField} | reverse""")
(
Success(
@@ -158,7 +163,14 @@ class SplunkLogStoreTests
it should "find logs based on activation timestamps" in {
//use the a flow that asserts the request structure and provides a response in the expected format
val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
- val result = await(splunkStore.fetchLogs(activation, context))
+ val result = await(
+ splunkStore.fetchLogs(
+ activation.namespace.asString,
+ activation.activationId,
+ Some(activation.start),
+ Some(activation.end),
+ None,
+ context))
result shouldBe ActivationLogs(
Vector(
"2007-12-03T10:15:30Z stdout: some log message",
@@ -170,13 +182,15 @@ class SplunkLogStoreTests
it should "fail to connect to bogus host" in {
//use the default http flow with the default bogus-host config
val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
- a[Throwable] should be thrownBy await(splunkStore.fetchLogs(activation, context))
+ a[Throwable] should be thrownBy await(
+ splunkStore.fetchLogs(activation.namespace.asString, activation.activationId, None, None, None, context))
}
it should "display an error if API cannot be reached" in {
//use a flow that generates a 500 response
val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
- a[RuntimeException] should be thrownBy await(splunkStore.fetchLogs(activation, context))
+ a[RuntimeException] should be thrownBy await(
+ splunkStore.fetchLogs(activation.namespace.asString, activation.activationId, None, None, None, context))
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index d679690..693f600 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -266,11 +266,12 @@ class ContainerProxyTests
invokeCallback: () => Unit = () => ()) =
new LoggedCollector(response, invokeCallback)
- def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation, context: UserContext) =>
- Future.successful(())
+ def createStore = LoggedFunction {
+ (transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
+ Future.successful(())
}
def createSyncStore = SynchronizedLoggedFunction {
- (transid: TransactionId, activation: WhiskActivation, context: UserContext) =>
+ (transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
index 8797b8e..962b5da 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
@@ -79,7 +79,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -114,7 +114,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
index 8ad2e4b..a243397 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
@@ -95,7 +95,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
end = Instant.now)
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context))
waitOnListActivationsInNamespace(namespace, 2, context)
org.apache.openwhisk.utils.retry {
@@ -179,7 +179,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context))
waitOnListActivationsInNamespace(namespace, 2, context)
checkCount("", 2)
@@ -254,7 +254,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
end = now.plusSeconds(30))) // should match
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_, context))
+ (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context))
waitOnListActivationsInNamespace(namespace, activations.length, context)
{ // get between two time stamps
@@ -363,7 +363,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
annotations = Parameters("path", s"${namespace.asString}/pkg/xyz"))
}.toList
try {
- (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation(_, context))
+ (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation(_, false, false, context))
waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length, context)
waitOnListActivationsMatchingName(
namespace,
@@ -479,7 +479,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}.toList
try {
- activations.foreach(storeActivation(_, context))
+ activations.foreach(storeActivation(_, false, false, context))
waitOnListActivationsInNamespace(namespace, activations.size, context)
Get(s"$collectionPath?skip=1") ~> Route.seal(routes(creds)) ~> check {
@@ -503,7 +503,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
}.toList
try {
- activations.foreach(storeActivation(_, context))
+ activations.foreach(storeActivation(_, false, false, context))
waitOnListActivationsInNamespace(namespace, activations.size, context)
Get(s"$collectionPath?limit=1") ~> Route.seal(routes(creds)) ~> check {
@@ -533,7 +533,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -570,7 +570,75 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.response.toExtendedJson)
+ }
+ } finally {
+ deleteActivation(ActivationId(activation.docid.asString), context)
+ }
+ }
+
+ //// GET /activations/id/result when db store is disabled
+ it should "return activation empty when db store is disabled" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now)
+
+ storeActivation(activation, true, true, context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
+ status should be(NotFound)
+ }
+ }
+
+ //// GET /activations/id/result when store is disabled and activation is not blocking
+ it should "get activation result by id when db store is disabled and activation is not blocking" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now)
+ try {
+ storeActivation(activation, false, true, context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.response.toExtendedJson)
+ }
+ } finally {
+ deleteActivation(ActivationId(activation.docid.asString), context)
+ }
+ }
+
+ //// GET /activations/id/result when store is disabled and activation is unsuccessful
+ it should "get activation result by id when db store is disabled and activation is unsuccessful" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.whiskError("activation error"))
+ try {
+ storeActivation(activation, true, true, context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -594,7 +662,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -617,7 +685,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
ActivationId.generate(),
start = Instant.now,
end = Instant.now)
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
try {
Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check {
@@ -690,7 +758,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
val activation =
new BadActivation(namespace, aname(), creds.subject, ActivationId.generate(), Instant.now, Instant.now)
- storeActivation(activation, context)
+ storeActivation(activation, false, false, context)
Get(s"$collectionPath/${activation.activationId}") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 9817bef..7431f12 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -73,7 +73,7 @@ protected trait ControllerTestCommon
override lazy val entitlementProvider: EntitlementProvider =
new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
- override val activationIdFactory = new ActivationId.ActivationIdGenerator() {
+ override val activationIdFactory = new ActivationId.ActivationIdGenerator {
// need a static activation id to test activations api
private val fixedId = ActivationId.generate()
override def make = fixedId
@@ -124,9 +124,15 @@ protected trait ControllerTestCommon
Await.result(activationStore.get(activationId, context), timeout)
}
- def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId,
- timeout: Duration = 10 seconds): DocInfo = {
- val docFuture = activationStore.storeAfterCheck(activation, context)
+ def storeActivation(
+ activation: WhiskActivation,
+ isBlockingActivation: Boolean,
+ disableStore: Boolean,
+ context: UserContext)(implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = {
+ val docFuture = activationStore.storeAfterCheck(activation, isBlockingActivation, Some(disableStore), context)(
+ transid,
+ notifier = None,
+ logging)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
doc
@@ -248,6 +254,52 @@ protected trait ControllerTestCommon
implicit val serdes = jsonFormat5(BadEntity.apply)
override val cacheEnabled = true
}
+
+ /**
+ * Makes a simple sequence action and installs it in the db (no call to wsk api/cli).
+ * All actions are in the default package.
+ *
+ * @param sequenceName the name of the sequence
+ * @param ns the namespace to be used when creating the component actions and the sequence action
+ * @param components the names of the actions (entity names, no namespace)
+ */
+ protected def putSimpleSequenceInDB(sequenceName: String, ns: EntityPath, components: Vector[String])(
+ implicit tid: TransactionId) = {
+ val seqAction = makeSimpleSequence(sequenceName, ns, components)
+ put(entityStore, seqAction)
+ }
+
+ /**
+ * Returns a WhiskAction that can be used to create/update a sequence.
+ * If instructed to do so, installs the component actions in the db.
+ * All actions are in the default package.
+ *
+ * @param sequenceName the name of the sequence
+ * @param ns the namespace to be used when creating the component actions and the sequence action
+ * @param componentNames the names of the actions (entity names, no namespace)
+ * @param installDB if true, installs the component actions in the db (default true)
+ */
+ protected def makeSimpleSequence(sequenceName: String,
+ ns: EntityPath,
+ componentNames: Vector[String],
+ installDB: Boolean = true)(implicit tid: TransactionId): WhiskAction = {
+ if (installDB) {
+ // create bogus wsk actions
+ val wskActions = componentNames.toSet[String] map { c =>
+ WhiskAction(ns, EntityName(c), jsDefault("??"))
+ }
+ // add them to the db
+ wskActions.foreach {
+ put(entityStore, _)
+ }
+ }
+ // add namespace to component names
+ val components = componentNames map { c =>
+ stringToFullyQualifiedName(s"/$ns/$c")
+ }
+ // create wsk action for the sequence
+ WhiskAction(ns, EntityName(sequenceName), sequence(components))
+ }
}
class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionContext) extends LoadBalancer {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/SequenceApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/SequenceApiTests.scala
index db7adec..896c14a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/SequenceApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/SequenceApiTests.scala
@@ -17,21 +17,22 @@
package org.apache.openwhisk.core.controller.test
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
import java.time.Instant
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import akka.http.scaladsl.model.StatusCodes._
+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Route
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.http.{ErrorResponse, Messages}
import org.apache.openwhisk.http.Messages.sequenceComponentNotFound
+import org.apache.openwhisk.http.{ErrorResponse, Messages}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
/**
* Tests Sequence API - stand-alone tests that require only the controller to be up
@@ -431,52 +432,6 @@ class SequenceApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
- /**
- * Makes a simple sequence action and installs it in the db (no call to wsk api/cli).
- * All actions are in the default package.
- *
- * @param sequenceName the name of the sequence
- * @param ns the namespace to be used when creating the component actions and the sequence action
- * @param components the names of the actions (entity names, no namespace)
- */
- private def putSimpleSequenceInDB(sequenceName: String, ns: EntityPath, components: Vector[String])(
- implicit tid: TransactionId) = {
- val seqAction = makeSimpleSequence(sequenceName, ns, components)
- put(entityStore, seqAction)
- }
-
- /**
- * Returns a WhiskAction that can be used to create/update a sequence.
- * If instructed to do so, installs the component actions in the db.
- * All actions are in the default package.
- *
- * @param sequenceName the name of the sequence
- * @param ns the namespace to be used when creating the component actions and the sequence action
- * @param componentNames the names of the actions (entity names, no namespace)
- * @param installDB if true, installs the component actions in the db (default true)
- */
- private def makeSimpleSequence(sequenceName: String,
- ns: EntityPath,
- componentNames: Vector[String],
- installDB: Boolean = true)(implicit tid: TransactionId): WhiskAction = {
- if (installDB) {
- // create bogus wsk actions
- val wskActions = componentNames.toSet[String] map { c =>
- WhiskAction(ns, EntityName(c), jsDefault("??"))
- }
- // add them to the db
- wskActions.foreach {
- put(entityStore, _)
- }
- }
- // add namespace to component names
- val components = componentNames map { c =>
- stringToFullyQualifiedName(s"/$ns/$c")
- }
- // create wsk action for the sequence
- WhiskAction(namespace, EntityName(sequenceName), sequence(components))
- }
-
private def logContains(w: String)(implicit stream: java.io.ByteArrayOutputStream): Boolean = {
org.apache.openwhisk.utils.retry({
val log = stream.toString()