You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ja...@apache.org on 2018/08/20 23:18:55 UTC
[incubator-openwhisk-package-alarms] 01/01: create fire-once action
to support firing a trigger on a specific date
This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to tag 1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git
commit 1c7febdcb59705df76dc209fe7e1d585d5bfdc18
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Tue Nov 7 17:32:04 2017 -0500
create fire-once action to support firing a trigger on a specific date
---
.gitignore | 1 +
action/alarm.js | 63 +----
action/alarmFeed_package.json | 5 +
action/alarmOnce.js | 31 +++
action/alarmOnce_package.json | 5 +
action/alarmWebAction.js | 310 +++++--------------------
action/{package.json => alarmWeb_package.json} | 2 +-
action/lib/Database.js | 172 ++++++++++++++
action/{alarm.js => lib/common.js} | 75 +++---
installCatalog.sh | 31 ++-
package.json | 2 +-
provider/lib/utils.js | 89 ++++---
12 files changed, 410 insertions(+), 376 deletions(-)
diff --git a/.gitignore b/.gitignore
index ac4bde5..30c1445 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ node_modules/
.env
.vscode
action/*.zip
+action/package.json
diff --git a/action/alarm.js b/action/alarm.js
index a5c217f..6e72985 100644
--- a/action/alarm.js
+++ b/action/alarm.js
@@ -1,4 +1,4 @@
-var request = require('request');
+const common = require('./lib/common');
function main(msg) {
@@ -14,74 +14,17 @@ function main(msg) {
var lifecycleEvent = msg.lifecycleEvent;
var endpoint = msg.apihost;
- var webparams = createWebParams(msg);
+ var webparams = common.createWebParams(msg);
var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
if (lifecycleEvent in eventMap) {
var method = eventMap[lifecycleEvent];
- return requestHelper(url, webparams, method);
+ return common.requestHelper(url, webparams, method);
} else {
return Promise.reject('unsupported lifecycleEvent');
}
}
-function requestHelper(url, input, method) {
-
- return new Promise(function(resolve, reject) {
-
- request({
- method : method,
- url : url,
- json: input,
- rejectUnauthorized: false
- }, function(error, response, body) {
-
- if (!error && response.statusCode === 200) {
- resolve(body);
- }
- else {
- if (response) {
- console.log('alarm: Error invoking whisk action:', response.statusCode, body);
- reject(body);
- }
- else {
- console.log('alarm: Error invoking whisk action:', error);
- reject(error);
- }
- }
- });
- });
-}
-
-function createWebParams(rawParams) {
- var namespace = process.env.__OW_NAMESPACE;
- var triggerName = '/' + namespace + '/' + parseQName(rawParams.triggerName).name;
-
- var webparams = Object.assign({}, rawParams);
- delete webparams.lifecycleEvent;
- delete webparams.apihost;
-
- webparams.triggerName = triggerName;
-
- return webparams;
-}
-
-function parseQName(qname) {
- var parsed = {};
- var delimiter = '/';
- var defaultNamespace = '_';
- if (qname && qname.charAt(0) === delimiter) {
- var parts = qname.split(delimiter);
- parsed.namespace = parts[1];
- parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : '';
- } else {
- parsed.namespace = defaultNamespace;
- parsed.name = qname;
- }
- return parsed;
-}
exports.main = main;
-
-
diff --git a/action/alarmFeed_package.json b/action/alarmFeed_package.json
new file mode 100644
index 0000000..8ae9040
--- /dev/null
+++ b/action/alarmFeed_package.json
@@ -0,0 +1,5 @@
+{
+ "name": "alarmFeed",
+ "version": "1.0.0",
+ "main": "alarm.js"
+}
diff --git a/action/alarmOnce.js b/action/alarmOnce.js
new file mode 100644
index 0000000..38a7ebc
--- /dev/null
+++ b/action/alarmOnce.js
@@ -0,0 +1,31 @@
+const common = require('./lib/common');
+
+function main(msg) {
+
+ let eventMap = {
+ CREATE: 'put',
+ READ: 'get',
+ // UPDATE: 'put',
+ DELETE: 'delete'
+ };
+ // for creation -> CREATE
+ // for reading -> READ
+ // for deletion -> DELETE
+ var lifecycleEvent = msg.lifecycleEvent;
+
+ var endpoint = msg.apihost;
+ var webparams = common.createWebParams(msg);
+ webparams.fireOnce = true;
+
+ var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
+
+ if (lifecycleEvent in eventMap) {
+ var method = eventMap[lifecycleEvent];
+ return common.requestHelper(url, webparams, method);
+ } else {
+ return Promise.reject('unsupported lifecycleEvent');
+ }
+}
+
+
+exports.main = main;
diff --git a/action/alarmOnce_package.json b/action/alarmOnce_package.json
new file mode 100644
index 0000000..e48cd14
--- /dev/null
+++ b/action/alarmOnce_package.json
@@ -0,0 +1,5 @@
+{
+ "name": "alarmOnce",
+ "version": "1.0.0",
+ "main": "alarmOnce.js"
+}
diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js
index 31ed6fd..41d7917 100644
--- a/action/alarmWebAction.js
+++ b/action/alarmWebAction.js
@@ -1,38 +1,27 @@
-var request = require('request');
-var CronJob = require('cron').CronJob;
+var cronParser = require('cron-parser');
var moment = require('moment');
+const common = require('./lib/common');
+const Database = require('./lib/Database');
+
function main(params) {
if (!params.authKey) {
- return sendError(400, 'no authKey parameter was provided');
+ return common.sendError(400, 'no authKey parameter was provided');
}
if (!params.triggerName) {
- return sendError(400, 'no trigger name parameter was provided');
+ return common.sendError(400, 'no trigger name parameter was provided');
}
- var triggerParts = parseQName(params.triggerName);
+ var triggerParts = common.parseQName(params.triggerName);
var triggerID = `${params.authKey}/${triggerParts.namespace}/${triggerParts.name}`;
-
var triggerURL = `https://${params.apihost}/api/v1/namespaces/${triggerParts.namespace}/triggers/${triggerParts.name}`;
- var nano = require('nano')(params.DB_URL);
- var db = nano.db.use(params.DB_NAME);
var workers = params.workers instanceof Array ? params.workers : [];
+ var db;
if (params.__ow_method === "put") {
- if (!params.cron) {
- return sendError(400, 'alarms trigger feed is missing the cron parameter');
- }
- else {
- try {
- new CronJob(params.cron, function() {});
- } catch(ex) {
- return sendError(400, `cron pattern '${params.cron}' is not valid`);
- }
- }
-
if (typeof params.trigger_payload === 'string') {
params.trigger_payload = {payload: params.trigger_payload};
}
@@ -41,24 +30,55 @@ function main(params) {
apikey: params.authKey,
name: triggerParts.name,
namespace: triggerParts.namespace,
- cron: params.cron,
payload: params.trigger_payload || {},
- maxTriggers: params.maxTriggers || -1,
status: {
'active': true,
'dateChanged': Date.now()
}
};
+ if (params.fireOnce) {
+ if (!params.date) {
+ return common.sendError(400, 'alarms once trigger feed is missing the date parameter');
+ }
+ else {
+ var date = new Date(params.date);
+ if (isNaN(date.getTime())) {
+ return common.sendError(400, `date parameter '${params.date}' is not a valid Date`);
+ }
+ else if (Date.now() >= date.getTime()) {
+ return common.sendError(400, `date parameter '${params.date}' must be in the future`);
+ }
+ else {
+ newTrigger.date = params.date;
+ }
+ }
+ }
+ else {
+ if (!params.cron) {
+ return common.sendError(400, 'alarms trigger feed is missing the cron parameter');
+ }
+ else {
+ try {
+ cronParser.parseExpression(params.cron);
+ newTrigger.cron = params.cron;
+ newTrigger.maxTriggers = params.maxTriggers || -1;
+ } catch(ex) {
+ return common.sendError(400, `cron pattern '${params.cron}' is not valid`);
+ }
+ }
+ }
+
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, false)
+ common.verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return getWorkerID(db, workers);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getWorkerID(workers);
})
.then((worker) => {
console.log('trigger will be assigned to worker ' + worker);
newTrigger.worker = worker;
- return createTrigger(db, triggerID, newTrigger);
+ return db.createTrigger(triggerID, newTrigger);
})
.then(() => {
resolve({
@@ -75,16 +95,16 @@ function main(params) {
}
else if (params.__ow_method === "get") {
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, false)
+ common.verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return getTrigger(db, triggerID);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getTrigger(triggerID);
})
.then(doc => {
var body = {
config: {
name: doc.name,
namespace: doc.namespace,
- cron: doc.cron,
payload: doc.payload
},
status: {
@@ -94,6 +114,12 @@ function main(params) {
reason: doc.status.reason
}
};
+ if (doc.date) {
+ body.config.date = doc.date;
+ }
+ else {
+ body.config.cron = doc.cron;
+ }
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
@@ -108,12 +134,13 @@ function main(params) {
else if (params.__ow_method === "delete") {
return new Promise(function (resolve, reject) {
- verifyTriggerAuth(triggerURL, params.authKey, true)
+ common.verifyTriggerAuth(triggerURL, params.authKey, true)
.then(() => {
- return updateTrigger(db, triggerID, 0);
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.updateTrigger(triggerID, 0);
})
.then(id => {
- return deleteTrigger(db, id, 0);
+ return db.deleteTrigger(id, 0);
})
.then(() => {
resolve({
@@ -128,231 +155,10 @@ function main(params) {
});
}
else {
- return sendError(400, 'unsupported lifecycleEvent');
+ return common.sendError(400, 'unsupported lifecycleEvent');
}
}
-function getWorkerID(db, availabeWorkers) {
-
- return new Promise((resolve, reject) => {
- var workerID = availabeWorkers[0] || 'worker0';
-
- if (availabeWorkers.length > 1) {
- db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
- if (!err) {
- var triggersByWorker = {};
-
- availabeWorkers.forEach(worker => {
- triggersByWorker[worker] = 0;
- });
-
- body.rows.forEach(row => {
- if (row.key in triggersByWorker) {
- triggersByWorker[row.key] = row.value;
- }
- });
-
- // find which worker has the least number of assigned triggers
- for (var worker in triggersByWorker) {
- if (triggersByWorker[worker] < triggersByWorker[workerID]) {
- workerID = worker;
- }
- }
- resolve(workerID);
- } else {
- reject(err);
- }
- });
- }
- else {
- resolve(workerID);
- }
- });
-}
-
-function createTrigger(triggerDB, triggerID, newTrigger) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.insert(newTrigger, triggerID, function (err) {
- if (!err) {
- resolve();
- }
- else {
- reject(sendError(err.statusCode, 'error creating alarm trigger.', err.message));
- }
- });
- });
-}
-
-function getTrigger(triggerDB, triggerID, retry = true) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (err) {
- if (retry) {
- var parts = triggerID.split('/');
- var id = parts[0] + '/_/' + parts[2];
- getTrigger(triggerDB, id, false)
- .then(doc => {
- resolve(doc);
- })
- .catch(err => {
- reject(err);
- });
- } else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- } else {
- resolve(existing);
- }
- });
- });
-}
-
-function updateTrigger(triggerDB, triggerID, retryCount) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (!err) {
- var updatedTrigger = existing;
- updatedTrigger.status = {'active': false};
-
- triggerDB.insert(updatedTrigger, triggerID, function (err) {
- if (err) {
- if (err.statusCode === 409 && retryCount < 5) {
- setTimeout(function () {
- updateTrigger(triggerDB, triggerID, (retryCount + 1))
- .then(id => {
- resolve(id);
- })
- .catch(err => {
- reject(err);
- });
- }, 1000);
- }
- else {
- reject(sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message));
- }
- }
- else {
- resolve(triggerID);
- }
- });
- }
- else {
- //legacy alarms triggers may have been created with _ namespace
- if (retryCount === 0) {
- var parts = triggerID.split('/');
- var id = parts[0] + '/_/' + parts[2];
- updateTrigger(triggerDB, id, (retryCount + 1))
- .then(id => {
- resolve(id);
- })
- .catch(err => {
- reject(err);
- });
- }
- else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- }
- });
- });
-}
-
-function deleteTrigger(triggerDB, triggerID, retryCount) {
-
- return new Promise(function(resolve, reject) {
-
- triggerDB.get(triggerID, function (err, existing) {
- if (!err) {
- triggerDB.destroy(existing._id, existing._rev, function (err) {
- if (err) {
- if (err.statusCode === 409 && retryCount < 5) {
- setTimeout(function () {
- deleteTrigger(triggerDB, triggerID, (retryCount + 1))
- .then(resolve)
- .catch(err => {
- reject(err);
- });
- }, 1000);
- }
- else {
- reject(sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message));
- }
- }
- else {
- resolve();
- }
- });
- }
- else {
- reject(sendError(err.statusCode, 'could not find the trigger in the database'));
- }
- });
- });
-}
-
-function verifyTriggerAuth(triggerURL, authKey, isDelete) {
- var auth = authKey.split(':');
-
- return new Promise(function(resolve, reject) {
-
- request({
- method: 'get',
- url: triggerURL,
- auth: {
- user: auth[0],
- pass: auth[1]
- },
- rejectUnauthorized: false
- }, function(err, response) {
- if (err) {
- reject(sendError(400, 'Trigger authentication request failed.', err.message));
- }
- else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) {
- reject(sendError(response.statusCode, 'Trigger authentication request failed.'));
- }
- else {
- resolve();
- }
- });
- });
-}
-
-function sendError(statusCode, error, message) {
- var params = {error: error};
- if (message) {
- params.message = message;
- }
-
- return {
- statusCode: statusCode,
- headers: { 'Content-Type': 'application/json' },
- body: new Buffer(JSON.stringify(params)).toString('base64')
- };
-}
-
-
-function parseQName(qname) {
- var parsed = {};
- var delimiter = '/';
- var defaultNamespace = '_';
- if (qname && qname.charAt(0) === delimiter) {
- var parts = qname.split(delimiter);
- parsed.namespace = parts[1];
- parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : '';
- } else {
- parsed.namespace = defaultNamespace;
- parsed.name = qname;
- }
- return parsed;
-}
-
-
exports.main = main;
diff --git a/action/package.json b/action/alarmWeb_package.json
similarity index 79%
rename from action/package.json
rename to action/alarmWeb_package.json
index e80f7af..1a9c164 100644
--- a/action/package.json
+++ b/action/alarmWeb_package.json
@@ -3,6 +3,6 @@
"version": "1.0.0",
"main": "alarmWebAction.js",
"dependencies" : {
- "cron": "^1.2.1"
+ "cron-parser": "^2.4.3"
}
}
diff --git a/action/lib/Database.js b/action/lib/Database.js
new file mode 100644
index 0000000..e1fdb27
--- /dev/null
+++ b/action/lib/Database.js
@@ -0,0 +1,172 @@
+const common = require('./common');
+
+// constructor for DB object - a thin, promise-loving wrapper around nano
+module.exports = function(dbURL, dbName) {
+ var nano = require('nano')(dbURL);
+ this.db = nano.db.use(dbName);
+ var utilsDB = this;
+
+ this.getWorkerID = function(availabeWorkers) {
+
+ return new Promise((resolve, reject) => {
+ var workerID = availabeWorkers[0] || 'worker0';
+
+ if (availabeWorkers.length > 1) {
+ utilsDB.db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
+ if (!err) {
+ var triggersByWorker = {};
+
+ availabeWorkers.forEach(worker => {
+ triggersByWorker[worker] = 0;
+ });
+
+ body.rows.forEach(row => {
+ if (row.key in triggersByWorker) {
+ triggersByWorker[row.key] = row.value;
+ }
+ });
+
+ // find which worker has the least number of assigned triggers
+ for (var worker in triggersByWorker) {
+ if (triggersByWorker[worker] < triggersByWorker[workerID]) {
+ workerID = worker;
+ }
+ }
+ resolve(workerID);
+ } else {
+ reject(err);
+ }
+ });
+ }
+ else {
+ resolve(workerID);
+ }
+ });
+ };
+
+ this.createTrigger = function(triggerID, newTrigger) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.insert(newTrigger, triggerID, function (err) {
+ if (!err) {
+ resolve();
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'error creating alarm trigger.', err.message));
+ }
+ });
+ });
+ };
+
+ this.getTrigger = function(triggerID, retry = true) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (err) {
+ if (retry) {
+ var parts = triggerID.split('/');
+ var id = parts[0] + '/_/' + parts[2];
+ utilsDB.getTrigger(id, false)
+ .then(doc => {
+ resolve(doc);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ } else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ } else {
+ resolve(existing);
+ }
+ });
+ });
+ };
+
+ this.updateTrigger = function(triggerID, retryCount) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (!err) {
+ var updatedTrigger = existing;
+ updatedTrigger.status = {'active': false};
+
+ utilsDB.db.insert(updatedTrigger, triggerID, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ utilsDB.updateTrigger(triggerID, (retryCount + 1))
+ .then(id => {
+ resolve(id);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ }, 1000);
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message));
+ }
+ }
+ else {
+ resolve(triggerID);
+ }
+ });
+ }
+ else {
+ //legacy alarms triggers may have been created with _ namespace
+ if (retryCount === 0) {
+ var parts = triggerID.split('/');
+ var id = parts[0] + '/_/' + parts[2];
+ utilsDB.updateTrigger(id, (retryCount + 1))
+ .then(id => {
+ resolve(id);
+ })
+ .catch(err => {
+ reject(err);
+ });
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ }
+ });
+ });
+ };
+
+ this.deleteTrigger = function(triggerID, retryCount) {
+
+ return new Promise(function(resolve, reject) {
+
+ utilsDB.db.get(triggerID, function (err, existing) {
+ if (!err) {
+ utilsDB.db.destroy(existing._id, existing._rev, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ utilsDB.deleteTrigger(triggerID, (retryCount + 1))
+ .then(resolve)
+ .catch(err => {
+ reject(err);
+ });
+ }, 1000);
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message));
+ }
+ }
+ else {
+ resolve();
+ }
+ });
+ }
+ else {
+ reject(common.sendError(err.statusCode, 'could not find the trigger in the database'));
+ }
+ });
+ });
+ };
+};
diff --git a/action/alarm.js b/action/lib/common.js
similarity index 53%
copy from action/alarm.js
copy to action/lib/common.js
index a5c217f..a6c1477 100644
--- a/action/alarm.js
+++ b/action/lib/common.js
@@ -1,30 +1,4 @@
-var request = require('request');
-
-function main(msg) {
-
- let eventMap = {
- CREATE: 'put',
- READ: 'get',
- // UPDATE: 'put',
- DELETE: 'delete'
- };
- // for creation -> CREATE
- // for reading -> READ
- // for deletion -> DELETE
- var lifecycleEvent = msg.lifecycleEvent;
-
- var endpoint = msg.apihost;
- var webparams = createWebParams(msg);
-
- var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`;
-
- if (lifecycleEvent in eventMap) {
- var method = eventMap[lifecycleEvent];
- return requestHelper(url, webparams, method);
- } else {
- return Promise.reject('unsupported lifecycleEvent');
- }
-}
+const request = require('request');
function requestHelper(url, input, method) {
@@ -67,6 +41,33 @@ function createWebParams(rawParams) {
return webparams;
}
+function verifyTriggerAuth(triggerURL, authKey, isDelete) {
+ var auth = authKey.split(':');
+
+ return new Promise(function(resolve, reject) {
+
+ request({
+ method: 'get',
+ url: triggerURL,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ rejectUnauthorized: false
+ }, function(err, response) {
+ if (err) {
+ reject(sendError(400, 'Trigger authentication request failed.', err.message));
+ }
+ else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) {
+ reject(sendError(response.statusCode, 'Trigger authentication request failed.'));
+ }
+ else {
+ resolve();
+ }
+ });
+ });
+}
+
function parseQName(qname) {
var parsed = {};
var delimiter = '/';
@@ -82,6 +83,24 @@ function parseQName(qname) {
return parsed;
}
-exports.main = main;
+function sendError(statusCode, error, message) {
+ var params = {error: error};
+ if (message) {
+ params.message = message;
+ }
+
+ return {
+ statusCode: statusCode,
+ headers: { 'Content-Type': 'application/json' },
+ body: new Buffer(JSON.stringify(params)).toString('base64')
+ };
+}
+module.exports = {
+ 'requestHelper': requestHelper,
+ 'createWebParams': createWebParams,
+ 'verifyTriggerAuth': verifyTriggerAuth,
+ 'parseQName': parseQName,
+ 'sendError': sendError
+};
diff --git a/installCatalog.sh b/installCatalog.sh
index 41fd83b..63fde64 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -55,10 +55,35 @@ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared yes ala
-p cron '' \
-p trigger_payload ''
-$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarm.js" \
+# make alarmFeed.zip
+cd action
+
+if [ -e alarmFeed.zip ]
+then
+ rm -rf alarmFeed.zip
+fi
+
+cp -f alarmFeed_package.json package.json
+zip -r alarmFeed.zip lib package.json alarm.js
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarmFeed.zip" \
-a description 'Fire trigger when alarm occurs' \
-a feed true
+
+# make alarmOnce.zip
+if [ -e alarmOnce.zip ]
+then
+ rm -rf alarmOnce.zip
+fi
+
+cp -f alarmOnce_package.json package.json
+zip -r alarmOnce.zip lib package.json alarmOnce.js
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/once "$PACKAGE_HOME/action/alarmOnce.zip" \
+ -a description 'Fire trigger once when alarm occurs' \
+ -a feed true
+
if [ -n "$WORKERS" ];
then
$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \
@@ -74,7 +99,7 @@ else
fi
# make alarmWebAction.zip
-cd action
+cp -f alarmWeb_package.json package.json
npm install
if [ -e alarmWebAction.zip ];
@@ -82,7 +107,7 @@ then
rm -rf alarmWebAction.zip
fi
-zip -r alarmWebAction.zip package.json alarmWebAction.js node_modules
+zip -r alarmWebAction.zip lib package.json alarmWebAction.js node_modules
$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarmsWeb/alarmWebAction "$PACKAGE_HOME/action/alarmWebAction.zip" \
-a description 'Create/Delete a trigger in alarms provider Database' \
diff --git a/package.json b/package.json
index 6331109..0b21780 100755
--- a/package.json
+++ b/package.json
@@ -6,7 +6,7 @@
"license": "ISC",
"dependencies": {
"body-parser": "^1.15.0",
- "cron": "^1.1.0",
+ "node-schedule": "^1.2.5",
"express": "^4.13.4",
"lodash": "^4.5.0",
"nano": "6.4.2",
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index c804cb8..ff4ae05 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,6 +1,6 @@
var _ = require('lodash');
var request = require('request');
-var CronJob = require('cron').CronJob;
+var schedule = require('node-schedule');
var HttpStatus = require('http-status-codes');
var constants = require('./constants.js');
@@ -35,34 +35,49 @@ module.exports = function(
try {
return new Promise(function(resolve, reject) {
- var cronHandle = new CronJob(newTrigger.cron,
- function onTick() {
- if (utils.activeHost === utils.host) {
- var triggerHandle = utils.triggers[triggerIdentifier];
- if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
- try {
- utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
- } catch (e) {
- logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
- }
+ var cachedTrigger = {
+ apikey: newTrigger.apikey,
+ name: newTrigger.name,
+ namespace: newTrigger.namespace
+ };
+
+ var cron;
+ if (newTrigger.date) {
+ cron = new Date(newTrigger.date);
+ if (cron.getTime() > Date.now()) {
+ logger.info(method, 'Creating a fire once alarms trigger', triggerIdentifier);
+ cachedTrigger.date = newTrigger.date;
+ }
+ else {
+ return reject("the fire once date has expired");
+ }
+ }
+ else {
+ cron = newTrigger.cron;
+ var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
+ cachedTrigger.triggersLeft = maxTriggers;
+ cachedTrigger.maxTriggers = maxTriggers;
+ cachedTrigger.cron = cron;
+ }
+
+ var cronHandle = new schedule.Job(function() {
+ if (utils.activeHost === utils.host) {
+ var triggerHandle = utils.triggers[triggerIdentifier];
+ if (triggerHandle && (!triggerHandle.maxTriggers || triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
+ try {
+ utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
}
}
}
- );
+ });
logger.info(method, triggerIdentifier, 'starting cron job');
- cronHandle.start();
+ cronHandle.schedule(cron);
- var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
+ cachedTrigger.cronHandle = cronHandle;
+ utils.triggers[triggerIdentifier] = cachedTrigger;
- utils.triggers[triggerIdentifier] = {
- cron: newTrigger.cron,
- cronHandle: cronHandle,
- triggersLeft: maxTriggers,
- maxTriggers: maxTriggers,
- apikey: newTrigger.apikey,
- name: newTrigger.name,
- namespace: newTrigger.namespace
- };
resolve(triggerIdentifier);
});
} catch (err) {
@@ -83,13 +98,11 @@ module.exports = function(
utils.postTrigger(dataTrigger, payload, uri, auth, 0)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully fired');
- if (dataTrigger.triggersLeft === 0) {
- utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
- logger.warn(method, 'no more triggers left, disabled', triggerIdentifier);
- }
+ utils.disableExtinctTriggers(triggerIdentifier, dataTrigger);
})
.catch(err => {
logger.error(method, err);
+ utils.disableExtinctTriggers(triggerIdentifier, dataTrigger);
});
};
@@ -99,7 +112,7 @@ module.exports = function(
return new Promise(function(resolve, reject) {
// only manage trigger fires if they are not infinite
- if (dataTrigger.maxTriggers !== -1) {
+ if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) {
dataTrigger.triggersLeft--;
}
@@ -118,7 +131,7 @@ module.exports = function(
if (error || response.statusCode >= 400) {
// only manage trigger fires if they are not infinite
- if (dataTrigger.maxTriggers !== -1) {
+ if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) {
dataTrigger.triggersLeft++;
}
logger.error(method, 'there was an error invoking', triggerIdentifier, response ? response.statusCode : error);
@@ -145,7 +158,7 @@ module.exports = function(
}
}
} else {
- logger.info(method, 'fired', triggerIdentifier, dataTrigger.triggersLeft, 'triggers left');
+ logger.info(method, 'fired', triggerIdentifier);
resolve(triggerIdentifier);
}
}
@@ -161,6 +174,20 @@ module.exports = function(
[HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
};
+ this.disableExtinctTriggers = function(triggerIdentifier, dataTrigger) {
+ var method = 'disableExtinctTriggers';
+
+ if (dataTrigger.date) {
+ utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after firing once');
+ logger.info(method, 'the fire once date has expired, disabled', triggerIdentifier);
+ }
+ else if (dataTrigger.maxTriggers && dataTrigger.triggersLeft === 0) {
+ utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
+ logger.warn(method, 'no more triggers left, disabled', triggerIdentifier);
+ }
+
+ };
+
this.disableTrigger = function(triggerIdentifier, statusCode, message) {
var method = 'disableTrigger';
@@ -198,7 +225,7 @@ module.exports = function(
if (utils.triggers[triggerIdentifier]) {
if (utils.triggers[triggerIdentifier].cronHandle) {
- utils.triggers[triggerIdentifier].cronHandle.stop();
+ utils.triggers[triggerIdentifier].cronHandle.cancel();
}
delete utils.triggers[triggerIdentifier];
logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory');