You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/06/20 16:16:39 UTC
[incubator-openwhisk] branch master updated: Update sequence impl
to tune controller memory consumption (#2387)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 7f399a4 Update sequence impl to tune controller memory consumption (#2387)
7f399a4 is described below
commit 7f399a43e561765a23fb377e7747ca4c8c397ea6
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Tue Jun 20 12:16:36 2017 -0400
Update sequence impl to tune controller memory consumption (#2387)
- switch to scheduleOnce+weakrefs for timeout handling in SequenceActions
- switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory
- use better (non-dragging) impl of withTimeout
- use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously
- refactor top level sequence scheduler to eliminate promises
---
.../scala/whisk/core/entity/ActivationId.scala | 2 +-
.../src/main/scala/whisk/http/ErrorResponse.scala | 3 +-
.../whisk/utils/ExecutionContextFactory.scala | 18 +-
.../core/controller/actions/SequenceActions.scala | 498 ++++++++++++---------
.../test/scala/system/basic/WskSequenceTests.scala | 93 ++--
.../actions/test/SequenceAccountingTests.scala | 141 ++++++
.../utils/test/ExecutionContextFactoryTests.scala | 43 ++
7 files changed, 531 insertions(+), 267 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
index 374320c..4a71681 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
@@ -39,7 +39,7 @@ import whisk.http.Messages
*
* @param id the activation id, required not null
*/
-protected[core] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
+protected[whisk] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
def asString = toString
override def toString = id.toString.replaceAll("-", "")
def toJsObject = JsObject("activationId" -> toString.toJson)
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index d89d749..547618d 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -35,6 +35,7 @@ import whisk.common.TransactionId
import whisk.core.entity.SizeError
import whisk.core.entity.ByteSize
import whisk.core.entity.Exec
+import whisk.core.entity.ActivationId
object Messages {
/** Standard message for reporting resource conflicts. */
@@ -95,7 +96,7 @@ object Messages {
val notAllowedOnBinding = "Operation not permitted on package binding."
/** Error messages for sequence activations. */
- val sequenceRetrieveActivationTimeout = "Timeout reached when retrieving activation for sequence component."
+ def sequenceRetrieveActivationTimeout(id: ActivationId) = s"Timeout reached when retrieving activation $id for sequence component."
val sequenceActivationFailure = "Sequence failed."
/** Error messages for bad requests where parameters do not conform. */
diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index bdbd00a..5aa465c 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -22,16 +22,32 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
import akka.actor.ActorSystem
import akka.pattern.{ after => expire }
object ExecutionContextFactory {
+ // Future.firstCompletedOf has a memory drag bug
+ // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
+ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+ val pref = new java.util.concurrent.atomic.AtomicReference(p)
+ val completeFirst: Try[T] => Unit = { result: Try[T] =>
+ val promise = pref.getAndSet(null)
+ if (promise != null) {
+ promise.tryComplete(result)
+ }
+ }
+ futures foreach { _ onComplete completeFirst }
+ p.future
+ }
+
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
- Future firstCompletedOf Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))
+ firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 4c4ccbe..27dac99 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -18,16 +18,15 @@ package whisk.core.controller.actions
import java.time.Clock
import java.time.Instant
+import java.util.concurrent.atomic.AtomicReference
-import scala.Left
-import scala.Right
+import scala.collection._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-import scala.concurrent.Promise
import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
-import scala.util.Try
import akka.actor.ActorSystem
import spray.json._
@@ -98,71 +97,59 @@ protected[actions] trait SequenceActions {
// create new activation id that corresponds to the sequence
val seqActivationId = activationIdFactory.make()
logging.info(this, s"invoking sequence $action topmost $topmost activationid '$seqActivationId'")
+
val start = Instant.now(Clock.systemUTC())
- val seqActivationPromise = Promise[Option[WhiskActivation]]
- // the cause for the component activations is the current sequence
- val futureWskActivations = invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount)
- val futureSeqResult = Future.sequence(futureWskActivations)
- val response: Future[(ActivationId, Option[WhiskActivation], Int)] =
- if (topmost) { // need to deal with blocking and closing connection
- if (blocking) {
- val timeout = maxWaitForBlockingActivation + blockingInvokeGrace
- val futureSeqResultTimeout = futureSeqResult withTimeout (timeout, new BlockingInvokeTimeout(seqActivationId))
- // if the future fails with a timeout, the failure is dealt with at the caller level
- futureSeqResultTimeout map { wskActivationTuples =>
- val wskActivationEithers = wskActivationTuples.map(_._1)
- // the execution of the sequence was successful, return the result
- val end = Instant.now(Clock.systemUTC())
- val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end))
- val atomicActionCnt = wskActivationTuples.last._2
- (seqActivationId, seqActivation, atomicActionCnt)
- } andThen {
- case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation)
- case Failure(t) => seqActivationPromise.success(None)
- }
- } else {
- // non-blocking sequence execution, return activation id
- Future.successful((seqActivationId, None, 0)) andThen {
- case _ => seqActivationPromise.success(None)
- }
- }
+ val futureSeqResult = {
+ completeSequenceActivation(
+ seqActivationId,
+ // the cause for the component activations is the current sequence
+ invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount),
+ user, action, topmost, start, cause)
+ }
+
+ if (topmost) { // need to deal with blocking and closing connection
+ if (blocking) {
+ logging.info(this, s"invoke sequence blocking topmost!")
+ val timeout = maxWaitForBlockingActivation + blockingInvokeGrace
+ // if the future fails with a timeout, the failure is dealt with at the caller level
+ futureSeqResult.withTimeout(timeout, new BlockingInvokeTimeout(seqActivationId))
} else {
- // not topmost, no need to worry about terminating incoming request
- futureSeqResult map { wskActivationTuples =>
- val wskActivationEithers = wskActivationTuples.map(_._1)
- // all activations are successful, the result of the sequence is the result of the last activation
- val end = Instant.now(Clock.systemUTC())
- val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end))
- val atomicActionCnt = wskActivationTuples.last._2
- (seqActivationId, seqActivation, atomicActionCnt)
- } andThen {
- case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation)
- case Failure(t) => seqActivationPromise.success(None)
- }
+ // non-blocking sequence execution, return activation id
+ Future.successful((seqActivationId, None, 0))
}
+ } else {
+ // not topmost, no need to worry about terminating incoming request
+ // Note: the future for the sequence result recovers from all throwable failures
+ futureSeqResult
+ }
+ }
- // store result of sequence execution
- // if seqActivation is defined, use it; otherwise create it (e.g., for non-blocking activations)
- // the execution can reach here without a seqActivation due to non-blocking activations OR blocking activations that reach the blocking invoke timeout
- // futureSeqResult should always be successful, if failed, there is an error
- futureSeqResult flatMap { tuples => seqActivationPromise.future map { (tuples, _) } } onComplete {
- case Success((wskActivationTuples, seqActivation)) =>
- // all activations were successful
- val activation = seqActivation getOrElse {
- val wskActivationEithers = wskActivationTuples.map(_._1)
- val end = Instant.now(Clock.systemUTC())
- // the response of the sequence is the response of the very last activation
- makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end)
- }
- storeSequenceActivation(activation)
- case Failure(t: Throwable) =>
- // consider this whisk error
- // TODO shall we attempt storing the activation if it exists or even inspect the futures?
- // this should be a pretty serious whisk errror if it gets here
+ /**
+ * Creates an activation for the sequence and writes it back to the datastore.
+ */
+ private def completeSequenceActivation(
+ seqActivationId: ActivationId,
+ futureSeqResult: Future[SequenceAccounting],
+ user: Identity,
+ action: WhiskAction,
+ topmost: Boolean,
+ start: Instant,
+ cause: Option[ActivationId])(
+ implicit transid: TransactionId): Future[(ActivationId, Some[WhiskActivation], Int)] = {
+ // not topmost, no need to worry about terminating incoming request
+ // Note: the future for the sequence result recovers from all throwable failures
+ futureSeqResult.map { accounting =>
+ // sequence terminated, the result of the sequence is the result of the last completed activation
+ val end = Instant.now(Clock.systemUTC())
+ val seqActivation = makeSequenceActivation(user, action, seqActivationId, accounting, topmost, cause, start, end)
+ (seqActivationId, Some(seqActivation), accounting.atomicActionCnt)
+ }.andThen {
+ case Success((_, Some(seqActivation), _)) => storeSequenceActivation(seqActivation)
+ case Failure(t) =>
+ // This should never happen; in this case, there is no activation record created or stored:
+ // should there be?
logging.error(this, s"Sequence activation failed: ${t.getMessage}")
}
-
- response
}
/**
@@ -172,7 +159,7 @@ protected[actions] trait SequenceActions {
logging.info(this, s"recording activation '${activation.activationId}'")
WhiskActivation.put(activationStore, activation) onComplete {
case Success(id) => logging.info(this, s"recorded activation")
- case Failure(t) => logging.error(this, s"failed to record activation")
+ case Failure(t) => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
}
}
@@ -183,49 +170,20 @@ protected[actions] trait SequenceActions {
user: Identity,
action: WhiskAction,
activationId: ActivationId,
- wskActivationEithers: Vector[Either[ActivationResponse, WhiskActivation]],
+ accounting: SequenceAccounting,
topmost: Boolean,
cause: Option[ActivationId],
start: Instant,
end: Instant): WhiskActivation = {
- // extract all successful activations from the vector of activation eithers
- // the vector is either all rights, all lefts, or some rights followed by some lefts (no interleaving)
- val (right, left) = wskActivationEithers.span(_.isRight)
- val wskActivations = right.map(_.right.get)
-
- // the activation response is either the first left if it exists or the response of the last successful activation
- val activationResponse = if (left.length == 0) {
- wskActivations.last.response
- } else {
- left.head.left.get
- }
-
- // compose logs
- val logs = ActivationLogs(wskActivations map {
- activation => activation.activationId.toString
- })
-
- // compute duration
- val duration = (wskActivations map { activation =>
- activation.duration getOrElse {
- logging.error(this, s"duration for $activation is not defined")
- activation.end.toEpochMilli - activation.start.toEpochMilli
- }
- }).sum
-
// compute max memory
- val maxMemory = Try {
- val memoryLimits = wskActivations map { activation =>
- val limits = ActionLimits.serdes.read(activation.annotations.get("limits").get)
- limits.memory.megabytes
- }
- memoryLimits.max.MB
- }
-
- val sequenceLimits = maxMemory map {
- mb => ActionLimits(action.limits.timeout, MemoryLimit(mb), action.limits.logs)
- }
+ val sequenceLimits = accounting.maxMemory map {
+ maxMemoryAcrossActionsInSequence =>
+ Parameters("limits", ActionLimits(
+ action.limits.timeout,
+ MemoryLimit(maxMemoryAcrossActionsInSequence MB),
+ action.limits.logs).toJson)
+ } getOrElse (Parameters())
// set causedBy if not topmost sequence
val causedBy = if (!topmost) {
@@ -243,16 +201,16 @@ protected[actions] trait SequenceActions {
start = start,
end = end,
cause = if (topmost) None else cause, // propagate the cause for inner sequences, but undefined for topmost
- response = activationResponse,
- logs = logs,
+ response = accounting.previousResponse.getAndSet(null), // getAndSet(null) drops reference to the activation result
+ logs = accounting.finalLogs,
version = action.version,
publish = false,
annotations = Parameters("topmost", JsBoolean(topmost)) ++
Parameters("path", action.fullyQualifiedName(false).toString) ++
Parameters("kind", "sequence") ++
causedBy ++
- sequenceLimits.map(l => Parameters("limits", l.toJson)).getOrElse(Parameters()),
- duration = Some(duration))
+ sequenceLimits,
+ duration = Some(accounting.duration))
}
/**
@@ -264,96 +222,62 @@ protected[actions] trait SequenceActions {
* @param user the user invoking the sequence
* @param seqAction the sequence invoked
* @param seqActivationId the id of the sequence
- * @param payload the payload passed to the first component in the sequence
+ * @param inputPayload the payload passed to the first component in the sequence
* @param components the components in the sequence
* @param cause the activation id of the sequence that lead to invoking this sequence or None if this sequence is topmost
* @param atomicActionCnt the dynamic atomic action count observed so far since the start of the execution of the topmost sequence
- * @return a vector of successful futures; each element contains a tuple with
- * 1. an either with activation(right) or activation response in case of error (left)
- * 2. the dynamic atomic action count after executing the components
+ * @return a future which resolves with the accounting for a sequence, including the last result, duration, and activation ids
*/
private def invokeSequenceComponents(
user: Identity,
seqAction: WhiskAction,
seqActivationId: ActivationId,
- payload: Option[JsObject],
+ inputPayload: Option[JsObject],
components: Vector[FullyQualifiedEntityName],
cause: Option[ActivationId],
atomicActionCnt: Int)(
- implicit transid: TransactionId): Vector[Future[(Either[ActivationResponse, WhiskActivation], Int)]] = {
- logging.info(this, s"invoke sequence $seqAction ($seqActivationId) with components $components")
-
- // first retrieve the information/entities on all actions
- // do not wait to successfully retrieve all the actions before starting the execution
- // start execution of the first action while potentially still retrieving entities
- // Note: the execution starts even if one of the futures retrieving an entity may fail
- // first components need to be resolved given any package bindings and the params need to be merged
- // NOTE: OLD-STYLE sequences may have default namespace in the names of the components, resolve default namespace first
- val resolvedFutureActions = resolveDefaultNamespace(components, user) map { c => WhiskAction.resolveActionAndMergeParameters(entityStore, c) }
-
- // "scan" the wskActions to execute them in blocking fashion
- // use scanLeft instead of foldLeft as we need the intermediate results
- // TODO: double-check the package param policy
- // env are the parameters for the package that the sequence is in; throw them away, not used in the sequence execution
- // create a "fake" WhiskActivation to hold the payload of the sequence to init the scanLeft
- val fakeStart = Instant.now()
- val fakeEnd = Instant.now()
- val fakeResponse = ActivationResponse.payloadPlaceholder(payload)
-
- // NOTE: the init value is a fake (unused) activation to bootstrap the invocations of actions
- val initFakeWhiskActivation: Future[(Either[ActivationResponse, WhiskActivation], Int, Boolean)] = Future successful {
- // use boolean in tuple to indicate first/incoming payload
- (Right(WhiskActivation(seqAction.namespace, seqAction.name, user.subject, seqActivationId, fakeStart, fakeEnd, response = fakeResponse, duration = None)), atomicActionCnt, true)
+ implicit transid: TransactionId): Future[SequenceAccounting] = {
+
+ // For each action in the sequence, fetch any of its associated parameters (including package or binding).
+ // We do this for all of the actions in the sequence even though it may be short circuited. This is to
+ // hide the latency of the fetches from the datastore and the parameter merging that has to occur. It
+ // may be desirable in the future to selectively speculate over a smaller number of components rather than
+ // the entire sequence.
+ //
+ // This action/parameter resolution is done in futures; the execution starts as soon as the first component
+ // is resolved.
+ val resolvedFutureActions = resolveDefaultNamespace(components, user) map {
+ c => WhiskAction.resolveActionAndMergeParameters(entityStore, c)
+ }
+
+ // this holds the initial value of the accounting structure, including the input boxed as an ActivationResponse
+ val initialAccounting = Future.successful {
+ SequenceAccounting(atomicActionCnt, ActivationResponse.payloadPlaceholder(inputPayload))
}
- // seqComponentWskActivationFutures contains a fake activation on the first position in the vector; the rest of the vector is the result of each component execution/activation
- val seqComponentWskActivationFutures = resolvedFutureActions.scanLeft(initFakeWhiskActivation) {
- (futureActivationAtomicCntTuple, futureAction) =>
- futureAction flatMap {
- action =>
- futureActivationAtomicCntTuple flatMap {
- case (activationEither, atomicActionCount, first) =>
- activationEither match {
- case Right(activation) =>
- val payload = activation.response.result.map(_.asJsObject)
- // first check conditions on payload that may lead to interrupting the execution of the sequence
- val payloadContent = payload getOrElse JsObject.empty
- val errorFields = payloadContent.getFields(ActivationResponse.ERROR_FIELD)
- // short-circuit the execution of the sequence iff the payload contains an error field and is the result of an action return, not the initial payload
- val errorShortcircuit = !errorFields.isEmpty && !first
- if (!errorShortcircuit) {
- // second check the atomic action count for sequence action limit)
- if (atomicActionCount >= actionSequenceLimit) {
- val activationResponse = ActivationResponse.applicationError(s"$sequenceIsTooLong")
- Future.successful(Left(activationResponse), atomicActionCount, false) // dynamic action count and first don't really matter anymore
- } else {
- val compResultFuture : Future[(Either[ActivationResponse, WhiskActivation], Int)] = invokeSeqOneComponent(user, action, payload, cause, atomicActionCount)
- compResultFuture map {
- activationDynamicCountPair => (activationDynamicCountPair._1, activationDynamicCountPair._2, false) // it's not first payload anymore
- }
- }
- } else {
- // there is an error field, terminate sequence early
- // propagate the activation response
- Future.successful(Left(activation.response), atomicActionCount, false) // dynamic action count and first don't really matter anymore
- }
- case Left(activationResponse) =>
- // the sequence is interrupted, no more processing
- Future.successful(Left(activationResponse), 0, false) // dynamic action count and first do not matter from now on
- }
+ // execute the actions in sequential blocking fashion
+ resolvedFutureActions.foldLeft(initialAccounting) {
+ (accountingFuture, futureAction) =>
+ accountingFuture.flatMap { accounting =>
+ if (accounting.atomicActionCnt < actionSequenceLimit) {
+ invokeNextAction(user, futureAction, accounting, cause).flatMap { accounting =>
+ if (!accounting.shortcircuit) {
+ Future.successful(accounting)
+ } else {
+ // this is to short circuit the fold
+ Future.failed(FailedSequenceActivation(accounting)) // terminates the fold
+ }
}
- } recover {
- // check any failure here and generate an activation response such that this method always returns a vector of successful futures
- case t: Throwable =>
- // consider this failure a whisk error
- val activationResponse = ActivationResponse.whiskError(sequenceActivationFailure)
- (Left(activationResponse), 0, false)
+ } else {
+ val updatedAccount = accounting.fail(ActivationResponse.applicationError(sequenceIsTooLong), None)
+ Future.failed(FailedSequenceActivation(updatedAccount)) // terminates the fold
+ }
}
- }
- // drop the first future which contains the init value from scanLeft and project the first two fields from the tuples
- // the third one was used to treat error property differently for first action vs the rest of the actions in the sequence (not useful past this point)
- seqComponentWskActivationFutures.drop(1) map {
- tupleFuture => tupleFuture map { tuple => (tuple._1, tuple._2) }
+ }.recoverWith {
+ // turn the failed accounting back to success; this is the only possible failure
+ // since all throwables are recovered with a failed accounting instance and this is
+ // in turned boxed to FailedSequenceActivation
+ case FailedSequenceActivation(accounting) => Future.successful(accounting)
}
}
@@ -365,55 +289,191 @@ protected[actions] trait SequenceActions {
*
* The method distinguishes between invoking a sequence or an atomic action.
* @param user the user executing the sequence
- * @param action the action to be invoked
- * @param payload the payload for the action
- * @param cause the activation id of the first sequence containing this action
- * @param atomicActionCount the number of activations
- * @return future with the result of the invocation and the dynamic atomic action count so far
+ * @param futureAction the future which fetches the action to be invoked from the db
+ * @param accounting the state of the sequence activation, contains the dynamic activation count, logs and payload for the next action
+ * @param cause the activation id of the first sequence containing this activations
+ * @return a future which resolves with updated accounting for a sequence, including the last result, duration, and activation ids
*/
- private def invokeSeqOneComponent(user: Identity, action: WhiskAction, payload: Option[JsObject], cause: Option[ActivationId], atomicActionCount: Int)(
- implicit transid: TransactionId): Future[(Either[ActivationResponse, WhiskActivation], Int)] = {
- // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
- // the tuple contains activationId, wskActivation, atomicActionCount (up till this point in execution)
- val futureWhiskActivationTuple = action.exec match {
- case SequenceExec(components) =>
- // invoke a sequence
- logging.info(this, s"sequence invoking an enclosed sequence $action")
- // call invokeSequence to invoke the inner sequence
- // true for blocking; false for topmost
- invokeSequence(user, action, payload, blocking = true, topmost = false, components, cause, atomicActionCount) map {
- case (activationId, wskActivation, seqAtomicActionCnt) =>
- (activationId, wskActivation, seqAtomicActionCnt + atomicActionCount)
- }
- case _ =>
- // this is an invoke for an atomic action
- logging.info(this, s"sequence invoking an enclosed atomic action $action")
- val timeout = action.limits.timeout.duration + blockingInvokeGrace
- invokeSingleAction(user, action, payload, timeout, blocking = true, cause) map {
- case (activationId, wskActivation) => (activationId, wskActivation, atomicActionCount + 1)
- }
- }
+ private def invokeNextAction(
+ user: Identity,
+ futureAction: Future[WhiskAction],
+ accounting: SequenceAccounting,
+ cause: Option[ActivationId])(
+ implicit transid: TransactionId): Future[SequenceAccounting] = {
+ futureAction.flatMap { action =>
+ // the previous response becomes input for the next action in the sequence;
+ // the accounting no longer needs to hold a reference to it once the action is
+ // invoked, so previousResponse.getAndSet(null) drops the reference at this point
+ // which prevents dragging the previous response for the lifetime of the next activation
+ val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
+
+ // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
+ val futureWhiskActivationTuple = action.exec match {
+ case SequenceExec(components) =>
+ logging.info(this, s"sequence invoking an enclosed sequence $action")
+ // call invokeSequence to invoke the inner sequence
+ invokeSequence(user, action, inputPayload, blocking = true, topmost = false, components, cause, accounting.atomicActionCnt)
+ case _ =>
+ // this is an invoke for an atomic action
+ logging.info(this, s"sequence invoking an enclosed atomic action $action")
+ val timeout = action.limits.timeout.duration + blockingInvokeGrace
+ invokeSingleAction(user, action, inputPayload, timeout, blocking = true, cause) map {
+ case (activationId, wskActivation) => (activationId, wskActivation, accounting.atomicActionCnt + 1)
+ }
+ }
- futureWhiskActivationTuple map {
- case (activationId, wskActivation, atomicActionCountSoFar) =>
- // the activation is None only if the activation could not be retrieved either from active ack or from db
- wskActivation match {
- case Some(activation) => (Right(activation), atomicActionCountSoFar)
- case None => {
- val activationResponse = ActivationResponse.whiskError(s"$sequenceRetrieveActivationTimeout Activation id '$activationId'.")
- (Left(activationResponse), atomicActionCountSoFar) // dynamic count doesn't matter, sequence will be interrupted
+ futureWhiskActivationTuple.map {
+ case (activationId, wskActivation, atomicActionCountSoFar) =>
+ wskActivation.map {
+ activation => accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit)
+ }.getOrElse {
+ // the wskActivation is None only if the result could not be retrieved in time either from active ack or from db
+ logging.error(this, s"component activation timedout for $activationId")
+ val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId))
+ accounting.fail(activationResponse, Some(activationId))
}
- }
+ }.recover {
+ // check any failure here and generate an activation response to encapsulate
+ // the failure mode; consider this failure a whisk error
+ case t: Throwable =>
+ logging.error(this, s"component activation failed: $t")
+ accounting.fail(ActivationResponse.whiskError(sequenceActivationFailure), None)
+ }
}
}
/** Replaces default namespaces in a vector of components from a sequence with appropriate namespace. */
private def resolveDefaultNamespace(components: Vector[FullyQualifiedEntityName], user: Identity): Vector[FullyQualifiedEntityName] = {
- // if components are part of the default namespace, they contain `_`; replace it!
- val resolvedComponents = components map { c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name) }
- resolvedComponents
+ // resolve any namespaces that may appears as "_" (the default namespace)
+ components.map(c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name))
}
/** Max atomic action count allowed for sequences */
private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
}
+
+/**
+ * Cumulative accounting of what happened during the execution of a sequence.
+ *
+ * @param atomicActionCnt the current count of non-sequence (c.f. atomic) actions already invoked
+ * @param previousResponse a reference to the previous activation result which will be nulled out
+ * when no longer needed (see previousResponse.getAndSet(null) below)
+ * @param logs a mutable buffer that is appended with new activation ids as the sequence unfolds
+ * @param duration the "user" time so far executing the sequence (sum of durations for
+ * all actions invoked so far which is different from the total time spent executing the sequence)
+ * @param maxMemory the maximum memory annotation observed so far for the
+ * components (needed to annotate the sequence with GB-s)
+ * @param shortcircuit when true, stops the execution of the next component in the sequence
+ */
+protected[actions] case class SequenceAccounting(
+ atomicActionCnt: Int,
+ previousResponse: AtomicReference[ActivationResponse],
+ logs: mutable.Buffer[ActivationId],
+ duration: Long = 0,
+ maxMemory: Option[Int] = None,
+ shortcircuit: Boolean = false) {
+
+ /** @return the ActivationLogs data structure for this sequence invocation */
+ def finalLogs = ActivationLogs(logs.map(id => id.asString).toVector)
+
+ /** The previous activation was successful. */
+ private def success(activation: WhiskActivation, newCnt: Int, shortcircuit: Boolean = false) = {
+ previousResponse.set(null)
+ SequenceAccounting(
+ prev = this,
+ newCnt = newCnt,
+ shortcircuit = shortcircuit,
+ incrDuration = activation.duration,
+ newResponse = activation.response,
+ newActivationId = activation.activationId,
+ newMemoryLimit = activation.annotations.get("limits") map {
+ limitsAnnotation => // we have a limits annotation
+ limitsAnnotation.asJsObject.getFields("memory") match {
+ case Seq(JsNumber(memory)) => Some(memory.toInt) // we have a numerical "memory" field in the "limits" annotation
+ }
+ } getOrElse { None })
+ }
+
+ /** The previous activation failed (this is used when there is no activation record or an internal error. */
+ def fail(failureResponse: ActivationResponse, activationId: Option[ActivationId]) = {
+ require(!failureResponse.isSuccess)
+ logs.appendAll(activationId)
+ copy(previousResponse = new AtomicReference(failureResponse), shortcircuit = true)
+ }
+
+ /** Determines whether the previous activation succeeded or failed. */
+ def maybe(activation: WhiskActivation, newCnt: Int, maxSequenceCnt: Int) = {
+ // check conditions on payload that may lead to interrupting the execution of the sequence
+ // short-circuit the execution of the sequence iff the payload contains an error field
+ // and is the result of an action return, not the initial payload
+ val outputPayload = activation.response.result.map(_.asJsObject)
+ val payloadContent = outputPayload getOrElse JsObject.empty
+ val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
+ val withinSeqLimit = newCnt <= maxSequenceCnt
+
+ if (withinSeqLimit && errorField.isEmpty) {
+ // all good with this action invocation
+ success(activation, newCnt)
+ } else {
+ val nextActivation = if (!withinSeqLimit) {
+ // no error in the activation but the dynamic count of actions exceeds the threshold
+ // this is here as defensive code; the activation should not occur if its takes the
+ // count above its limit
+ val newResponse = ActivationResponse.applicationError(sequenceIsTooLong)
+ activation.copy(response = newResponse)
+ } else {
+ assert(errorField.isDefined)
+ activation
+ }
+
+ // there is an error field in the activation response. here, we treat this like success,
+ // in the sense of tallying up the accounting fields, but terminate the sequence early
+ success(nextActivation, newCnt, shortcircuit = true)
+ }
+ }
+}
+
+/**
+ * Three constructors for SequenceAccounting:
+ * - one for successful invocation of an action in the sequence,
+ * - one for failed invocation, and
+ * - one to initialize things
+ */
+protected[actions] object SequenceAccounting {
+
+ def maxMemory(prevMemoryLimit: Option[Int], newMemoryLimit: Option[Int]): Option[Int] = {
+ (prevMemoryLimit ++ newMemoryLimit).reduceOption(Math.max)
+ }
+
+ // constructor for successful invocations, or error'ing ones (where shortcircuit = true)
+ def apply(
+ prev: SequenceAccounting,
+ newCnt: Int,
+ incrDuration: Option[Long],
+ newResponse: ActivationResponse,
+ newActivationId: ActivationId,
+ newMemoryLimit: Option[Int],
+ shortcircuit: Boolean): SequenceAccounting = {
+
+ // compute the new max memory
+ val newMaxMemory = maxMemory(prev.maxMemory, newMemoryLimit)
+
+ // append log entry
+ prev.logs += newActivationId
+
+ SequenceAccounting(
+ atomicActionCnt = newCnt,
+ previousResponse = new AtomicReference(newResponse),
+ logs = prev.logs,
+ duration = incrDuration map { prev.duration + _ } getOrElse { prev.duration },
+ maxMemory = newMaxMemory,
+ shortcircuit = shortcircuit)
+ }
+
+ // constructor for initial payload
+ def apply(atomicActionCnt: Int, initialPayload: ActivationResponse): SequenceAccounting = {
+ SequenceAccounting(atomicActionCnt, new AtomicReference(initialPayload), mutable.Buffer.empty)
+ }
+}
+
+protected[actions] case class FailedSequenceActivation(accounting: SequenceAccounting) extends Throwable
diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala b/tests/src/test/scala/system/basic/WskSequenceTests.scala
index e0dfdef..06f5dea 100644
--- a/tests/src/test/scala/system/basic/WskSequenceTests.scala
+++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala
@@ -60,7 +60,7 @@ class WskSequenceTests
behavior of "Wsk Sequence"
- it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
+ it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val name = "sequence"
val actions = Seq("split", "sort", "head", "cat")
@@ -109,7 +109,7 @@ class WskSequenceTests
// result of sequence should be identical to previous invocation above
val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson)
val thirdrun = wsk.action.invoke(name, payload)
- withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) {
+ withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence
val result = activation.response.result.get
@@ -118,14 +118,57 @@ class WskSequenceTests
}
}
+ it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val inner_name = "inner_sequence"
+ val outer_name = "outer_sequence"
+ val inner_actions = Seq("sort", "head")
+ val actions = Seq("split") ++ inner_actions ++ Seq("cat")
+ // create atomic actions
+ for (actionName <- actions) {
+ val file = TestUtils.getTestActionFilename(s"$actionName.js")
+ assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+ action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
+ }
+ }
+
+ // create inner sequence
+ assetHelper.withCleaner(wsk.action, inner_name) {
+ val inner_sequence = inner_actions.mkString(",")
+ (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
+ }
+
+ // create outer sequence
+ assetHelper.withCleaner(wsk.action, outer_name) {
+ val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
+ (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
+ }
+
+ val now = "it is now " + new Date()
+ val args = Array("what time is it?", now)
+ val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
+ withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
+ activation =>
+ checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
+ activation.cause shouldBe None // topmost sequence
+ val result = activation.response.result.get
+ result.fields.get("payload") shouldBe defined
+ result.fields.get("length") should not be defined
+ result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
+ }
+ }
+
/**
* s -> echo, x, echo
* x -> echo
*
* update x -> <limit-1> echo -- should work
* run s -> should stop after <limit> echo
+ *
+ * This confirms that a dynamic check on the sequence length holds within the system limit.
+ * This is different from creating a long sequence up front which will report a length error at create time.
*/
- it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) {
+ it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val xName = "xSequence"
val sName = "sSequence"
@@ -176,52 +219,11 @@ class WskSequenceTests
withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) {
innerSeqActivation =>
innerSeqActivation.logs.get.size shouldBe (limit - 1)
- innerSeqActivation.cause shouldBe defined
- innerSeqActivation.cause.get shouldBe (activation.activationId)
+ innerSeqActivation.cause shouldBe Some(activation.activationId)
}
}
}
- it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) {
- (wp, assetHelper) =>
- val inner_name = "inner_sequence"
- val outer_name = "outer_sequence"
- val inner_actions = Seq("sort", "head")
- val actions = Seq("split") ++ inner_actions ++ Seq("cat")
- // create atomic actions
- for (actionName <- actions) {
- val file = TestUtils.getTestActionFilename(s"$actionName.js")
- assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
- action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
- }
- }
-
- // create inner sequence
- assetHelper.withCleaner(wsk.action, inner_name) {
- val inner_sequence = inner_actions.mkString(",")
- (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
- }
-
- // create outer sequence
- assetHelper.withCleaner(wsk.action, outer_name) {
- val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
- (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
- }
-
- val now = "it is now " + new Date()
- val args = Array("what time is it?", now)
- val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
- withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
- activation =>
- checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
- activation.cause shouldBe None // topmost sequence
- val result = activation.response.result.get
- result.fields.get("payload") shouldBe defined
- result.fields.get("length") should not be defined
- result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
- }
- }
-
it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val sName = "sSequence"
@@ -294,6 +296,7 @@ class WskSequenceTests
// action params trump package params
checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now)))
}
+
/**
* s -> apperror, echo
* only apperror should run
diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala
new file mode 100644
index 0000000..d3e5918
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.core.controller.actions.test
+
+import java.time.Instant
+
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.WskActorSystem
+import spray.json._
+import whisk.core.controller.actions.SequenceAccounting
+import whisk.core.entity._
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.size.SizeInt
+import whisk.http.Messages
+
+@RunWith(classOf[JUnitRunner])
+class SequenceAccountingTests extends FlatSpec with Matchers with WskActorSystem {
+
+ behavior of "sequence accounting"
+
+ val okRes1 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1))))
+ val okRes2 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2))))
+ val failedRes = ActivationResponse.applicationError(JsNumber(3))
+
+ val okActivation = WhiskActivation(
+ namespace = EntityPath("ns"),
+ name = EntityName("a"),
+ Subject(),
+ activationId = ActivationId(),
+ start = Instant.now(),
+ end = Instant.now(),
+ response = okRes2,
+ annotations = Parameters("limits", ActionLimits(
+ TimeLimit(1.second),
+ MemoryLimit(128.MB),
+ LogLimit(1.MB)).toJson),
+ duration = Some(123))
+
+ val notOkActivation = WhiskActivation(
+ namespace = EntityPath("ns"),
+ name = EntityName("a"),
+ Subject(),
+ activationId = ActivationId(),
+ start = Instant.now(),
+ end = Instant.now(),
+ response = failedRes,
+ annotations = Parameters("limits", ActionLimits(
+ TimeLimit(11.second),
+ MemoryLimit(256.MB),
+ LogLimit(2.MB)).toJson),
+ duration = Some(234))
+
+ it should "create initial accounting object" in {
+ val s = SequenceAccounting(2, okRes1)
+ s.atomicActionCnt shouldBe 2
+ s.previousResponse.get shouldBe okRes1
+ s.logs shouldBe empty
+ s.duration shouldBe 0
+ s.maxMemory shouldBe None
+ s.shortcircuit shouldBe false
+ }
+
+ it should "resolve maybe to success and update accounting object" in {
+ val p = SequenceAccounting(2, okRes1)
+ val n1 = p.maybe(okActivation, 3, 5)
+ n1.atomicActionCnt shouldBe 3
+ n1.previousResponse.get shouldBe okRes2
+ n1.logs.length shouldBe 1
+ n1.logs(0) shouldBe okActivation.activationId
+ n1.duration shouldBe 123
+ n1.maxMemory shouldBe Some(128)
+ n1.shortcircuit shouldBe false
+ }
+
+ it should "resolve maybe and enable short circuit" in {
+ val p = SequenceAccounting(2, okRes1)
+ val n1 = p.maybe(okActivation, 3, 5)
+ val n2 = n1.maybe(notOkActivation, 4, 5)
+ n2.atomicActionCnt shouldBe 4
+ n2.previousResponse.get shouldBe failedRes
+ n2.logs.length shouldBe 2
+ n2.logs(0) shouldBe okActivation.activationId
+ n2.logs(1) shouldBe notOkActivation.activationId
+ n2.duration shouldBe (123 + 234)
+ n2.maxMemory shouldBe Some(256)
+ n2.shortcircuit shouldBe true
+ }
+
+ it should "record an activation that exceeds allowed limit but also short circuit" in {
+ val p = SequenceAccounting(2, okRes1)
+ val n = p.maybe(okActivation, 3, 2)
+ n.atomicActionCnt shouldBe 3
+ n.previousResponse.get shouldBe ActivationResponse.applicationError(Messages.sequenceIsTooLong)
+ n.logs.length shouldBe 1
+ n.logs(0) shouldBe okActivation.activationId
+ n.duration shouldBe 123
+ n.maxMemory shouldBe Some(128)
+ n.shortcircuit shouldBe true
+ }
+
+ it should "set failed response and short circuit on failure" in {
+ val p = SequenceAccounting(2, okRes1)
+ val n = p.maybe(okActivation, 3, 3)
+ val f = n.fail(failedRes, None)
+ f.atomicActionCnt shouldBe 3
+ f.previousResponse.get shouldBe failedRes
+ f.logs.length shouldBe 1
+ f.logs(0) shouldBe okActivation.activationId
+ f.duration shouldBe 123
+ f.maxMemory shouldBe Some(128)
+ f.shortcircuit shouldBe true
+ }
+
+ it should "resolve max memory" in {
+ SequenceAccounting.maxMemory(None, None) shouldBe None
+ SequenceAccounting.maxMemory(None, Some(1)) shouldBe Some(1)
+ SequenceAccounting.maxMemory(Some(1), None) shouldBe Some(1)
+ SequenceAccounting.maxMemory(Some(1), Some(2)) shouldBe Some(2)
+ SequenceAccounting.maxMemory(Some(2), Some(1)) shouldBe Some(2)
+ SequenceAccounting.maxMemory(Some(2), Some(2)) shouldBe Some(2)
+ }
+}
diff --git a/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala
new file mode 100644
index 0000000..b530d3a
--- /dev/null
+++ b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.utils.test
+
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.WskActorSystem
+import whisk.utils.ExecutionContextFactory.FutureExtensions
+
+@RunWith(classOf[JUnitRunner])
+class ExecutionContextFactoryTests extends FlatSpec with Matchers with WskActorSystem {
+
+ behavior of "future extensions"
+
+ it should "take first to complete" in {
+ val f1 = Future.successful({}).withTimeout(500.millis, new Throwable("error"))
+ Await.result(f1, 1.second) shouldBe ({})
+
+ val failure = new Throwable("error")
+ val f2 = Future { Thread.sleep(1.second.toMillis) }.withTimeout(500.millis, failure)
+ a[Throwable] shouldBe thrownBy { Await.result(f2, 1.seconds) }
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].