You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/06/26 16:00:12 UTC
[incubator-openwhisk-cli] 32/36: Update sequence impl to tune
controller memory consumption (#2387)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-cli.git
commit cb152ae2616f12dc25e52b536ed0a1a8ac8373bb
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Tue Jun 20 12:16:36 2017 -0400
Update sequence impl to tune controller memory consumption (#2387)
- switch to scheduleOnce+weakrefs for timeout handling in SequenceActions
- switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory
- use better (non-dragging) impl of withTimeout
- use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously
- refactor top level sequence scheduler to eliminate promises
---
.../test/scala/system/basic/WskSequenceTests.scala | 93 +++++++++++-----------
1 file changed, 48 insertions(+), 45 deletions(-)
diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala b/tests/src/test/scala/system/basic/WskSequenceTests.scala
index e0dfdef..06f5dea 100644
--- a/tests/src/test/scala/system/basic/WskSequenceTests.scala
+++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala
@@ -60,7 +60,7 @@ class WskSequenceTests
behavior of "Wsk Sequence"
- it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
+ it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val name = "sequence"
val actions = Seq("split", "sort", "head", "cat")
@@ -109,7 +109,7 @@ class WskSequenceTests
// result of sequence should be identical to previous invocation above
val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson)
val thirdrun = wsk.action.invoke(name, payload)
- withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) {
+ withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence
val result = activation.response.result.get
@@ -118,14 +118,57 @@ class WskSequenceTests
}
}
+ it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val inner_name = "inner_sequence"
+ val outer_name = "outer_sequence"
+ val inner_actions = Seq("sort", "head")
+ val actions = Seq("split") ++ inner_actions ++ Seq("cat")
+ // create atomic actions
+ for (actionName <- actions) {
+ val file = TestUtils.getTestActionFilename(s"$actionName.js")
+ assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+ action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
+ }
+ }
+
+ // create inner sequence
+ assetHelper.withCleaner(wsk.action, inner_name) {
+ val inner_sequence = inner_actions.mkString(",")
+ (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
+ }
+
+ // create outer sequence
+ assetHelper.withCleaner(wsk.action, outer_name) {
+ val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
+ (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
+ }
+
+ val now = "it is now " + new Date()
+ val args = Array("what time is it?", now)
+ val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
+ withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
+ activation =>
+ checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
+ activation.cause shouldBe None // topmost sequence
+ val result = activation.response.result.get
+ result.fields.get("payload") shouldBe defined
+ result.fields.get("length") should not be defined
+ result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
+ }
+ }
+
/**
* s -> echo, x, echo
* x -> echo
*
* update x -> <limit-1> echo -- should work
* run s -> should stop after <limit> echo
+ *
+ * This confirms that a dynamic check on the sequence length holds within the system limit.
+ * This is different from creating a long sequence up front which will report a length error at create time.
*/
- it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) {
+ it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val xName = "xSequence"
val sName = "sSequence"
@@ -176,52 +219,11 @@ class WskSequenceTests
withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) {
innerSeqActivation =>
innerSeqActivation.logs.get.size shouldBe (limit - 1)
- innerSeqActivation.cause shouldBe defined
- innerSeqActivation.cause.get shouldBe (activation.activationId)
+ innerSeqActivation.cause shouldBe Some(activation.activationId)
}
}
}
- it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) {
- (wp, assetHelper) =>
- val inner_name = "inner_sequence"
- val outer_name = "outer_sequence"
- val inner_actions = Seq("sort", "head")
- val actions = Seq("split") ++ inner_actions ++ Seq("cat")
- // create atomic actions
- for (actionName <- actions) {
- val file = TestUtils.getTestActionFilename(s"$actionName.js")
- assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
- action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
- }
- }
-
- // create inner sequence
- assetHelper.withCleaner(wsk.action, inner_name) {
- val inner_sequence = inner_actions.mkString(",")
- (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
- }
-
- // create outer sequence
- assetHelper.withCleaner(wsk.action, outer_name) {
- val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
- (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
- }
-
- val now = "it is now " + new Date()
- val args = Array("what time is it?", now)
- val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
- withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
- activation =>
- checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
- activation.cause shouldBe None // topmost sequence
- val result = activation.response.result.get
- result.fields.get("payload") shouldBe defined
- result.fields.get("length") should not be defined
- result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
- }
- }
-
it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val sName = "sSequence"
@@ -294,6 +296,7 @@ class WskSequenceTests
// action params trump package params
checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now)))
}
+
/**
* s -> apperror, echo
* only apperror should run
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.