You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/11/13 21:21:06 UTC
[incubator-openwhisk-package-alarms] branch master updated: create fire once action to support firing trigger on a specific date (#107)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git
The following commit(s) were added to refs/heads/master by this push:
new 9714585 create fire once action to support firing trigger on a specific date (#107)
9714585 is described below
commit 9714585d2a64d49ce47ba8e60c567624aa7c7ec1
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Mon Nov 13 16:21:04 2017 -0500
create fire once action to support firing trigger on a specific date (#107)
* create fire once action to support firing trigger on a specific date
* add tests for the new fire once action
---
.gitignore | 1 +
action/alarm.js | 63 +----
action/alarmFeed_package.json | 5 +
action/alarmOnce.js | 31 +++
action/alarmOnce_package.json | 5 +
action/alarmWebAction.js | 306 ++++-----------------
action/{package.json => alarmWeb_package.json} | 0
action/lib/Database.js | 172 ++++++++++++
action/{alarm.js => lib/common.js} | 75 +++--
installCatalog.sh | 31 ++-
provider/lib/cronAlarm.js | 35 +++
provider/lib/dateAlarm.js | 37 +++
provider/lib/utils.js | 90 +++---
.../system/health/AlarmsHealthFeedTests.scala | 43 +--
.../scala/system/packages/AlarmsFeedTests.scala | 180 ++++++++++--
15 files changed, 629 insertions(+), 445 deletions(-)
diff --git a/.gitignore b/.gitignore
index ac4bde5..30c1445 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ node_modules/
.env
.vscode
action/*.zip
+action/package.json
diff --git a/action/alarm.js b/action/alarm.js
index a5c217f..6e72985 100644
--- a/action/alarm.js
+++ b/action/alarm.js
@@ -1,4 +1,4 @@
-var request = require('request');
+const common = require('./lib/common');
function main(msg) {
@@ -14,74 +14,17 @@ function main(msg) {
var lifecycleEvent = msg.lifecycleEvent;
var endpoint = msg.apihost;
- var webparams = createWebParams(msg);
+ var webparams = common.createWebParams(msg);
var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
if (lifecycleEvent in eventMap) {
var method = eventMap[lifecycleEvent];
- return requestHelper(url, webparams, method);
+ return common.requestHelper(url, webparams, method);
} else {
return Promise.reject('unsupported lifecycleEvent');
}
}
-function requestHelper(url, input, method) {
-
- return new Promise(function(resolve, reject) {
-
- request({
- method : method,
- url : url,
- json: input,
- rejectUnauthorized: false
- }, function(error, response, body) {
-
- if (!error && response.statusCode === 200) {
- resolve(body);
- }
- else {
- if (response) {
- console.log('alarm: Error invoking whisk action:', response.statusCode, body);
- reject(body);
- }
- else {
- console.log('alarm: Error invoking whisk action:', error);
- reject(error);
- }
- }
- });
- });
-}
-
-function createWebParams(rawParams) {
- var namespace = process.env.__OW_NAMESPACE;
- var triggerName = '/' + namespace + '/' + parseQName(rawParams.triggerName).name;
-
- var webparams = Object.assign({}, rawParams);
- delete webparams.lifecycleEvent;
- delete webparams.apihost;
-
- webparams.triggerName = triggerName;
-
- return webparams;
-}
-
-function parseQName(qname) {
- var parsed = {};
- var delimiter = '/';
- var defaultNamespace = '_';
- if (qname && qname.charAt(0) === delimiter) {
- var parts = qname.split(delimiter);
- parsed.namespace = parts[1];
- parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : '';
- } else {
- parsed.namespace = defaultNamespace;
- parsed.name = qname;
- }
- return parsed;
-}
exports.main = main;
-
-
diff --git a/action/alarmFeed_package.json b/action/alarmFeed_package.json
new file mode 100644
index 0000000..8ae9040
--- /dev/null
+++ b/action/alarmFeed_package.json
@@ -0,0 +1,5 @@
+{
+ "name": "alarmFeed",
+ "version": "1.0.0",
+ "main": "alarm.js"
+}
diff --git a/action/alarmOnce.js b/action/alarmOnce.js
new file mode 100644
index 0000000..38a7ebc
--- /dev/null
+++ b/action/alarmOnce.js
@@ -0,0 +1,31 @@
+const common = require('./lib/common');
+
+function main(msg) {
+
+ let eventMap = {
+ CREATE: 'put',
+ READ: 'get',
+ // UPDATE: 'put',
+ DELETE: 'delete'
+ };
+ // for creation -> CREATE
+ // for reading -> READ
+ // for deletion -> DELETE
+ var lifecycleEvent = msg.lifecycleEvent;
+
+ var endpoint = msg.apihost;
+ var webparams = common.createWebParams(msg);
+ webparams.fireOnce = true;
+
+ var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
+
+ if (lifecycleEvent in eventMap) {
+ var method = eventMap[lifecycleEvent];
+ return common.requestHelper(url, webparams, method);
+ } else {
+ return Promise.reject('unsupported lifecycleEvent');
+ }
+}
+
+
+exports.main = main;
diff --git a/action/alarmOnce_package.json b/action/alarmOnce_package.json
new file mode 100644
index 0000000..e48cd14
--- /dev/null
+++ b/action/alarmOnce_package.json
@@ -0,0 +1,5 @@
+{
+ "name": "alarmOnce",
+ "version": "1.0.0",
+ "main": "alarmOnce.js"
+}
diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js
index 31ed6fd..4ba3a73 100644
--- a/action/alarmWebAction.js
+++ b/action/alarmWebAction.js
@@ -1,38 +1,27 @@
-var request = require('request');
var CronJob = require('cron').CronJob;
var moment = require('moment');
+const common = require('./lib/common');
+const Database = require('./lib/Database');
+
function main(params) {
if (!params.authKey) {
- return sendError(400, 'no authKey parameter was provided');
+ return common.sendError(400, 'no authKey parameter was provided');
}
if (!params.triggerName) {
- return sendError(400, 'no trigger name parameter was provided');
+ return common.sendError(400, 'no trigger name parameter was provided');
}
- var triggerParts = parseQName(params.triggerName);
+ var triggerParts = common.parseQName(params.triggerName);
var triggerID = `${params.authKey}/${triggerParts.namespace}/${triggerParts.name}`;
-
var triggerURL = `https://${params.apihost}/api/v1/namespaces/${triggerParts.namespace}/triggers/${triggerParts.name}`;
- var nano = require('nano')(params.DB_URL);
- var db = nano.db.use(params.DB_NAME);
var workers = params.workers instanceof Array ? params.workers : [];
+ var db;
if (params.__ow_method === "put") {
- if (!params.cron) {
- return sendError(400, 'alarms trigger feed is missing the cron parameter');
- }
- else {
- try {
- new CronJob(params.cron, function() {});
- } catch(ex) {
- return sendError(400, `cron pattern '${params.cron}' is not valid`);
- }
- }
-
if (typeof params.trigger_payload === 'string') {
params.trigger_payload = {payload: params.trigger_payload};
}
@@ -41,7 +30,6 @@ function main(params) {
apikey: params.authKey,
name: triggerParts.name,
namespace: triggerParts.namespace,
- cron: params.cron,
payload: params.trigger_payload || {},
maxTriggers: params.maxTriggers || -1,
status: {
@@ -50,15 +38,47 @@ function main(params) {
}
};
+ if (params.fireOnce) {
+ if (!params.date) {
+ return common.sendError(400, 'alarms once trigger feed is missing the date parameter');
+ }
+ else {
+ var date = new Date(params.date);
+ if (isNaN(date.getTime())) {
+ return common.sendError(400, `date parameter '${params.date}' is not a valid Date`);
+ }
+ else if (Date.now() >= date.getTime()) {
+ return common.sendError(400, `date parameter '${params.date}' must be in the future`);
+ }
+ else {
+ newTrigger.date = params.date;
+ }
+ }
+ }
+ else {
+ if (!params.cron) {
+ return common.sendError(400, 'alarms trigger feed is missing the cron parameter');
+ }
+ else {
+ try {
+ new CronJob(params.cron, function() {});
+ newTrigger.cron = params.cron;
+ } catch(ex) {
+ return common.sendError(400, `cron pattern '${params.cron}' is not valid`);
+ }
+ }
+ }
+
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, false)
+ common.verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return getWorkerID(db, workers);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getWorkerID(workers);
})
.then((worker) => {
console.log('trigger will be assigned to worker ' + worker);
newTrigger.worker = worker;
- return createTrigger(db, triggerID, newTrigger);
+ return db.createTrigger(triggerID, newTrigger);
})
.then(() => {
resolve({
@@ -75,16 +95,16 @@ function main(params) {
}
else if (params.__ow_method === "get") {
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, false)
+ common.verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return getTrigger(db, triggerID);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getTrigger(triggerID);
})
.then(doc => {
var body = {
config: {
name: doc.name,
namespace: doc.namespace,
- cron: doc.cron,
payload: doc.payload
},
status: {
@@ -94,6 +114,12 @@ function main(params) {
reason: doc.status.reason
}
};
+ if (doc.date) {
+ body.config.date = doc.date;
+ }
+ else {
+ body.config.cron = doc.cron;
+ }
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
@@ -108,12 +134,13 @@ function main(params) {
else if (params.__ow_method === "delete") {
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, true)
+ common.verifyTriggerAuth(triggerURL, params.authKey, true)
.then(() => {
- return updateTrigger(db, triggerID, 0);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.updateTrigger(triggerID, 0);
})
.then(id => {
- return deleteTrigger(db, id, 0);
+ return db.deleteTrigger(id, 0);
})
.then(() => {
resolve({
@@ -128,231 +155,10 @@ function main(params) {
});
}
else {
- return sendError(400, 'unsupported lifecycleEvent');
+ return common.sendError(400, 'unsupported lifecycleEvent');
}
}
-function getWorkerID(db, availabeWorkers) {
-
- return new Promise((resolve, reject) => {
- var workerID = availabeWorkers[0] || 'worker0';
-
- if (availabeWorkers.length > 1) {
- db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
- if (!err) {
- var triggersByWorker = {};
-
- availabeWorkers.forEach(worker => {
- triggersByWorker[worker] = 0;
- });
-
- body.rows.forEach(row => {
- if (row.key in triggersByWorker) {
- triggersByWorker[row.key] = row.value;
- }
- });
-
- // find which worker has the least number of assigned triggers
- for (var worker in triggersByWorker) {
- if (triggersByWorker[worker] < triggersByWorker[workerID]) {
- workerID = worker;
- }
- }
- resolve(workerID);
- } else {
- reject(err);
- }
- });
- }
- else {
- resolve(workerID);
- }
- });
-}
-
-function createTrigger(triggerDB, triggerID, newTrigger) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.insert(newTrigger, triggerID, function (err) {
- if (!err) {
- resolve();
- }
- else {
- reject(sendError(err.statusCode, 'error creating alarm trigger.', err.message));
- }
- });
- });
-}
-
-function getTrigger(triggerDB, triggerID, retry = true) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (err) {
- if (retry) {
- var parts = triggerID.split('/');
- var id = parts[0] + '/_/' + parts[2];
- getTrigger(triggerDB, id, false)
- .then(doc => {
- resolve(doc);
- })
- .catch(err => {
- reject(err);
- });
- } else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- } else {
- resolve(existing);
- }
- });
- });
-}
-
-function updateTrigger(triggerDB, triggerID, retryCount) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (!err) {
- var updatedTrigger = existing;
- updatedTrigger.status = {'active': false};
-
- triggerDB.insert(updatedTrigger, triggerID, function (err) {
- if (err) {
- if (err.statusCode === 409 && retryCount < 5) {
- setTimeout(function () {
- updateTrigger(triggerDB, triggerID, (retryCount + 1))
- .then(id => {
- resolve(id);
- })
- .catch(err => {
- reject(err);
- });
- }, 1000);
- }
- else {
- reject(sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message));
- }
- }
- else {
- resolve(triggerID);
- }
- });
- }
- else {
- //legacy alarms triggers may have been created with _ namespace
- if (retryCount === 0) {
- var parts = triggerID.split('/');
- var id = parts[0] + '/_/' + parts[2];
- updateTrigger(triggerDB, id, (retryCount + 1))
- .then(id => {
- resolve(id);
- })
- .catch(err => {
- reject(err);
- });
- }
- else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- }
- });
- });
-}
-
-function deleteTrigger(triggerDB, triggerID, retryCount) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (!err) {
- triggerDB.destroy(existing._id, existing._rev, function (err) {
- if (err) {
- if (err.statusCode === 409 && retryCount < 5) {
- setTimeout(function () {
- deleteTrigger(triggerDB, triggerID, (retryCount + 1))
- .then(resolve)
- .catch(err => {
- reject(err);
- });
- }, 1000);
- }
- else {
- reject(sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message));
- }
- }
- else {
- resolve();
- }
- });
- }
- else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- });
- });
-}
-
-function verifyTriggerAuth(triggerURL, authKey, isDelete) {
- var auth = authKey.split(':');
-
- return new Promise(function(resolve, reject) {
-
- request({
- method: 'get',
- url: triggerURL,
- auth: {
- user: auth[0],
- pass: auth[1]
- },
- rejectUnauthorized: false
- }, function(err, response) {
- if (err) {
- reject(sendError(400, 'Trigger authentication request failed.', err.message));
- }
- else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) {
- reject(sendError(response.statusCode, 'Trigger authentication request failed.'));
- }
- else {
- resolve();
- }
- });
- });
-}
-
-function sendError(statusCode, error, message) {
- var params = {error: error};
- if (message) {
- params.message = message;
- }
-
- return {
- statusCode: statusCode,
- headers: { 'Content-Type': 'application/json' },
- body: new Buffer(JSON.stringify(params)).toString('base64')
- };
-}
-
-
-function parseQName(qname) {
- var parsed = {};
- var delimiter = '/';
- var defaultNamespace = '_';
- if (qname && qname.charAt(0) === delimiter) {
- var parts = qname.split(delimiter);
- parsed.namespace = parts[1];
- parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : '';
- } else {
- parsed.namespace = defaultNamespace;
- parsed.name = qname;
- }
- return parsed;
-}
-
-
exports.main = main;
diff --git a/action/package.json b/action/alarmWeb_package.json
similarity index 100%
rename from action/package.json
rename to action/alarmWeb_package.json
diff --git a/action/lib/Database.js b/action/lib/Database.js
new file mode 100644
index 0000000..e1fdb27
--- /dev/null
+++ b/action/lib/Database.js
@@ -0,0 +1,172 @@
+const common = require('./common');
+
+// constructor for DB object - a thin, promise-loving wrapper around nano
+module.exports = function(dbURL, dbName) {
+ var nano = require('nano')(dbURL);
+ this.db = nano.db.use(dbName);
+ var utilsDB = this;
+
+ this.getWorkerID = function(availabeWorkers) {
+
+ return new Promise((resolve, reject) => {
+ var workerID = availabeWorkers[0] || 'worker0';
+
+ if (availabeWorkers.length > 1) {
+ utilsDB.db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
+ if (!err) {
+ var triggersByWorker = {};
+
+ availabeWorkers.forEach(worker => {
+ triggersByWorker[worker] = 0;
+ });
+
+ body.rows.forEach(row => {
+ if (row.key in triggersByWorker) {
+ triggersByWorker[row.key] = row.value;
+ }
+ });
+
+ // find which worker has the least number of assigned triggers
+ for (var worker in triggersByWorker) {
+ if (triggersByWorker[worker] < triggersByWorker[workerID]) {
+ workerID = worker;
+ }
+ }
+ resolve(workerID);
+ } else {
+ reject(err);
+ }
+ });
+ }
+ else {
+ resolve(workerID);
+ }
+ });
+ };
+
+ this.createTrigger = function(triggerID, newTrigger) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.insert(newTrigger, triggerID, function (err) {
+ if (!err) {
+ resolve();
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'error creating alarm trigger.', err.message));
+ }
+ });
+ });
+ };
+
+ this.getTrigger = function(triggerID, retry = true) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (err) {
+ if (retry) {
+ var parts = triggerID.split('/');
+ var id = parts[0] + '/_/' + parts[2];
+ utilsDB.getTrigger(id, false)
+ .then(doc => {
+ resolve(doc);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ } else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ } else {
+ resolve(existing);
+ }
+ });
+ });
+ };
+
+ this.updateTrigger = function(triggerID, retryCount) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (!err) {
+ var updatedTrigger = existing;
+ updatedTrigger.status = {'active': false};
+
+ utilsDB.db.insert(updatedTrigger, triggerID, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ utilsDB.updateTrigger(triggerID, (retryCount + 1))
+ .then(id => {
+ resolve(id);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ }, 1000);
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message));
+ }
+ }
+ else {
+ resolve(triggerID);
+ }
+ });
+ }
+ else {
+ //legacy alarms triggers may have been created with _ namespace
+ if (retryCount === 0) {
+ var parts = triggerID.split('/');
+ var id = parts[0] + '/_/' + parts[2];
+ utilsDB.updateTrigger(id, (retryCount + 1))
+ .then(id => {
+ resolve(id);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ }
+ });
+ });
+ };
+
+ this.deleteTrigger = function(triggerID, retryCount) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (!err) {
+ utilsDB.db.destroy(existing._id, existing._rev, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ utilsDB.deleteTrigger(triggerID, (retryCount + 1))
+ .then(resolve)
+ .catch(err => {
+ reject(err);
+ });
+ }, 1000);
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message));
+ }
+ }
+ else {
+ resolve();
+ }
+ });
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ });
+ });
+ };
+};
diff --git a/action/alarm.js b/action/lib/common.js
similarity index 53%
copy from action/alarm.js
copy to action/lib/common.js
index a5c217f..a6c1477 100644
--- a/action/alarm.js
+++ b/action/lib/common.js
@@ -1,30 +1,4 @@
-var request = require('request');
-
-function main(msg) {
-
- let eventMap = {
- CREATE: 'put',
- READ: 'get',
- // UPDATE: 'put',
- DELETE: 'delete'
- };
- // for creation -> CREATE
- // for reading -> READ
- // for deletion -> DELETE
- var lifecycleEvent = msg.lifecycleEvent;
-
- var endpoint = msg.apihost;
- var webparams = createWebParams(msg);
-
- var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
-
- if (lifecycleEvent in eventMap) {
- var method = eventMap[lifecycleEvent];
- return requestHelper(url, webparams, method);
- } else {
- return Promise.reject('unsupported lifecycleEvent');
- }
-}
+const request = require('request');
function requestHelper(url, input, method) {
@@ -67,6 +41,33 @@ function createWebParams(rawParams) {
return webparams;
}
+function verifyTriggerAuth(triggerURL, authKey, isDelete) {
+ var auth = authKey.split(':');
+
+ return new Promise(function(resolve, reject) {
+
+ request({
+ method: 'get',
+ url: triggerURL,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ rejectUnauthorized: false
+ }, function(err, response) {
+ if (err) {
+ reject(sendError(400, 'Trigger authentication request failed.', err.message));
+ }
+ else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) {
+ reject(sendError(response.statusCode, 'Trigger authentication request failed.'));
+ }
+ else {
+ resolve();
+ }
+ });
+ });
+}
+
function parseQName(qname) {
var parsed = {};
var delimiter = '/';
@@ -82,6 +83,24 @@ function parseQName(qname) {
return parsed;
}
-exports.main = main;
+function sendError(statusCode, error, message) {
+ var params = {error: error};
+ if (message) {
+ params.message = message;
+ }
+
+ return {
+ statusCode: statusCode,
+ headers: { 'Content-Type': 'application/json' },
+ body: new Buffer(JSON.stringify(params)).toString('base64')
+ };
+}
+module.exports = {
+ 'requestHelper': requestHelper,
+ 'createWebParams': createWebParams,
+ 'verifyTriggerAuth': verifyTriggerAuth,
+ 'parseQName': parseQName,
+ 'sendError': sendError
+};
diff --git a/installCatalog.sh b/installCatalog.sh
index 41fd83b..63fde64 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -55,10 +55,35 @@ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared yes ala
-p cron '' \
-p trigger_payload ''
-$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarm.js" \
+# make alarmFeed.zip
+cd action
+
+if [ -e alarmFeed.zip ]
+then
+ rm -rf alarmFeed.zip
+fi
+
+cp -f alarmFeed_package.json package.json
+zip -r alarmFeed.zip lib package.json alarm.js
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarmFeed.zip" \
-a description 'Fire trigger when alarm occurs' \
-a feed true
+
+# make alarmOnce.zip
+if [ -e alarmOnce.zip ]
+then
+ rm -rf alarmOnce.zip
+fi
+
+cp -f alarmOnce_package.json package.json
+zip -r alarmOnce.zip lib package.json alarmOnce.js
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/once "$PACKAGE_HOME/action/alarmOnce.zip" \
+ -a description 'Fire trigger once when alarm occurs' \
+ -a feed true
+
if [ -n "$WORKERS" ];
then
$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \
@@ -74,7 +99,7 @@ else
fi
# make alarmWebAction.zip
-cd action
+cp -f alarmWeb_package.json package.json
npm install
if [ -e alarmWebAction.zip ];
@@ -82,7 +107,7 @@ then
rm -rf alarmWebAction.zip
fi
-zip -r alarmWebAction.zip package.json alarmWebAction.js node_modules
+zip -r alarmWebAction.zip lib package.json alarmWebAction.js node_modules
$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarmsWeb/alarmWebAction "$PACKAGE_HOME/action/alarmWebAction.zip" \
-a description 'Create/Delete a trigger in alarms provider Database' \
diff --git a/provider/lib/cronAlarm.js b/provider/lib/cronAlarm.js
new file mode 100644
index 0000000..efc68c9
--- /dev/null
+++ b/provider/lib/cronAlarm.js
@@ -0,0 +1,35 @@
+var CronJob = require('cron').CronJob;
+var constants = require('./constants.js');
+
+module.exports = function(logger, newTrigger) {
+
+ var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
+
+ var cachedTrigger = {
+ apikey: newTrigger.apikey,
+ name: newTrigger.name,
+ namespace: newTrigger.namespace,
+ cron: newTrigger.cron,
+ triggersLeft: maxTriggers,
+ maxTriggers: maxTriggers
+ };
+
+ this.scheduleAlarm = function(triggerIdentifier, callback) {
+ var method = 'scheduleCronAlarm';
+
+ try {
+ return new Promise(function(resolve, reject) {
+
+ var cronHandle = new CronJob(newTrigger.cron, callback);
+ logger.info(method, triggerIdentifier, 'starting cron job');
+ cronHandle.start();
+
+ cachedTrigger.cronHandle = cronHandle;
+ resolve(cachedTrigger);
+ });
+ } catch (err) {
+ return Promise.reject(err);
+ }
+ };
+
+};
diff --git a/provider/lib/dateAlarm.js b/provider/lib/dateAlarm.js
new file mode 100644
index 0000000..656ac40
--- /dev/null
+++ b/provider/lib/dateAlarm.js
@@ -0,0 +1,37 @@
+var CronJob = require('cron').CronJob;
+
+module.exports = function(logger, newTrigger) {
+
+ var cachedTrigger = {
+ apikey: newTrigger.apikey,
+ name: newTrigger.name,
+ namespace: newTrigger.namespace,
+ date: newTrigger.date
+ };
+
+ this.scheduleAlarm = function(triggerIdentifier, callback) {
+ var method = 'scheduleDateAlarm';
+
+ try {
+ return new Promise(function(resolve, reject) {
+
+ var cron = new Date(newTrigger.date);
+ if (cron.getTime() > Date.now()) {
+ logger.info(method, 'Creating a fire once alarms trigger', triggerIdentifier);
+ var cronHandle = new CronJob(cron, callback);
+ logger.info(method, triggerIdentifier, 'starting cron job');
+ cronHandle.start();
+
+ cachedTrigger.cronHandle = cronHandle;
+ resolve(cachedTrigger);
+ }
+ else {
+ return reject("the fire once date has expired");
+ }
+ });
+ } catch (err) {
+ return Promise.reject(err);
+ }
+ };
+
+};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index c804cb8..f5ee08f 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,9 +1,9 @@
var _ = require('lodash');
var request = require('request');
-var CronJob = require('cron').CronJob;
var HttpStatus = require('http-status-codes');
var constants = require('./constants.js');
-
+var DateAlarm = require('./dateAlarm.js');
+var CronAlarm = require('./cronAlarm.js');
module.exports = function(
logger,
@@ -32,42 +32,28 @@ module.exports = function(
this.createTrigger = function(triggerIdentifier, newTrigger) {
var method = 'createTrigger';
- try {
- return new Promise(function(resolve, reject) {
-
- var cronHandle = new CronJob(newTrigger.cron,
- function onTick() {
- if (utils.activeHost === utils.host) {
- var triggerHandle = utils.triggers[triggerIdentifier];
- if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
- try {
- utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
- } catch (e) {
- logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
- }
- }
- }
+ var callback = function onTick() {
+ if (utils.activeHost === utils.host) {
+ var triggerHandle = utils.triggers[triggerIdentifier];
+ if (triggerHandle && (!triggerHandle.maxTriggers || triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
+ try {
+ utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
}
- );
- logger.info(method, triggerIdentifier, 'starting cron job');
- cronHandle.start();
-
- var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
-
- utils.triggers[triggerIdentifier] = {
- cron: newTrigger.cron,
- cronHandle: cronHandle,
- triggersLeft: maxTriggers,
- maxTriggers: maxTriggers,
- apikey: newTrigger.apikey,
- name: newTrigger.name,
- namespace: newTrigger.namespace
- };
- resolve(triggerIdentifier);
- });
- } catch (err) {
- return Promise.reject(err);
+ }
+ }
+ };
+
+ var alarm;
+ if (newTrigger.date) {
+ alarm = new DateAlarm(logger, newTrigger);
+ }
+ else {
+ alarm = new CronAlarm(logger, newTrigger);
}
+
+ return alarm.scheduleAlarm(triggerIdentifier, callback);
};
this.fireTrigger = function(namespace, name, payload, apikey) {
@@ -83,13 +69,11 @@ module.exports = function(
utils.postTrigger(dataTrigger, payload, uri, auth, 0)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully fired');
- if (dataTrigger.triggersLeft === 0) {
- utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
- logger.warn(method, 'no more triggers left, disabled', triggerIdentifier);
- }
+ utils.disableExtinctTriggers(triggerIdentifier, dataTrigger);
})
.catch(err => {
logger.error(method, err);
+ utils.disableExtinctTriggers(triggerIdentifier, dataTrigger);
});
};
@@ -99,7 +83,7 @@ module.exports = function(
return new Promise(function(resolve, reject) {
// only manage trigger fires if they are not infinite
- if (dataTrigger.maxTriggers !== -1) {
+ if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) {
dataTrigger.triggersLeft--;
}
@@ -118,7 +102,7 @@ module.exports = function(
if (error || response.statusCode >= 400) {
// only manage trigger fires if they are not infinite
- if (dataTrigger.maxTriggers !== -1) {
+ if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) {
dataTrigger.triggersLeft++;
}
logger.error(method, 'there was an error invoking', triggerIdentifier, response ? response.statusCode : error);
@@ -145,7 +129,7 @@ module.exports = function(
}
}
} else {
- logger.info(method, 'fired', triggerIdentifier, dataTrigger.triggersLeft, 'triggers left');
+ logger.info(method, 'fired', triggerIdentifier);
resolve(triggerIdentifier);
}
}
@@ -161,6 +145,20 @@ module.exports = function(
[HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
};
+ this.disableExtinctTriggers = function(triggerIdentifier, dataTrigger) {
+ var method = 'disableExtinctTriggers';
+
+ if (dataTrigger.date) {
+ utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after firing once');
+ logger.info(method, 'the fire once date has expired, disabled', triggerIdentifier);
+ }
+ else if (dataTrigger.maxTriggers && dataTrigger.triggersLeft === 0) {
+ utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
+ logger.warn(method, 'no more triggers left, disabled', triggerIdentifier);
+ }
+
+ };
+
this.disableTrigger = function(triggerIdentifier, statusCode, message) {
var method = 'disableTrigger';
@@ -248,7 +246,8 @@ module.exports = function(
}
else {
utils.createTrigger(triggerIdentifier, doc)
- .then(triggerIdentifier => {
+ .then(cachedTrigger => {
+ utils.triggers[triggerIdentifier] = cachedTrigger;
logger.info(method, triggerIdentifier, 'created successfully');
})
.catch(err => {
@@ -292,7 +291,8 @@ module.exports = function(
//ignore changes to disabled triggers
if (!doc.status || doc.status.active === true) {
utils.createTrigger(triggerIdentifier, doc)
- .then(triggerIdentifier => {
+ .then(cachedTrigger => {
+ utils.triggers[triggerIdentifier] = cachedTrigger;
logger.info(method, triggerIdentifier, 'created successfully');
})
.catch(err => {
diff --git a/tests/src/test/scala/system/health/AlarmsHealthFeedTests.scala b/tests/src/test/scala/system/health/AlarmsHealthFeedTests.scala
index 80004ea..55069f1 100644
--- a/tests/src/test/scala/system/health/AlarmsHealthFeedTests.scala
+++ b/tests/src/test/scala/system/health/AlarmsHealthFeedTests.scala
@@ -22,7 +22,7 @@ import common.{TestHelpers, Wsk, WskProps, WskTestHelpers}
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
-import spray.json.DefaultJsonProtocol.{IntJsonFormat, StringJsonFormat}
+import spray.json.DefaultJsonProtocol.{LongJsonFormat, StringJsonFormat}
import spray.json.pimpAny
/**
@@ -89,7 +89,7 @@ class AlarmsHealthFeedTests
activationsAfterWait should be(activationsAfterDelete)
}
- it should "should not fail when specifying triggers above 1 Million" in withAssetCleaner(wskprops) {
+ it should "fire an alarm once trigger when specifying a future date" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
implicit val wskprops = wp // shadow global props and make implicit
val triggerName = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
@@ -105,43 +105,22 @@ class AlarmsHealthFeedTests
(pkg, name) => pkg.bind("/whisk.system/alarms", name)
}
+ val futureDate = System.currentTimeMillis + (1000 * 30)
+
// create whisk stuff
println(s"Creating trigger: $triggerName")
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
(trigger, name) =>
- trigger.create(name, feed = Some(s"$packageName/alarm"), parameters = Map(
+ trigger.create(name, feed = Some(s"$packageName/once"), parameters = Map(
"trigger_payload" -> "alarmTest".toJson,
- "cron" -> "* * * * * *".toJson,
- "maxTriggers" -> 100000000.toJson))
+ "date" -> futureDate.toJson))
}
feedCreationResult.stdout should include("ok")
- }
-
- it should "should not deny trigger creation when choosing maxTriggers set to infinity (-1)" in withAssetCleaner(wskprops) {
- (wp, assetHelper) =>
- implicit val wskprops = wp // shadow global props and make implicit
- val triggerName = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
- val packageName = "dummyAlarmsPackage"
-
- // the package alarms should be there
- val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
- println("fetched package alarms")
- packageGetResult.stdout should include("ok")
- // create package binding
- assetHelper.withCleaner(wsk.pkg, packageName) {
- (pkg, name) => pkg.bind("/whisk.system/alarms", name)
- }
-
- // create whisk stuff
- println(s"Creating trigger: $triggerName")
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, name) =>
- trigger.create(name, feed = Some(s"$packageName/alarm"), parameters = Map(
- "trigger_payload" -> "alarmTest".toJson,
- "cron" -> "* * * * * *".toJson,
- "maxTriggers" -> (-1).toJson))
- }
- feedCreationResult.stderr should not include("error")
+ println("waiting for trigger")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 60).length
+ println(s"Found activation size (should be 1): $activations")
+ activations should be(1)
}
+
}
diff --git a/tests/src/test/scala/system/packages/AlarmsFeedTests.scala b/tests/src/test/scala/system/packages/AlarmsFeedTests.scala
index 5377287..f430270 100644
--- a/tests/src/test/scala/system/packages/AlarmsFeedTests.scala
+++ b/tests/src/test/scala/system/packages/AlarmsFeedTests.scala
@@ -24,7 +24,7 @@ import common.Wsk
import common.WskProps
import common.WskTestHelpers
import spray.json.DefaultJsonProtocol.IntJsonFormat
-import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.DefaultJsonProtocol.{LongJsonFormat, StringJsonFormat}
import spray.json.DefaultJsonProtocol.BooleanJsonFormat
import spray.json.{JsObject, JsString, pimpAny}
@@ -43,32 +43,6 @@ class AlarmsFeedTests
behavior of "Alarms trigger service"
- it should "fire periodic trigger using cron feed using _ namespace" in withAssetCleaner(WskProps()) {
- (wp, assetHelper) =>
- implicit val wskprops = wp // shadow global props and make implicit
- val triggerName = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
-
- // the package alarms should be there
- val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
- println("fetched package alarms")
- packageGetResult.stdout should include("ok")
-
- // create whisk stuff
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, name) =>
- trigger.create(name, feed = Some("/whisk.system/alarms/alarm"), parameters = Map(
- "trigger_payload" -> "alarmTest".toJson,
- "cron" -> "* * * * * *".toJson,
- "maxTriggers" -> 3.toJson))
- }
- feedCreationResult.stdout should include("ok")
-
- // get activation list of the trigger
- val activations = wsk.activation.pollFor(N = 4, Some(triggerName), retries = 15).length
- println(s"Found activation size: $activations")
- activations should be(3)
- }
-
it should "should disable after reaching max triggers" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
implicit val wskprops = wp // shadow global props and make implicit
@@ -163,4 +137,156 @@ class AlarmsFeedTests
}
}
+
+ it should "return error message when alarm action does not include cron parameter" in withAssetCleaner(wskprops) {
+
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "alarm"
+
+ // the package alarms should be there
+ val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+ println("fetched package alarms")
+ packageGetResult.stdout should include("ok")
+
+ // create package binding
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+ }
+
+ // create whisk stuff
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "trigger_payload" -> "alarmTest".toJson),
+ expectedExitCode = 246)
+ }
+ feedCreationResult.stderr should include("alarms trigger feed is missing the cron parameter")
+
+ }
+
+ it should "return error message when alarms once action does not include date parameter" in withAssetCleaner(wskprops) {
+
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "once"
+
+ // the package alarms should be there
+ val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+ println("fetched package alarms")
+ packageGetResult.stdout should include("ok")
+
+ // create package binding
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+ }
+
+ // create whisk stuff
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "trigger_payload" -> "alarmTest".toJson),
+ expectedExitCode = 246)
+ }
+ feedCreationResult.stderr should include("alarms once trigger feed is missing the date parameter")
+
+ }
+
+ it should "return error message when alarm action includes invalid cron parameter" in withAssetCleaner(wskprops) {
+
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "alarm"
+
+ // the package alarms should be there
+ val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+ println("fetched package alarms")
+ packageGetResult.stdout should include("ok")
+
+ // create package binding
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+ }
+
+ val cron = System.currentTimeMillis
+
+ // create whisk stuff
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "trigger_payload" -> "alarmTest".toJson,
+ "cron" -> cron.toJson),
+ expectedExitCode = 246)
+ }
+ feedCreationResult.stderr should include(s"cron pattern '${cron}' is not valid")
+
+ }
+
+ it should "return error message when alarms once action includes an invalid date parameter" in withAssetCleaner(wskprops) {
+
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "once"
+
+ // the package alarms should be there
+ val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+ println("fetched package alarms")
+ packageGetResult.stdout should include("ok")
+
+ // create package binding
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+ }
+
+ // create whisk stuff
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "trigger_payload" -> "alarmTest".toJson,
+ "date" -> "tomorrow".toJson),
+ expectedExitCode = 246)
+ }
+ feedCreationResult.stderr should include("date parameter 'tomorrow' is not a valid Date")
+
+ }
+
+ it should "return error message when alarms once action date parameter is not a future date" in withAssetCleaner(wskprops) {
+
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "once"
+
+ // the package alarms should be there
+ val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+ println("fetched package alarms")
+ packageGetResult.stdout should include("ok")
+
+ // create package binding
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+ }
+
+ val pastDate = System.currentTimeMillis - 5000
+
+ // create whisk stuff
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "trigger_payload" -> "alarmTest".toJson,
+ "date" -> pastDate.toJson),
+ expectedExitCode = 246)
+ }
+ feedCreationResult.stderr should include(s"date parameter '${pastDate}' must be in the future")
+
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.