You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/03/15 14:41:51 UTC
[incubator-openwhisk-package-cloudant] branch master updated: self
monitoring support (#161)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-cloudant.git
The following commit(s) were added to refs/heads/master by this push:
new 054c198 self monitoring support (#161)
054c198 is described below
commit 054c198b1e722e7cd562d7689ed9b4536aa74b4f
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Thu Mar 15 10:41:49 2018 -0400
self monitoring support (#161)
---
provider/app.js | 30 ++++++-
provider/lib/active.js | 4 +-
provider/lib/constants.js | 8 +-
provider/lib/health.js | 194 +++++++++++++++++++++++++++++++++++++++++++++-
provider/lib/utils.js | 74 +++++++++---------
5 files changed, 266 insertions(+), 44 deletions(-)
diff --git a/provider/app.js b/provider/app.js
index 2deee1d..0bfd33d 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -25,7 +25,6 @@ app.set('port', process.env.PORT || 8080);
// Allow invoking servers with self-signed certificates.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
-
// If it does not already exist, create the triggers database. This is the database that will
// store the managed triggers.
var dbUsername = process.env.DB_USERNAME;
@@ -35,6 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
var dbPrefix = process.env.DB_PREFIX;
var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
var redisUrl = process.env.REDIS_URL;
+var monitoringAuth = process.env.MONITORING_AUTH;
+var monitoringInterval = process.env.MONITORING_INTERVAL;
var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
@@ -74,7 +75,7 @@ function createDatabase() {
};
createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
- .then((db) => {
+ .then(db => {
var filterDD = {
filters: {
triggers_by_worker:
@@ -86,6 +87,22 @@ function createDatabase() {
};
return createDesignDoc(db, filterDDName, filterDD);
})
+ .then(db => {
+ if (monitoringAuth) {
+ var filterDD = {
+ filters: {
+ canary_docs:
+ function (doc, req) {
+ return doc.isCanaryDoc && doc.host === req.query.host;
+ }.toString()
+ }
+ };
+ return createDesignDoc(db, '_design/' + constants.MONITOR_DESIGN_DOC, filterDD);
+ }
+ else {
+ return Promise.resolve(db);
+ }
+ })
.then((db) => {
resolve(db);
})
@@ -146,7 +163,6 @@ function createRedisClient() {
client = redis.createClient(redisUrl);
}
-
client.on('connect', function () {
resolve(client);
});
@@ -187,7 +203,7 @@ function init(server) {
})
.then(() => {
var providerRAS = new ProviderRAS();
- var providerHealth = new ProviderHealth(providerUtils);
+ var providerHealth = new ProviderHealth(logger, providerUtils);
var providerActivation = new ProviderActivation(logger, providerUtils);
// RAS Endpoint
@@ -200,6 +216,12 @@ function init(server) {
app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
providerUtils.initAllTriggers();
+
+ if (monitoringAuth) {
+ setInterval(function () {
+ providerHealth.monitor(monitoringAuth);
+ }, monitoringInterval || constants.MONITOR_INTERVAL);
+ }
})
.catch(err => {
logger.error(method, 'an error occurred creating database:', err);
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 66efdcd..9f8a0fc 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -3,13 +3,15 @@ module.exports = function(logger, utils) {
// Active Endpoint
this.endPoint = '/active';
+ var hostMachine = process.env.HOST_MACHINE;
+
this.active = function(req, res) {
var method = 'active';
var response = {
worker: utils.worker,
host: utils.host,
- hostMachine: utils.hostMachine,
+ hostMachine: hostMachine,
active: utils.host === utils.activeHost
};
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 0203027..539fd99 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -5,7 +5,10 @@ const RETRY_DELAY = 1000; //in milliseconds
const REDIS_KEY = 'active';
const FILTERS_DESIGN_DOC = 'triggerFilters';
const VIEWS_DESIGN_DOC = 'triggerViews';
+const MONITOR_DESIGN_DOC = 'monitorFilters';
const TRIGGERS_BY_WORKER = 'triggers_by_worker';
+const DOCS_FOR_MONITOR = 'canary_docs';
+const MONITOR_INTERVAL = 5 * 1000 * 60; //in milliseconds
module.exports = {
@@ -16,5 +19,8 @@ module.exports = {
REDIS_KEY: REDIS_KEY,
FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
- TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
+ TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER,
+ MONITOR_INTERVAL: MONITOR_INTERVAL,
+ MONITOR_DESIGN_DOC: MONITOR_DESIGN_DOC,
+ DOCS_FOR_MONITOR: DOCS_FOR_MONITOR
};
diff --git a/provider/lib/health.js b/provider/lib/health.js
index bbdc01d..eb2832d 100644
--- a/provider/lib/health.js
+++ b/provider/lib/health.js
@@ -1,11 +1,21 @@
var si = require('systeminformation');
var v8 = require('v8');
+var request = require('request');
+var _ = require('lodash');
+var URL = require('url').URL;
+var constants = require('./constants.js');
-module.exports = function(utils) {
+module.exports = function(logger, utils) {
// Health Endpoint
this.endPoint = '/health';
+ var triggerName;
+ var canaryDocID;
+ var monitorStatus;
+ var monitorStages = ['triggerStarted', 'triggerFired', 'triggerStopped'];
+ var healthMonitor = this;
+
// Health Logic
this.health = function (req, res) {
@@ -20,13 +30,13 @@ module.exports = function(utils) {
si.inetLatency(utils.routerHost)
])
.then(results => {
+ stats.triggerMonitor = monitorStatus;
stats.memory = results[0];
- stats.cpu = results[1];
+ stats.cpu = _.omit(results[1], 'cpus');
stats.disk = results[2];
stats.network = results[3];
stats.apiHostLatency = results[4];
stats.heapStatistics = v8.getHeapStatistics();
- stats.heapSpaceStatistics =v8.getHeapSpaceStatistics();
res.send(stats);
})
.catch(error => {
@@ -35,4 +45,182 @@ module.exports = function(utils) {
});
};
+ this.monitor = function(apikey) {
+ var method = 'monitor';
+
+ var auth = apikey.split(':');
+
+ if (triggerName) {
+ monitorStatus = Object.assign({}, utils.monitorStatus);
+ utils.monitorStatus = {};
+
+ var monitorStatusSize = Object.keys(monitorStatus).length;
+ if (monitorStatusSize < 5) {
+ //we have a failure in one of the stages
+ var stageFailed = monitorStages[monitorStatusSize - 2];
+ monitorStatus[stageFailed] = 'failed';
+ }
+ var existingTriggerID = `:_:${triggerName}`;
+ var existingCanaryID = canaryDocID;
+
+ //delete trigger feed from database
+ healthMonitor.deleteDocFromDB(existingTriggerID, 0);
+
+ //delete the trigger
+ var uri = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+ healthMonitor.deleteTrigger(existingTriggerID, uri, auth, 0);
+
+ //delete the canary doc
+ healthMonitor.deleteDocFromDB(existingCanaryID, 0);
+ }
+
+ //create new cloudant trigger and canary doc
+ var docSuffix = utils.worker + utils.host + '_' + Date.now();
+ triggerName = 'cloudant_' + docSuffix;
+ canaryDocID = 'canary_' + docSuffix;
+
+ //update status monitor object
+ utils.monitorStatus.triggerName = triggerName;
+ utils.monitorStatus.triggerType = 'changes';
+
+ var triggerURL = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+ var triggerID = `:_:${triggerName}`;
+ healthMonitor.createTrigger(triggerURL, auth)
+ .then(info => {
+ logger.info(method, triggerID, info);
+ var newTrigger = healthMonitor.createCloudantTrigger(triggerID, apikey);
+ healthMonitor.createDocInDB(triggerID, newTrigger);
+ })
+ .catch(err => {
+ logger.error(method, triggerID, err);
+ });
+ };
+
+ this.createCloudantTrigger = function(triggerID, apikey) {
+ var method = 'createCloudantTrigger';
+
+ var dbURL = new URL(utils.db.config.url);
+ var dbName = utils.db.config.db;
+
+ var newTrigger = {
+ apikey: apikey,
+ id: triggerID,
+ host: dbURL.hostname,
+ port: dbURL.port,
+ protocol: dbURL.protocol.replace(':', ''),
+ dbname: dbName,
+ user: dbURL.username,
+ pass: dbURL.password,
+ filter: constants.MONITOR_DESIGN_DOC + '/' + constants.DOCS_FOR_MONITOR,
+ query_params: {host: utils.host},
+ maxTriggers: 1,
+ worker: utils.worker,
+ monitor: utils.host
+ };
+
+ return newTrigger;
+ };
+
+ this.createTrigger = function(triggerURL, auth) {
+ var method = 'createTrigger';
+
+ return new Promise(function(resolve, reject) {
+ request({
+ method: 'put',
+ uri: triggerURL,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ json: true,
+ body: {}
+ }, function (error, response) {
+ if (error || response.statusCode >= 400) {
+ reject('monitoring trigger create request failed');
+ }
+ else {
+ resolve('monitoring trigger create request was successful');
+ }
+ });
+ });
+ };
+
+ this.createDocInDB = function(docID, doc) {
+ var method = 'createDocInDB';
+
+ utils.db.insert(doc, docID, function (err) {
+ if (!err) {
+ logger.info(method, docID, 'was successfully inserted');
+ if (doc.monitor) {
+ setTimeout(function () {
+ var canaryDoc = {
+ isCanaryDoc: true,
+ host: utils.host
+ };
+ healthMonitor.createDocInDB(canaryDocID, canaryDoc);
+ }, 1000 * 60);
+ }
+ }
+ else {
+ logger.error(method, docID, err);
+ }
+ });
+ };
+
+ this.deleteTrigger = function(triggerID, uri, auth, retryCount) {
+ var method = 'deleteTrigger';
+
+ request({
+ method: 'delete',
+ uri: uri,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ }, function (error, response) {
+ logger.info(method, triggerID, 'http delete request, STATUS:', response ? response.statusCode : undefined);
+ if (error || response.statusCode >= 400) {
+ if (!error && response.statusCode === 409 && retryCount < 5) {
+ logger.info(method, 'attempting to delete trigger again', triggerID, 'Retry Count:', (retryCount + 1));
+ setTimeout(function () {
+ healthMonitor.deleteTrigger(triggerID, uri, auth, (retryCount + 1));
+ }, 1000);
+ } else {
+ logger.error(method, triggerID, 'trigger delete request failed');
+ }
+ }
+ else {
+ logger.info(method, triggerID, 'trigger delete request was successful');
+ }
+ });
+ };
+
+ this.deleteDocFromDB = function(docID, retryCount) {
+ var method = 'deleteDocFromDB';
+
+ //delete from database
+ utils.db.get(docID, function (err, existing) {
+ if (!err) {
+ utils.db.destroy(existing._id, existing._rev, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ healthMonitor.deleteDocFromDB(docID, (retryCount + 1));
+ }, 1000);
+ }
+ else {
+ logger.error(method, docID, 'could not be deleted from the database');
+ }
+ }
+ else {
+ logger.info(method, docID, 'was successfully deleted from the database');
+ }
+ });
+ }
+ else {
+ logger.error(method, docID, 'could not be found in the database');
+ }
+ });
+ };
+
};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 8446117..7906c36 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,25 +1,21 @@
-var _ = require('lodash');
var request = require('request');
var HttpStatus = require('http-status-codes');
var constants = require('./constants.js');
+module.exports = function(logger, triggerDB, redisClient) {
-module.exports = function(
- logger,
- triggerDB,
- redisClient
-) {
- this.module = 'utils';
this.triggers = {};
this.endpointAuth = process.env.ENDPOINT_AUTH;
this.routerHost = process.env.ROUTER_HOST || 'localhost';
this.worker = process.env.WORKER || 'worker0';
this.host = process.env.HOST_INDEX || 'host0';
- this.hostMachine = process.env.HOST_MACHINE;
this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+ this.db = triggerDB;
this.redisClient = redisClient;
- this.redisHash = triggerDB.config.db + '_' + this.worker;
+ this.redisHash = this.db.config.db + '_' + this.worker;
this.redisKey = constants.REDIS_KEY;
+ this.uriHost ='https://' + this.routerHost + ':443';
+ this.monitorStatus = {};
var retryAttempts = constants.RETRY_ATTEMPTS;
var filterDDName = constants.FILTERS_DESIGN_DOC;
@@ -56,16 +52,13 @@ module.exports = function(
utils.triggers[dataTrigger.id] = dataTrigger;
feed.on('change', function (change) {
- if (utils.activeHost === utils.host) {
+ var triggerHandle = utils.triggers[dataTrigger.id];
+ if (triggerHandle && utils.shouldFireTrigger(triggerHandle) && utils.hasTriggersRemaining(triggerHandle)) {
logger.info(method, 'Trigger', dataTrigger.id, 'got change from', dataTrigger.dbname);
-
- var triggerHandle = utils.triggers[dataTrigger.id];
- if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
- try {
- utils.fireTrigger(dataTrigger.id, change);
- } catch (e) {
- logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
- }
+ try {
+ utils.fireTrigger(dataTrigger.id, change);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
}
}
});
@@ -73,17 +66,15 @@ module.exports = function(
feed.follow();
return new Promise(function(resolve, reject) {
-
feed.on('error', function (err) {
logger.error(method,'Error occurred for trigger', dataTrigger.id, '(db ' + dataTrigger.dbname + '):', err);
reject(err);
});
- feed.on('confirm', function (dbObj) {
+ feed.on('confirm', function () {
logger.info(method, 'Added cloudant data trigger', dataTrigger.id, 'listening for changes in database', dataTrigger.dbname);
resolve(dataTrigger.id);
});
-
});
} catch (err) {
@@ -94,10 +85,6 @@ module.exports = function(
};
this.initTrigger = function(newTrigger) {
- var method = 'initTrigger';
-
- logger.info(method, 'create trigger', newTrigger.id, 'with the following args', newTrigger);
-
var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
var trigger = {
@@ -113,7 +100,8 @@ module.exports = function(
maxTriggers: maxTriggers,
triggersLeft: maxTriggers,
filter: newTrigger.filter,
- query_params: newTrigger.query_params
+ query_params: newTrigger.query_params,
+ monitor: newTrigger.monitor
};
return trigger;
@@ -124,6 +112,18 @@ module.exports = function(
[HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
};
+ this.shouldFireTrigger = function(trigger) {
+ return trigger.monitor || utils.activeHost === utils.host;
+ };
+
+ this.hasTriggersRemaining = function(trigger) {
+ return !trigger.maxTriggers || trigger.maxTriggers === -1 || trigger.triggersLeft > 0;
+ };
+
+ this.isMonitoringTrigger = function(monitor, triggerIdentifier) {
+ return monitor && utils.monitorStatus.triggerName === utils.parseQName(triggerIdentifier).name;
+ };
+
this.disableTrigger = function(id, statusCode, message) {
var method = 'disableTrigger';
@@ -188,9 +188,12 @@ module.exports = function(
utils.postTrigger(dataTrigger, form, uri, auth, 0)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully fired');
+ if (utils.isMonitoringTrigger(dataTrigger.monitor, triggerId)) {
+ utils.monitorStatus.triggerFired = "success";
+ }
if (dataTrigger.triggersLeft === 0) {
- utils.disableTrigger(dataTrigger.id, undefined, 'Automatically disabled after reaching max triggers');
- logger.warn(method, 'no more triggers left, disabled', dataTrigger.id);
+ utils.disableTrigger(triggerId, undefined, 'Automatically disabled after reaching max triggers');
+ logger.warn(method, 'no more triggers left, disabled', triggerId);
}
})
.catch(err => {
@@ -273,7 +276,7 @@ module.exports = function(
var triggerIdentifier = trigger.id;
var doc = trigger.doc;
- if (!(triggerIdentifier in utils.triggers)) {
+ if (!(triggerIdentifier in utils.triggers) && !doc.monitor) {
//check if trigger still exists in whisk db
var triggerObj = utils.parseQName(triggerIdentifier);
var host = 'https://' + utils.routerHost + ':' + 443;
@@ -330,19 +333,23 @@ module.exports = function(
var triggerIdentifier = change.id;
var doc = change.doc;
- logger.info(method, 'got change for trigger', triggerIdentifier);
-
if (utils.triggers[triggerIdentifier]) {
if (doc.status && doc.status.active === false) {
utils.deleteTrigger(triggerIdentifier);
+ if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+ utils.monitorStatus.triggerStopped = "success";
+ }
}
}
else {
//ignore changes to disabled triggers
- if (!doc.status || doc.status.active === true) {
+ if ((!doc.status || doc.status.active === true) && (!doc.monitor || doc.monitor === utils.host)) {
utils.createTrigger(utils.initTrigger(doc))
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created successfully');
+ if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+ utils.monitorStatus.triggerStarted = "success";
+ }
})
.catch(err => {
var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
@@ -368,7 +375,6 @@ module.exports = function(
var method = 'authorize';
if (utils.endpointAuth) {
-
if (!req.headers.authorization) {
res.set('www-authenticate', 'Basic realm="Private"');
res.status(HttpStatus.UNAUTHORIZED);
@@ -388,9 +394,7 @@ module.exports = function(
var uuid = auth[1];
var key = auth[2];
-
var endpointAuth = utils.endpointAuth.split(':');
-
if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
next();
}
--
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.