You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/08/01 22:23:16 UTC

[incubator-openwhisk-runtime-nodejs] branch master updated: enable concurrent processing via env variable (#41)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-runtime-nodejs.git


The following commit(s) were added to refs/heads/master by this push:
     new b4b68b5  enable concurrent processing via env variable (#41)
b4b68b5 is described below

commit b4b68b5280dcd42698efdf3d5f77a943c33121ba
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Wed Aug 1 15:23:14 2018 -0700

    enable concurrent processing via env variable (#41)
---
 core/nodejs6Action/CHANGELOG.md                    |  5 ++
 core/nodejs8Action/CHANGELOG.md                    |  5 ++
 core/nodejsActionBase/app.js                       |  3 +-
 core/nodejsActionBase/src/service.js               |  9 +-
 .../NodeJs6ActionContainerTests.scala              |  2 +-
 .../NodeJs8ActionContainerTests.scala              |  2 +-
 .../NodeJsActionContainerTests.scala               |  1 +
 .../actionContainers/NodeJsConcurrentTests.scala   | 97 ++++++++++++++++++++++
 .../NodeJsNonConcurrentTests.scala                 | 77 +++++++++++++++++
 ...nerTests.scala => Nodejs6ConcurrentTests.scala} |  4 +-
 ...nerTests.scala => Nodejs8ConcurrentTests.scala} |  6 +-
 11 files changed, 199 insertions(+), 12 deletions(-)

diff --git a/core/nodejs6Action/CHANGELOG.md b/core/nodejs6Action/CHANGELOG.md
index b0e7c04..369a5b9 100644
--- a/core/nodejs6Action/CHANGELOG.md
+++ b/core/nodejs6Action/CHANGELOG.md
@@ -19,6 +19,11 @@
 
 # NodeJS 6 OpenWhisk Runtime Container
 
+## 1.11.0
+Change: Update runtime to work in concurrent mode
+
+- Update runtime to work in concurrent mode [#41](https://github.com/apache/incubator-openwhisk-runtime-nodejs/pull/41/files)
+
 ## 1.10.0
 Change: Update npm openwhisk package from `3.15.0` to `3.16.0`
 
diff --git a/core/nodejs8Action/CHANGELOG.md b/core/nodejs8Action/CHANGELOG.md
index 6e621d5..9cb6d84 100644
--- a/core/nodejs8Action/CHANGELOG.md
+++ b/core/nodejs8Action/CHANGELOG.md
@@ -19,6 +19,11 @@
 
 # NodeJS 8 OpenWhisk Runtime Container
 
+## 1.8.0
+Change: Update runtime to work in concurrent mode
+
+- Update runtime to work in concurrent mode [#41](https://github.com/apache/incubator-openwhisk-runtime-nodejs/pull/41/files)
+
 ## 1.7.0
 Change: Update openwhisk npm package from `3.15.0` to `3.16.0`
 
diff --git a/core/nodejsActionBase/app.js b/core/nodejsActionBase/app.js
index 2da2b89..7dbc433 100644
--- a/core/nodejsActionBase/app.js
+++ b/core/nodejsActionBase/app.js
@@ -17,7 +17,8 @@
 
 var config = {
         'port': 8080,
-        'apiHost': process.env.__OW_API_HOST
+        'apiHost': process.env.__OW_API_HOST,
+        'allowConcurrent': process.env.__OW_ALLOW_CONCURRENT
 };
 
 var bodyParser = require('body-parser');
diff --git a/core/nodejsActionBase/src/service.js b/core/nodejsActionBase/src/service.js
index e1318ee..daf365f 100644
--- a/core/nodejsActionBase/src/service.js
+++ b/core/nodejsActionBase/src/service.js
@@ -27,6 +27,7 @@ function NodeActionService(config) {
     };
 
     var status = Status.ready;
+    var ignoreRunStatus = config.allowConcurrent === undefined ? false : config.allowConcurrent.toLowerCase() === "true";
     var server = undefined;
     var userCodeRunner = undefined;
 
@@ -112,10 +113,14 @@ function NodeActionService(config) {
      */
     this.runCode = function runCode(req) {
         if (status === Status.ready) {
-            setStatus(Status.running);
+            if (!ignoreRunStatus) {
+                setStatus(Status.running);
+            }
 
             return doRun(req).then(function (result) {
-                setStatus(Status.ready);
+                if (!ignoreRunStatus) {
+                    setStatus(Status.ready);
+                }
 
                 if (typeof result !== "object") {
                     return errorMessage(502, "The action did not return a dictionary.");
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
index 78f064e..77fea66 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
@@ -21,7 +21,7 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
+class NodeJs6ActionContainerTests extends NodeJsNonConcurrentTests {
 
   override lazy val nodejsContainerImageName = "nodejs6action"
 
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
index ec57018..f4092c6 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJs8ActionContainerTests.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnitRunner
 import spray.json.JsObject
 
 @RunWith(classOf[JUnitRunner])
-class NodeJs8ActionContainerTests extends NodeJsActionContainerTests {
+class NodeJs8ActionContainerTests extends NodeJsNonConcurrentTests {
 
   override lazy val nodejsContainerImageName = "action-nodejs-v8"
 
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
index a5a136f..3f9694d 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsActionContainerTests.scala
@@ -585,4 +585,5 @@ abstract class NodeJsActionContainerTests extends BasicActionRunnerTests with Ws
         e shouldBe empty
     })
   }
+
 }
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala
new file mode 100644
index 0000000..efc294e
--- /dev/null
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package runtime.actionContainers
+
+import actionContainers.ActionContainer
+import spray.json.JsObject
+import spray.json.JsString
+
+abstract class NodeJsConcurrentTests extends NodeJsActionContainerTests {
+
+  override lazy val nodejsContainerImageName = "action-nodejs-v8"
+
+  override def withNodeJsContainer(code: ActionContainer => Unit) =
+    withActionContainer(Map("__OW_ALLOW_CONCURRENT" -> "true"))(code)
+
+  it should "allow running activations concurrently" in {
+
+    val requestCount = actorSystem.settings.config.getInt("akka.http.host-connection-pool.max-connections")
+    require(requestCount > 100, "test requires that max-connections be set > 100")
+    println(s"running $requestCount requests")
+
+    val (out, err) = withNodeJsContainer { c =>
+      //this action will create a log entry, and only complete once all activations have arrived and emitted logg
+      //this forces all of the in-action logs to appear in a single portion of the stdout, and all of the sentinels to appear following that
+
+      val code =
+        s"""
+           |
+           | global.count = 0;
+           | let requestCount = $requestCount;
+           | let interval = 1000;
+           | function main(args) {
+           |     global.count++;
+           |     console.log("interleave me");
+           |     return new Promise(function(resolve, reject) {
+           |         setTimeout(function() {
+           |             checkRequests(args, resolve, reject);
+           |         }, interval);
+           |    });
+           | }
+           | function checkRequests(args, resolve, reject, elapsed) {
+           |     let elapsedTime = elapsed||0;
+           |     if (global.count == requestCount) {
+           |         resolve({ args: args});
+           |     } else {
+           |         if (elapsedTime > 30000) {
+           |             reject("did not receive "+requestCount+" activations within 30s");
+           |         } else {
+           |             setTimeout(function() {
+           |                 checkRequests(args, resolve, reject, elapsedTime+interval);
+           |             }, interval);
+           |         }
+           |     }
+           | }
+        """.stripMargin
+
+      c.init(initPayload(code))._1 should be(200)
+
+      val payloads = (1 to requestCount).map({ i =>
+        JsObject(s"arg$i" -> JsString(s"value$i"))
+      })
+
+      val responses = c.runMultiple(payloads.map {
+        runPayload(_)
+      })
+      payloads.foreach { a =>
+        responses should contain(200, Some(JsObject("args" -> a)))
+      }
+    }
+
+    checkStreams(out, err, {
+      case (o, e) =>
+        o.replaceAll("\n", "") shouldBe "interleave me" * requestCount
+        e shouldBe empty
+    }, requestCount)
+
+    withClue("expected grouping of stdout sentinels") {
+      out should include((ActionContainer.sentinel + "\n") * requestCount)
+    }
+  }
+
+}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala b/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala
new file mode 100644
index 0000000..85a4645
--- /dev/null
+++ b/tests/src/test/scala/runtime/actionContainers/NodeJsNonConcurrentTests.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package runtime.actionContainers
+
+import actionContainers.ActionContainer
+import spray.json.JsObject
+import spray.json.JsString
+
+abstract class NodeJsNonConcurrentTests extends NodeJsActionContainerTests {
+
+  override def withNodeJsContainer(code: ActionContainer => Unit) =
+    withActionContainer()(code)
+
+  it should "NOT allow running activations concurrently (without proper env setup)" in {
+    val (out, err) = withNodeJsContainer { c =>
+      //this action will create a log entry, and only complete after 1s, to guarantee previous is still running
+      val code = """
+                   | function main(args) {
+                   |     console.log("no concurrency");
+                   |     return new Promise(function(resolve, reject) {
+                   |         setTimeout(function() {
+                   |             resolve({ args: args});
+                   |         }, 1000);
+                   |     });
+                   | }
+                 """.stripMargin
+
+      c.init(initPayload(code))._1 should be(200)
+      val requestCount = 2
+
+      val payloads = (1 to requestCount).map({ i =>
+        JsObject(s"arg$i" -> JsString(s"value$i"))
+      })
+
+      //run payloads concurrently
+      val responses = c.runMultiple(payloads.map {
+        runPayload(_)
+      })
+
+      //one will fail, one will succeed - currently there is no way to guarantee which one succeeds, since both arrive "at the same time"
+      responses.count {
+        case (200, Some(JsObject(a))) if a.get("args").isDefined => true
+        case _                                                   => false
+      } shouldBe 1
+
+      responses.count {
+        case (403, Some(JsObject(e)))
+            if e.getOrElse("error", JsString("")) == JsString("System not ready, status is running.") =>
+          true
+        case _ => false
+      } shouldBe 1
+
+    }
+
+    checkStreams(out, err, {
+      case (o, e) =>
+        o.replaceAll("\n", "") shouldBe "no concurrency"
+        e shouldBe "Internal system error: System not ready, status is running."
+    }, 1)
+  }
+
+}
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
similarity index 93%
copy from tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
copy to tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
index 78f064e..0a218f6 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/Nodejs6ConcurrentTests.scala
@@ -21,8 +21,6 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
-
+class Nodejs6ConcurrentTests extends NodeJsConcurrentTests {
   override lazy val nodejsContainerImageName = "nodejs6action"
-
 }
diff --git a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
similarity index 87%
copy from tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
copy to tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
index 78f064e..2f1ffaa 100644
--- a/tests/src/test/scala/runtime/actionContainers/NodeJs6ActionContainerTests.scala
+++ b/tests/src/test/scala/runtime/actionContainers/Nodejs8ConcurrentTests.scala
@@ -21,8 +21,6 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
-class NodeJs6ActionContainerTests extends NodeJsActionContainerTests {
-
-  override lazy val nodejsContainerImageName = "nodejs6action"
-
+class Nodejs8ConcurrentTests extends NodeJsConcurrentTests {
+  override lazy val nodejsContainerImageName = "action-nodejs-v8"
 }