You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/09/20 17:38:38 UTC
[incubator-openwhisk-package-alarms] branch master updated: Support for multiple workers (#95)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git
The following commit(s) were added to refs/heads/master by this push:
new 6f4b947 Support for multiple workers (#95)
6f4b947 is described below
commit 6f4b947bed6593dba91579424a8e224206a6465e
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Wed Sep 20 13:38:37 2017 -0400
Support for multiple workers (#95)
---
action/alarmWebAction.js | 60 +++++++++--
installCatalog.sh | 24 +++--
provider/lib/utils.js | 55 +++++-----
.../system/packages/AlarmsMultiWorkersTests.scala | 120 +++++++++++++++++++++
4 files changed, 216 insertions(+), 43 deletions(-)
diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js
index bc13019..232d82d 100644
--- a/action/alarmWebAction.js
+++ b/action/alarmWebAction.js
@@ -17,6 +17,7 @@ function main(params) {
var nano = require('nano')(params.DB_URL);
var db = nano.db.use(params.DB_NAME);
+ var workers = params.workers instanceof Array ? params.workers : [];
if (params.__ow_method === "put") {
@@ -44,20 +45,25 @@ function main(params) {
maxTriggers: params.maxTriggers || -1,
status: {
'active': true,
- 'dateChanged': new Date().toISOString(),
+ 'dateChanged': new Date().toISOString()
}
};
return new Promise(function (resolve, reject) {
verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return createTrigger(db, triggerID, newTrigger);
+ return getWorkerID(db, workers);
+ })
+ .then((worker) => {
+ console.log('trigger will be assigned to worker ' + worker);
+ newTrigger.worker = worker;
+ return createTrigger(db, triggerID, newTrigger);
})
.then(() => {
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
- body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+ body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
});
})
.catch(err => {
@@ -80,7 +86,7 @@ function main(params) {
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
- body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+ body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
});
})
.catch(err => {
@@ -93,6 +99,44 @@ function main(params) {
}
}
+function getWorkerID(db, availabeWorkers) {
+// Returns a Promise for the ID of the worker a new trigger should be assigned to: the available worker with the fewest triggers, per the 'triggers_by_worker' view.
+ return new Promise((resolve, reject) => {
+ var workerID = availabeWorkers[0] || 'worker0';
+// Default candidate is the first available worker, or 'worker0' when the list is empty; the view is only queried when there is more than one worker to choose from.
+ if (availabeWorkers.length > 1) {
+ db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
+ if (!err) {
+ var triggersByWorker = {};
+// Start every available worker at zero so that workers absent from the view result are still candidates.
+ availabeWorkers.forEach(worker => {
+ triggersByWorker[worker] = 0;
+ });
+// Copy counts for known workers only; rows keyed by anything else are ignored. Presumably each row is {key: <worker id>, value: <trigger count>} - TODO confirm against the view definition.
+ body.rows.forEach(row => {
+ if (row.key in triggersByWorker) {
+ triggersByWorker[row.key] = row.value;
+ }
+ });
+
+ // find which worker has the least number of assigned triggers; the strict '<' means a tie keeps the current candidate
+ for (var worker in triggersByWorker) {
+ if (triggersByWorker[worker] < triggersByWorker[workerID]) {
+ workerID = worker;
+ }
+ }
+ resolve(workerID);
+ } else {
+ reject(err); // the view error is passed to the caller unchanged
+ }
+ });
+ }
+ else {
+ resolve(workerID);
+ }
+ });
+}
+
function createTrigger(triggerDB, triggerID, newTrigger) {
return new Promise(function(resolve, reject) {
@@ -124,7 +168,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) {
updateTrigger(triggerDB, triggerID, (retryCount + 1))
.then(id => {
resolve(id);
- }).catch(err => {
+ })
+ .catch(err => {
reject(err);
});
}, 1000);
@@ -146,7 +191,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) {
updateTrigger(triggerDB, id, (retryCount + 1))
.then(id => {
resolve(id);
- }).catch(err => {
+ })
+ .catch(err => {
reject(err);
});
}
@@ -227,7 +273,7 @@ function sendError(statusCode, error, message) {
return {
statusCode: statusCode,
headers: { 'Content-Type': 'application/json' },
- body: new Buffer(JSON.stringify(params)).toString('base64'),
+ body: new Buffer(JSON.stringify(params)).toString('base64')
};
}
diff --git a/installCatalog.sh b/installCatalog.sh
index 96934f6..41fd83b 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -4,7 +4,7 @@
# automatically
#
# To run this command
-# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>
set -e
set -x
@@ -14,7 +14,7 @@ WSK_CLI="$OPENWHISK_HOME/bin/wsk"
if [ $# -eq 0 ]
then
-echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>"
+echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>"
fi
AUTH="$1"
@@ -22,6 +22,7 @@ EDGEHOST="$2"
DB_URL="$3"
DB_NAME="${4}alarmservice"
APIHOST="$5"
+WORKERS="$6"
# If the auth key file exists, read the key in the file. Otherwise, take the
# first argument as the key itself.
@@ -58,16 +59,25 @@ $WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" a
-a description 'Fire trigger when alarm occurs' \
-a feed true
-$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \
- -p DB_URL "$DB_URL" \
- -p DB_NAME "$DB_NAME" \
- -p apihost "$APIHOST"
+if [ -n "$WORKERS" ];
+then
+ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \
+ -p DB_URL "$DB_URL" \
+ -p DB_NAME "$DB_NAME" \
+ -p apihost "$APIHOST" \
+ -p workers "$WORKERS"
+else
+ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \
+ -p DB_URL "$DB_URL" \
+ -p DB_NAME "$DB_NAME" \
+ -p apihost "$APIHOST"
+fi
# make alarmWebAction.zip
cd action
npm install
-if [ -e alarmWebAction.zip ]
+if [ -e alarmWebAction.zip ];
then
rm -rf alarmWebAction.zip
fi
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 83f57d0..5103356 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -164,36 +164,33 @@ module.exports = function(
this.disableTrigger = function(triggerIdentifier, statusCode, message) {
var method = 'disableTrigger';
- //only active/master provider should update the database
- if (utils.activeHost === utils.host) {
- triggerDB.get(triggerIdentifier, function (err, existing) {
- if (!err) {
- if (!existing.status || existing.status.active === true) {
- var updatedTrigger = existing;
- var status = {
- 'active': false,
- 'dateChanged': new Date().toISOString(),
- 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
- };
- updatedTrigger.status = status;
-
- triggerDB.insert(updatedTrigger, triggerIdentifier, function (err) {
- if (err) {
- logger.error(method, 'there was an error while disabling', triggerIdentifier, 'in database.', err);
- }
- else {
- logger.info(method, 'trigger', triggerIdentifier, 'successfully disabled in database');
- }
- });
- }
- }
- else {
- logger.info(method, 'could not find', triggerIdentifier, 'in database');
- //make sure it is removed from memory as well
- utils.deleteTrigger(triggerIdentifier);
+ triggerDB.get(triggerIdentifier, function (err, existing) {
+ if (!err) {
+ if (!existing.status || existing.status.active === true) {
+ var updatedTrigger = existing;
+ var status = {
+ 'active': false,
+ 'dateChanged': new Date().toISOString(),
+ 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
+ };
+ updatedTrigger.status = status;
+
+ triggerDB.insert(updatedTrigger, triggerIdentifier, function (err) {
+ if (err) {
+ logger.error(method, 'there was an error while disabling', triggerIdentifier, 'in database.', err);
+ }
+ else {
+ logger.info(method, 'trigger', triggerIdentifier, 'successfully disabled in database');
+ }
+ });
}
- });
- }
+ }
+ else {
+ logger.info(method, 'could not find', triggerIdentifier, 'in database');
+ //make sure it is removed from memory as well
+ utils.deleteTrigger(triggerIdentifier);
+ }
+ });
};
this.deleteTrigger = function(triggerIdentifier) {
diff --git a/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala
new file mode 100644
index 0000000..ebdcfb8
--- /dev/null
+++ b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import com.jayway.restassured.http.ContentType
+import common._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.DefaultJsonProtocol._
+import spray.json.{pimpAny, _}
+import whisk.core.database.test.DatabaseScriptTestUtils
+import whisk.utils.JsHelpers
+
+
+@RunWith(classOf[JUnitRunner])
+class AlarmsMultiWorkersTests extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with WskTestHelpers
+ with StreamLogging
+ with DatabaseScriptTestUtils {
+// Sends PUT requests with different "workers" lists to the alarms web action, then checks the "worker" field stored on the second trigger's database document.
+ val wskprops = WskProps()
+ val wsk = new Wsk
+ val auth = WhiskProperties.getBasicAuth
+ val user = auth.fst
+ val password = auth.snd
+// Web action path; webActionURL below combines it with the API host and a ".http" suffix.
+ val webAction = "/whisk.system/alarmsWeb/alarmWebAction"
+ val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http"
+
+ behavior of "Alarms multi workers feed tests"
+
+ it should "create triggers assigned to worker0 and worker1" in withAssetCleaner(WskProps()) {
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+// NOTE(review): both trigger names below are built from System.currentTimeMillis with the same prefix; if evaluated in the same millisecond they would be identical - confirm, or add a distinguishing suffix.
+ val worker0Trigger = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+ val worker0Params = JsObject(
+ "triggerName" -> JsString(worker0Trigger),
+ "authKey" -> JsString(s"$user:$password"),
+ "cron" -> "* * * * *".toJson,
+ "workers" -> JsArray(JsString("worker0")))
+// The second request offers two workers, whereas the first offers only "worker0".
+ val worker1Trigger = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+ val worker1Params = JsObject(
+ "triggerName" -> JsString(worker1Trigger),
+ "authKey" -> JsString(s"$user:$password"),
+ "cron" -> "* * * * *".toJson,
+ "workers" -> JsArray(JsString("worker0"), JsString("worker1")))
+
+ try {
+ wsk.trigger.create(worker0Trigger)
+
+ //create trigger feed and assign to worker0
+ makePutCallWithExpectedResult(worker0Params, 200)
+
+ wsk.trigger.create(worker1Trigger)
+
+ //create trigger feed and assign to worker0 or worker1
+ //the one with the least assigned triggers will be chosen
+ makePutCallWithExpectedResult(worker1Params, 200)
+// Fetch all documents from the "<dbPrefix>alarmservice" database to inspect what was stored.
+ val dbName = s"${dbPrefix}alarmservice"
+ val documents = getAllDocs(dbName)
+// Keep only the row whose id is "<user>:<password>/_/<worker1Trigger>".
+ val worker1Doc = documents
+ .fields("rows")
+ .convertTo[List[JsObject]]
+ .filter(_.fields("id").convertTo[String].equals(s"$user:$password/_/$worker1Trigger"))
+// NOTE(review): worker1Doc(0) throws if no row matched; expecting "worker1" presumably relies on worker1 having fewer assigned triggers than worker0 in this database - confirm.
+ JsHelpers.getFieldPath(worker1Doc(0), "doc", "worker") shouldBe Some(JsString("worker1"))
+ } finally {
+ //delete triggers
+ wsk.trigger.delete(worker0Trigger)
+ wsk.trigger.delete(worker1Trigger)
+// Then send DELETE requests for both parameter sets to the web action, expecting 200 for each.
+ makeDeleteCallWithExpectedResult(worker0Params, 200)
+ makeDeleteCallWithExpectedResult(worker1Params, 200)
+ }
+ }
+// Sends params as a JSON body in an HTTP PUT to webActionURL (relaxed HTTPS validation configured) and asserts the response status equals expectedCode.
+ def makePutCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+ val response = RestAssured.given()
+ .contentType(ContentType.JSON)
+ .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+ .body(params.toString())
+ .put(webActionURL)
+ assert(response.statusCode() == expectedCode)
+ }
+// Same as the PUT helper above, but issues an HTTP DELETE to webActionURL.
+ def makeDeleteCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+ val response = RestAssured.given()
+ .contentType(ContentType.JSON)
+ .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+ .body(params.toString())
+ .delete(webActionURL)
+ assert(response.statusCode() == expectedCode)
+ }
+
+
+}
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.