You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in the plain-text version).
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/06/20 05:58:44 UTC

[GitHub] markusthoemmes commented on a change in pull request #2387: Update sequence impl to tune controller memory consumption

markusthoemmes commented on a change in pull request #2387: Update sequence impl to tune controller memory consumption
URL: https://github.com/apache/incubator-openwhisk/pull/2387#discussion_r122886572
 
 

 ##########
 File path: core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
 ##########
 @@ -365,55 +289,195 @@ protected[actions] trait SequenceActions {
      *
      * The method distinguishes between invoking a sequence or an atomic action.
      * @param user the user executing the sequence
-     * @param action the action to be invoked
-     * @param payload the payload for the action
-     * @param cause the activation id of the first sequence containing this action
-     * @param atomicActionCount the number of activations
-     * @return future with the result of the invocation and the dynamic atomic action count so far
+     * @param futureAction the future which fetches the action to be invoked from the db
+     * @param accounting the state of the sequence activation, contains the dynamic activation count, logs and payload for the next action
+     * @param cause the activation id of the first sequence containing this activations
+     * @return a future which resolves with updated accounting for a sequence, including the last result, duration, and activation ids
      */
-    private def invokeSeqOneComponent(user: Identity, action: WhiskAction, payload: Option[JsObject], cause: Option[ActivationId], atomicActionCount: Int)(
-        implicit transid: TransactionId): Future[(Either[ActivationResponse, WhiskActivation], Int)] = {
-        // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
-        // the tuple contains activationId, wskActivation, atomicActionCount (up till this point in execution)
-        val futureWhiskActivationTuple = action.exec match {
-            case SequenceExec(components) =>
-                // invoke a sequence
-                logging.info(this, s"sequence invoking an enclosed sequence $action")
-                // call invokeSequence to invoke the inner sequence
-                // true for blocking; false for topmost
-                invokeSequence(user, action, payload, blocking = true, topmost = false, components, cause, atomicActionCount) map {
-                    case (activationId, wskActivation, seqAtomicActionCnt) =>
-                        (activationId, wskActivation, seqAtomicActionCnt + atomicActionCount)
-                }
-            case _ =>
-                // this is an invoke for an atomic action
-                logging.info(this, s"sequence invoking an enclosed atomic action $action")
-                val timeout = action.limits.timeout.duration + blockingInvokeGrace
-                invokeSingleAction(user, action, payload, timeout, blocking = true, cause) map {
-                    case (activationId, wskActivation) => (activationId, wskActivation, atomicActionCount + 1)
-                }
-        }
+    private def invokeNextAction(
+        user: Identity,
+        futureAction: Future[WhiskAction],
+        accounting: SequenceAccounting,
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[SequenceAccounting] = {
+        futureAction.flatMap { action =>
+            // the previous response becomes input for the next action in the sequence;
+            // the accounting no longer needs to hold a reference to it once the action is
+            // invoked, so previousResponse.getAndSet(null) drops the reference at this point
+            // which prevents dragging the previous response for the lifetime of the next activation
+            val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
+
+            // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
+            val futureWhiskActivationTuple = action.exec match {
+                case SequenceExec(components) =>
+                    logging.info(this, s"sequence invoking an enclosed sequence $action")
+                    // call invokeSequence to invoke the inner sequence
+                    invokeSequence(user, action, inputPayload, blocking = true, topmost = false, components, cause, accounting.atomicActionCnt)
+                case _ =>
+                    // this is an invoke for an atomic action
+                    logging.info(this, s"sequence invoking an enclosed atomic action $action")
+                    val timeout = action.limits.timeout.duration + blockingInvokeGrace
+                    invokeSingleAction(user, action, inputPayload, timeout, blocking = true, cause) map {
+                        case (activationId, wskActivation) => (activationId, wskActivation, accounting.atomicActionCnt + 1)
+                    }
+            }
 
-        futureWhiskActivationTuple map {
-            case (activationId, wskActivation, atomicActionCountSoFar) =>
-                // the activation is None only if the activation could not be retrieved either from active ack or from db
-                wskActivation match {
-                    case Some(activation) => (Right(activation), atomicActionCountSoFar)
-                    case None => {
-                        val activationResponse = ActivationResponse.whiskError(s"$sequenceRetrieveActivationTimeout Activation id '$activationId'.")
-                        (Left(activationResponse), atomicActionCountSoFar) // dynamic count doesn't matter, sequence will be interrupted
+            futureWhiskActivationTuple.map {
+                case (activationId, wskActivation, atomicActionCountSoFar) =>
+                    wskActivation.map {
+                        activation => accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit)
+                    }.getOrElse {
+                        // the wskActivation is None only if the result could not be retrieved in time either from active ack or from db
+                        logging.error(this, s"component activation timedout for $activationId")
+                        val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId))
+                        accounting.fail(activationResponse, Some(activationId))
                     }
-                }
+            }.recover {
+                // check any failure here and generate an activation response to encapsulate
+                // the failure mode; consider this failure a whisk error
+                case t: Throwable =>
+                    logging.error(this, s"component activation failed: $t")
+                    accounting.fail(ActivationResponse.whiskError(sequenceActivationFailure), None)
+            }
         }
     }
 
     /** Replaces default namespaces in a vector of components from a sequence with appropriate namespace. */
     private def resolveDefaultNamespace(components: Vector[FullyQualifiedEntityName], user: Identity): Vector[FullyQualifiedEntityName] = {
-        // if components are part of the default namespace, they contain `_`; replace it!
-        val resolvedComponents = components map { c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name) }
-        resolvedComponents
+        // resolve any namespaces that may appears as "_" (the default namespace)
+        components.map(c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name))
     }
 
     /** Max atomic action count allowed for sequences */
     private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
 }
+
+/**
+ * Cumulative accounting of what happened during the execution of a sequence.
+ *
+ * @param atomicActionCnt the current count of non-sequence (c.f. atomic) actions already invoked
+ * @param previousResponse a reference to the previous activation result which will be nulled out
+ *        when no longer needed (see previousResponse.getAndSet(null) below)
+ * @param logs a mutable buffer that is appended with new activation ids as the sequence unfolds
+ * @param duration the "user" time so far executing the sequence (sum of durations for
+ *        all actions invoked so far which is different from the total time spent executing the sequence)
+ * @param maxMemory the maximum memory annotation observed so far for the
+ *        components (needed to annotate the sequence with GB-s)
+ * @param shortcircuit when true, stops the execution of the next component in the sequence
+ */
+protected[actions] case class SequenceAccounting(
+    atomicActionCnt: Int,
+    previousResponse: AtomicReference[ActivationResponse],
+    logs: mutable.ArrayBuffer[ActivationId],
+    duration: Long = 0,
+    maxMemory: Option[Int] = None,
+    shortcircuit: Boolean = false) {
+
+    /** @return the ActivationLogs data structure for this sequence invocation */
+    def finalLogs = ActivationLogs(logs.map(id => id.asString).toVector)
+
+    /** The previous activation was successful. */
+    private def success(activation: WhiskActivation, newCnt: Int, shortcircuit: Boolean = false) = {
+        previousResponse.set(null)
+        SequenceAccounting(
+            prev = this,
+            newCnt = newCnt,
+            shortcircuit = shortcircuit,
+            incrDuration = activation.duration,
+            newResponse = activation.response,
+            newActivationId = activation.activationId,
+            newMemoryLimit = activation.annotations.get("limits") map {
+                limitsAnnotation => // we have a limits annotation
+                    limitsAnnotation.asJsObject.getFields("memory") match {
+                        case Seq(JsNumber(memory)) => Some(memory.toInt) // we have a numerical "memory" field in the "limits" annotation
+                    }
+            } getOrElse { None })
+    }
+
+    /** The previous activation failed (this is used when there is no activation record or an internal error. */
+    def fail(failureResponse: ActivationResponse, activationId: Option[ActivationId]) = {
+        require(!failureResponse.isSuccess)
+        activationId.foreach(logs += _)
+        copy(previousResponse = new AtomicReference(failureResponse), shortcircuit = true)
+    }
+
+    /** Determines whether the previous activation succeeded or failed. */
+    def maybe(activation: WhiskActivation, newCnt: Int, maxSequenceCnt: Int) = {
+        // check conditions on payload that may lead to interrupting the execution of the sequence
+        //     short-circuit the execution of the sequence iff the payload contains an error field
+        //     and is the result of an action return, not the initial payload
+        val outputPayload = activation.response.result.map(_.asJsObject)
+        val payloadContent = outputPayload getOrElse JsObject.empty
+        val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
+        val withinSeqLimit = newCnt <= maxSequenceCnt
+
+        if (withinSeqLimit && errorField.isEmpty) {
+            // all good with this action invocation
+            success(activation, newCnt)
+        } else {
+            val nextActivation = if (!withinSeqLimit) {
+                // no error in the activation but the dynamic count of actions exceeds the threshold
+                // this is here as defensive code; the activation should not occur if its takes the
+                // count above its limit
+                val newResponse = ActivationResponse.applicationError(sequenceIsTooLong)
+                activation.copy(response = newResponse)
+            } else {
+                assert(errorField.isDefined)
+                activation
+            }
+
+            // there is an error field in the activation response. here, we treat this like success,
+            // in the sense of tallying up the accounting fields, but terminate the sequence early
+            success(nextActivation, newCnt, shortcircuit = true)
+        }
+    }
+}
+
+/**
+ *  Three constructors for SequenceAccounting:
+ *     - one for successful invocation of an action in the sequence,
+ *     - one for failed invocation, and
+ *     - one to initialize things
+ */
+protected[actions] object SequenceAccounting {
+
+    def maxMemory(prevMemoryLimit: Option[Int], newMemoryLimit: Option[Int]): Option[Int] = {
+        prevMemoryLimit.map { prevMax =>
+            newMemoryLimit
+                .map(currentMax => Some(Math.max(prevMax, currentMax)))
+                .getOrElse(prevMemoryLimit)
+        }.getOrElse(newMemoryLimit)
+    }
 
 Review comment:
   Is this the same as:
   
   ```
   (prevMemoryLimit ++ newMemoryLimit).reduceOption(Math.max)
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services