You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/08/01 22:23:16 UTC
[incubator-openwhisk-runtime-nodejs] branch master updated: enable
concurrent processing via env variable (#41)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-runtime-nodejs.git
The following commit(s) were added to refs/heads/master by this push:
new b4b68b5 enable concurrent processing via env variable (#41)
b4b68b5 is described below
commit b4b68b5280dcd42698efdf3d5f77a943c33121ba
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Wed Aug 1 15:23:14 2018 -0700
enable concurrent processing via env variable (#41)
---
core/nodejs6Action/CHANGELOG.md | 5 ++
core/nodejs8Action/CHANGELOG.md | 5 ++
core/nodejsActionBase/app.js | 3 +-
core/nodejsActionBase/src/service.js | 9 +-
.../NodeJs6ActionContainerTests.scala | 2 +-
.../NodeJs8ActionContainerTests.scala | 2 +-
.../NodeJsActionContainerTests.scala | 1 +
.../actionContainers/NodeJsConcurrentTests.scala | 97 ++++++++++++++++++++++
.../NodeJsNonConcurrentTests.scala | 77 +++++++++++++++++
...nerTests.scala => Nodejs6ConcurrentTests.scala} | 4 +-
...nerTests.scala => Nodejs8ConcurrentTests.scala} | 6 +-
11 files changed, 199 insertions(+), 12 deletions(-)
diff --git a/core/nodejs6Action/CHANGELOG.md b/core/nodejs6Action/CHANGELOG.md
index b0e7c04..369a5b9 100644
--- a/core/nodejs6Action/CHANGELOG.md
+++ b/core/nodejs6Action/CHANGELOG.md
@@ -19,6 +19,11 @@
# NodeJS 6 OpenWhisk Runtime Container
+## 1.11.0
+Change: Update runtime to work in concurrent mode
+
+- Update runtime to work in concurrent mode [#41](https://github.com/apache/incubator-openwhisk-runtime-nodejs/pull/41/files)
+
## 1.10.0
Change: Update npm openwhisk package from `3.15.0` to `3.16.0`
diff --git a/core/nodejs8Action/CHANGELOG.md b/core/nodejs8Action/CHANGELOG.md
index 6e621d5..9cb6d84 100644
--- a/core/nodejs8Action/CHANGELOG.md
+++ b/core/nodejs8Action/CHANGELOG.md
@@ -19,6 +19,11 @@
# NodeJS 8 OpenWhisk Runtime Container
+## 1.8.0
+Change: Update runtime to work in concurrent mode
+
+- Update runtime to work in concurrent mode [#41](https://github.com/apache/incubator-openwhisk-runtime-nodejs/pull/41/files)
+
## 1.7.0
Change: Update openwhisk npm package from `3.15.0` to `3.16.0`
diff --git a/core/nodejsActionBase/app.js b/core/nodejsActionBase/app.js
index 2da2b89..7dbc433 100644
--- a/core/nodejsActionBase/app.js
+++ b/core/nodejsActionBase/app.js
@@ -17,7 +17,8 @@
var config = {
'port': 8080,
- 'apiHost': process.env.__OW_API_HOST
+ 'apiHost': process.env.__OW_API_HOST,
+ 'allowConcurrent': process.env.__OW_ALLOW_CONCURRENT
};
var bodyParser = require('body-parser');
diff --git a/core/nodejsActionBase/src/service.js b/core/nodejsActionBase/src/service.js
index e1318ee..daf365f 100644
--- a/core/nodejsActionBase/src/service.js
+++ b/core/nodejsActionBase/src/service.js
@@ -27,6 +27,7 @@ function NodeActionService(config) {
};
var status = Status.ready;
+ var ignoreRunStatus = config.allowConcurrent === undefined ? false : config.allowConcurrent.toLowerCase() === "true";
var server = undefined;
var userCodeRunner = undefined;
@@ -112,10 +113,14 @@ function NodeActionService(config) {
*/
this.runCode = function runCode(req) {
if (status === Status.ready) {
- setStatus(Status.running);
+ if (!ignoreRunStatus) {
+ setStatus(Status.running);
+ }
return doRun(req).then(function (result) {
- setStatus(Status.ready);
+ if (!ignoreRunStatus) {
+ setStatus(Status.ready);
+ }
if (typeof result !== "object") {
return errorMessage(502, "The action did not return a dictionary.");
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
index 78f064e..77fea66 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
@@ -21,7 +21,7 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
+class NodeJs6ActionContainerTests extends NodeJsNonConcurrentTests {
override lazy val nodejsContainerImageName = "nodejs6action"
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
index ec57018..f4092c6 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnitRunner
import spray.json.JsObject
@RunWith(classOf[JUnitRunner])
-class NodeJs8ActionContainerTests extends NodeJsActionContainerTests {
+class NodeJs8ActionContainerTests extends NodeJsNonConcurrentTests {
override lazy val nodejsContainerImageName = "action-nodejs-v8"
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
index a5a136f..3f9694d 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
@@ -585,4 +585,5 @@ abstract class NodeJsActionContainerTests extends BasicActionRunnerTests with Ws
e shouldBe empty
})
}
+
}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala
new file mode 100644
index 0000000..efc294e
--- /dev/null
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package runtime.actionContainers
+
+import actionContainers.ActionContainer
+import spray.json.JsObject
+import spray.json.JsString
+
+abstract class NodeJsConcurrentTests extends NodeJsActionContainerTests {
+
+ override lazy val nodejsContainerImageName = "action-nodejs-v8"
+
+ override def withNodeJsContainer(code: ActionContainer => Unit) =
+ withActionContainer(Map("__OW_ALLOW_CONCURRENT" -> "true"))(code)
+
+ it should "allow running activations concurrently" in {
+
+ val requestCount = actorSystem.settings.config.getInt("akka.http.host-connection-pool.max-connections")
+ require(requestCount > 100, "test requires that max-connections be set > 100")
+ println(s"running $requestCount requests")
+
+ val (out, err) = withNodeJsContainer { c =>
+ //this action will create a log entry, and only complete once all activations have arrived and emitted logg
+ //this forces all of the in-action logs to appear in a single portion of the stdout, and all of the sentinels to appear following that
+
+ val code =
+ s"""
+ |
+ | global.count = 0;
+ | let requestCount = $requestCount;
+ | let interval = 1000;
+ | function main(args) {
+ | global.count++;
+ | console.log("interleave me");
+ | return new Promise(function(resolve, reject) {
+ | setTimeout(function() {
+ | checkRequests(args, resolve, reject);
+ | }, interval);
+ | });
+ | }
+ | function checkRequests(args, resolve, reject, elapsed) {
+ | let elapsedTime = elapsed||0;
+ | if (global.count == requestCount) {
+ | resolve({ args: args});
+ | } else {
+ | if (elapsedTime > 30000) {
+ | reject("did not receive "+requestCount+" activations within 30s");
+ | } else {
+ | setTimeout(function() {
+ | checkRequests(args, resolve, reject, elapsedTime+interval);
+ | }, interval);
+ | }
+ | }
+ | }
+ """.stripMargin
+
+ c.init(initPayload(code))._1 should be(200)
+
+ val payloads = (1 to requestCount).map({ i =>
+ JsObject(s"arg$i" -> JsString(s"value$i"))
+ })
+
+ val responses = c.runMultiple(payloads.map {
+ runPayload(_)
+ })
+ payloads.foreach { a =>
+ responses should contain(200, Some(JsObject("args" -> a)))
+ }
+ }
+
+ checkStreams(out, err, {
+ case (o, e) =>
+ o.replaceAll("\n", "") shouldBe "interleave me" * requestCount
+ e shouldBe empty
+ }, requestCount)
+
+ withClue("expected grouping of stdout sentinels") {
+ out should include((ActionContainer.sentinel + "\n") * requestCount)
+ }
+ }
+
+}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala
new file mode 100644
index 0000000..85a4645
--- /dev/null
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package runtime.actionContainers
+
+import actionContainers.ActionContainer
+import spray.json.JsObject
+import spray.json.JsString
+
+abstract class NodeJsNonConcurrentTests extends NodeJsActionContainerTests {
+
+ override def withNodeJsContainer(code: ActionContainer => Unit) =
+ withActionContainer()(code)
+
+ it should "NOT allow running activations concurrently (without proper env setup)" in {
+ val (out, err) = withNodeJsContainer { c =>
+ //this action will create a log entry, and only complete after 1s, to guarantee previous is still running
+ val code = """
+ | function main(args) {
+ | console.log("no concurrency");
+ | return new Promise(function(resolve, reject) {
+ | setTimeout(function() {
+ | resolve({ args: args});
+ | }, 1000);
+ | });
+ | }
+ """.stripMargin
+
+ c.init(initPayload(code))._1 should be(200)
+ val requestCount = 2
+
+ val payloads = (1 to requestCount).map({ i =>
+ JsObject(s"arg$i" -> JsString(s"value$i"))
+ })
+
+ //run payloads concurrently
+ val responses = c.runMultiple(payloads.map {
+ runPayload(_)
+ })
+
+ //one will fail, one will succeed - currently there is no way to guarantee which one succeeds, since both arrive "at the same time"
+ responses.count {
+ case (200, Some(JsObject(a))) if a.get("args").isDefined => true
+ case _ => false
+ } shouldBe 1
+
+ responses.count {
+ case (403, Some(JsObject(e)))
+ if e.getOrElse("error", JsString("")) == JsString("System not ready, status is running.") =>
+ true
+ case _ => false
+ } shouldBe 1
+
+ }
+
+ checkStreams(out, err, {
+ case (o, e) =>
+ o.replaceAll("\n", "") shouldBe "no concurrency"
+ e shouldBe "Internal system error: System not ready, status is running."
+ }, 1)
+ }
+
+}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
similarity index 93%
copy from tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
copy to tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
index 78f064e..0a218f6 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
@@ -21,8 +21,6 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
-
+class Nodejs6ConcurrentTests extends NodeJsConcurrentTests {
override lazy val nodejsContainerImageName = "nodejs6action"
-
}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
similarity index 87%
copy from tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
copy to tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
index 78f064e..2f1ffaa 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
@@ -21,8 +21,6 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
-
- override lazy val nodejsContainerImageName = "nodejs6action"
-
+class Nodejs8ConcurrentTests extends NodeJsConcurrentTests {
+ override lazy val nodejsContainerImageName = "action-nodejs-v8"
}