You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/03/15 14:41:51 UTC

[incubator-openwhisk-package-cloudant] branch master updated: self monitoring support (#161)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-cloudant.git


The following commit(s) were added to refs/heads/master by this push:
     new 054c198  self monitoring support (#161)
054c198 is described below

commit 054c198b1e722e7cd562d7689ed9b4536aa74b4f
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Thu Mar 15 10:41:49 2018 -0400

    self monitoring support (#161)
---
 provider/app.js           |  30 ++++++-
 provider/lib/active.js    |   4 +-
 provider/lib/constants.js |   8 +-
 provider/lib/health.js    | 194 +++++++++++++++++++++++++++++++++++++++++++++-
 provider/lib/utils.js     |  74 +++++++++---------
 5 files changed, 266 insertions(+), 44 deletions(-)

diff --git a/provider/app.js b/provider/app.js
index 2deee1d..0bfd33d 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -25,7 +25,6 @@ app.set('port', process.env.PORT || 8080);
 // Allow invoking servers with self-signed certificates.
 process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
 
-
 // If it does not already exist, create the triggers database.  This is the database that will
 // store the managed triggers.
 var dbUsername = process.env.DB_USERNAME;
@@ -35,6 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
 var redisUrl = process.env.REDIS_URL;
+var monitoringAuth = process.env.MONITORING_AUTH;
+var monitoringInterval = process.env.MONITORING_INTERVAL;
 var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
 var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
 
@@ -74,7 +75,7 @@ function createDatabase() {
                 };
 
                 createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
-                .then((db) => {
+                .then(db => {
                     var filterDD = {
                         filters: {
                             triggers_by_worker:
@@ -86,6 +87,22 @@ function createDatabase() {
                     };
                     return createDesignDoc(db, filterDDName, filterDD);
                 })
+                .then(db => {
+                    if (monitoringAuth) {
+                        var filterDD = {
+                            filters: {
+                                canary_docs:
+                                    function (doc, req) {
+                                        return doc.isCanaryDoc && doc.host === req.query.host;
+                                    }.toString()
+                            }
+                        };
+                        return createDesignDoc(db, '_design/' + constants.MONITOR_DESIGN_DOC, filterDD);
+                    }
+                    else {
+                        return Promise.resolve(db);
+                    }
+                })
                 .then((db) => {
                     resolve(db);
                 })
@@ -146,7 +163,6 @@ function createRedisClient() {
                 client = redis.createClient(redisUrl);
             }
 
-
             client.on('connect', function () {
                 resolve(client);
             });
@@ -187,7 +203,7 @@ function init(server) {
     })
     .then(() => {
         var providerRAS = new ProviderRAS();
-        var providerHealth = new ProviderHealth(providerUtils);
+        var providerHealth = new ProviderHealth(logger, providerUtils);
         var providerActivation = new ProviderActivation(logger, providerUtils);
 
         // RAS Endpoint
@@ -200,6 +216,12 @@ function init(server) {
         app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
 
         providerUtils.initAllTriggers();
+
+        if (monitoringAuth) {
+            setInterval(function () {
+                providerHealth.monitor(monitoringAuth);
+            }, monitoringInterval || constants.MONITOR_INTERVAL);
+        }
     })
     .catch(err => {
         logger.error(method, 'an error occurred creating database:', err);
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 66efdcd..9f8a0fc 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -3,13 +3,15 @@ module.exports = function(logger, utils) {
     // Active Endpoint
     this.endPoint = '/active';
 
+    var hostMachine = process.env.HOST_MACHINE;
+
     this.active = function(req, res) {
         var method = 'active';
 
         var response = {
             worker: utils.worker,
             host: utils.host,
-            hostMachine: utils.hostMachine,
+            hostMachine: hostMachine,
             active: utils.host === utils.activeHost
         };
 
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 0203027..539fd99 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -5,7 +5,10 @@ const RETRY_DELAY = 1000; //in milliseconds
 const REDIS_KEY = 'active';
 const FILTERS_DESIGN_DOC = 'triggerFilters';
 const VIEWS_DESIGN_DOC = 'triggerViews';
+const MONITOR_DESIGN_DOC = 'monitorFilters';
 const TRIGGERS_BY_WORKER = 'triggers_by_worker';
+const DOCS_FOR_MONITOR = 'canary_docs';
+const MONITOR_INTERVAL = 5 * 1000 * 60; //in milliseconds
 
 
 module.exports = {
@@ -16,5 +19,8 @@ module.exports = {
     REDIS_KEY: REDIS_KEY,
     FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
     VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
-    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
+    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER,
+    MONITOR_INTERVAL: MONITOR_INTERVAL,
+    MONITOR_DESIGN_DOC: MONITOR_DESIGN_DOC,
+    DOCS_FOR_MONITOR: DOCS_FOR_MONITOR
 };
diff --git a/provider/lib/health.js b/provider/lib/health.js
index bbdc01d..eb2832d 100644
--- a/provider/lib/health.js
+++ b/provider/lib/health.js
@@ -1,11 +1,21 @@
 var si = require('systeminformation');
 var v8 = require('v8');
+var request = require('request');
+var _ = require('lodash');
+var URL = require('url').URL;
+var constants = require('./constants.js');
 
-module.exports = function(utils) {
+module.exports = function(logger, utils) {
 
     // Health Endpoint
     this.endPoint = '/health';
 
+    var triggerName;
+    var canaryDocID;
+    var monitorStatus;
+    var monitorStages = ['triggerStarted', 'triggerFired', 'triggerStopped'];
+    var healthMonitor = this;
+
     // Health Logic
     this.health = function (req, res) {
 
@@ -20,13 +30,13 @@ module.exports = function(utils) {
             si.inetLatency(utils.routerHost)
         ])
         .then(results => {
+            stats.triggerMonitor = monitorStatus;
             stats.memory = results[0];
-            stats.cpu = results[1];
+            stats.cpu = _.omit(results[1], 'cpus');
             stats.disk = results[2];
             stats.network = results[3];
             stats.apiHostLatency = results[4];
             stats.heapStatistics = v8.getHeapStatistics();
-            stats.heapSpaceStatistics =v8.getHeapSpaceStatistics();
             res.send(stats);
         })
         .catch(error => {
@@ -35,4 +45,182 @@ module.exports = function(utils) {
         });
     };
 
+    this.monitor = function(apikey) {
+        var method = 'monitor';
+
+        var auth = apikey.split(':');
+
+        if (triggerName) {
+            monitorStatus = Object.assign({}, utils.monitorStatus);
+            utils.monitorStatus = {};
+
+            var monitorStatusSize = Object.keys(monitorStatus).length;
+            if (monitorStatusSize < 5) {
+                //we have a failure in one of the stages
+                var stageFailed = monitorStages[monitorStatusSize - 2];
+                monitorStatus[stageFailed] = 'failed';
+            }
+            var existingTriggerID = `:_:${triggerName}`;
+            var existingCanaryID = canaryDocID;
+
+            //delete trigger feed from database
+            healthMonitor.deleteDocFromDB(existingTriggerID, 0);
+
+            //delete the trigger
+            var uri = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+            healthMonitor.deleteTrigger(existingTriggerID, uri, auth, 0);
+
+            //delete the canary doc
+            healthMonitor.deleteDocFromDB(existingCanaryID, 0);
+        }
+
+        //create new cloudant trigger and canary doc
+        var docSuffix = utils.worker + utils.host + '_' + Date.now();
+        triggerName = 'cloudant_' + docSuffix;
+        canaryDocID = 'canary_' + docSuffix;
+
+        //update status monitor object
+        utils.monitorStatus.triggerName = triggerName;
+        utils.monitorStatus.triggerType = 'changes';
+
+        var triggerURL = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+        var triggerID = `:_:${triggerName}`;
+        healthMonitor.createTrigger(triggerURL, auth)
+        .then(info => {
+            logger.info(method, triggerID, info);
+            var newTrigger = healthMonitor.createCloudantTrigger(triggerID, apikey);
+            healthMonitor.createDocInDB(triggerID, newTrigger);
+        })
+        .catch(err => {
+            logger.error(method, triggerID, err);
+        });
+    };
+
+    this.createCloudantTrigger = function(triggerID, apikey) {
+        var method = 'createCloudantTrigger';
+
+        var dbURL = new URL(utils.db.config.url);
+        var dbName = utils.db.config.db;
+
+        var newTrigger = {
+            apikey: apikey,
+            id: triggerID,
+            host: dbURL.hostname,
+            port: dbURL.port,
+            protocol: dbURL.protocol.replace(':', ''),
+            dbname: dbName,
+            user: dbURL.username,
+            pass: dbURL.password,
+            filter: constants.MONITOR_DESIGN_DOC + '/' + constants.DOCS_FOR_MONITOR,
+            query_params: {host: utils.host},
+            maxTriggers: 1,
+            worker: utils.worker,
+            monitor: utils.host
+        };
+
+        return newTrigger;
+    };
+
+    this.createTrigger = function(triggerURL, auth) {
+        var method = 'createTrigger';
+
+        return new Promise(function(resolve, reject) {
+            request({
+                method: 'put',
+                uri: triggerURL,
+                auth: {
+                    user: auth[0],
+                    pass: auth[1]
+                },
+                json: true,
+                body: {}
+            }, function (error, response) {
+                if (error || response.statusCode >= 400) {
+                    reject('monitoring trigger create request failed');
+                }
+                else {
+                    resolve('monitoring trigger create request was successful');
+                }
+            });
+        });
+    };
+
+    this.createDocInDB = function(docID, doc) {
+        var method = 'createDocInDB';
+
+        utils.db.insert(doc, docID, function (err) {
+            if (!err) {
+                logger.info(method, docID, 'was successfully inserted');
+                if (doc.monitor) {
+                    setTimeout(function () {
+                        var canaryDoc = {
+                            isCanaryDoc: true,
+                            host: utils.host
+                        };
+                        healthMonitor.createDocInDB(canaryDocID, canaryDoc);
+                    }, 1000 * 60);
+                }
+            }
+            else {
+                logger.error(method, docID, err);
+            }
+        });
+    };
+
+    this.deleteTrigger = function(triggerID, uri, auth, retryCount) {
+        var method = 'deleteTrigger';
+
+        request({
+            method: 'delete',
+            uri: uri,
+            auth: {
+                user: auth[0],
+                pass: auth[1]
+            },
+        }, function (error, response) {
+            logger.info(method, triggerID, 'http delete request, STATUS:', response ? response.statusCode : undefined);
+            if (error || response.statusCode >= 400) {
+                if (!error && response.statusCode === 409 && retryCount < 5) {
+                    logger.info(method, 'attempting to delete trigger again', triggerID, 'Retry Count:', (retryCount + 1));
+                    setTimeout(function () {
+                        healthMonitor.deleteTrigger(triggerID, uri, auth, (retryCount + 1));
+                    }, 1000);
+                } else {
+                    logger.error(method, triggerID, 'trigger delete request failed');
+                }
+            }
+            else {
+                logger.info(method, triggerID, 'trigger delete request was successful');
+            }
+        });
+    };
+
+    this.deleteDocFromDB = function(docID, retryCount) {
+        var method = 'deleteDocFromDB';
+
+        //delete from database
+        utils.db.get(docID, function (err, existing) {
+            if (!err) {
+                utils.db.destroy(existing._id, existing._rev, function (err) {
+                    if (err) {
+                        if (err.statusCode === 409 && retryCount < 5) {
+                            setTimeout(function () {
+                                healthMonitor.deleteDocFromDB(docID, (retryCount + 1));
+                            }, 1000);
+                        }
+                        else {
+                            logger.error(method, docID, 'could not be deleted from the database');
+                        }
+                    }
+                    else {
+                        logger.info(method, docID, 'was successfully deleted from the database');
+                    }
+                });
+            }
+            else {
+                logger.error(method, docID, 'could not be found in the database');
+            }
+        });
+    };
+
 };
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 8446117..7906c36 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,25 +1,21 @@
-var _ = require('lodash');
 var request = require('request');
 var HttpStatus = require('http-status-codes');
 var constants = require('./constants.js');
 
+module.exports = function(logger, triggerDB, redisClient) {
 
-module.exports = function(
-  logger,
-  triggerDB,
-  redisClient
-) {
-    this.module = 'utils';
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
     this.worker = process.env.WORKER || 'worker0';
     this.host = process.env.HOST_INDEX || 'host0';
-    this.hostMachine = process.env.HOST_MACHINE;
     this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+    this.db = triggerDB;
     this.redisClient = redisClient;
-    this.redisHash = triggerDB.config.db + '_' + this.worker;
+    this.redisHash = this.db.config.db + '_' + this.worker;
     this.redisKey = constants.REDIS_KEY;
+    this.uriHost ='https://' + this.routerHost + ':443';
+    this.monitorStatus = {};
 
     var retryAttempts = constants.RETRY_ATTEMPTS;
     var filterDDName = constants.FILTERS_DESIGN_DOC;
@@ -56,16 +52,13 @@ module.exports = function(
             utils.triggers[dataTrigger.id] = dataTrigger;
 
             feed.on('change', function (change) {
-                if (utils.activeHost === utils.host) {
+                var triggerHandle = utils.triggers[dataTrigger.id];
+                if (triggerHandle && utils.shouldFireTrigger(triggerHandle) && utils.hasTriggersRemaining(triggerHandle)) {
                     logger.info(method, 'Trigger', dataTrigger.id, 'got change from', dataTrigger.dbname);
-
-                    var triggerHandle = utils.triggers[dataTrigger.id];
-                    if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
-                        try {
-                            utils.fireTrigger(dataTrigger.id, change);
-                        } catch (e) {
-                            logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
-                        }
+                    try {
+                        utils.fireTrigger(dataTrigger.id, change);
+                    } catch (e) {
+                        logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
                     }
                 }
             });
@@ -73,17 +66,15 @@ module.exports = function(
             feed.follow();
 
             return new Promise(function(resolve, reject) {
-
                 feed.on('error', function (err) {
                     logger.error(method,'Error occurred for trigger', dataTrigger.id, '(db ' + dataTrigger.dbname + '):', err);
                     reject(err);
                 });
 
-                feed.on('confirm', function (dbObj) {
+                feed.on('confirm', function () {
                     logger.info(method, 'Added cloudant data trigger', dataTrigger.id, 'listening for changes in database', dataTrigger.dbname);
                     resolve(dataTrigger.id);
                 });
-
             });
 
         } catch (err) {
@@ -94,10 +85,6 @@ module.exports = function(
     };
 
     this.initTrigger = function(newTrigger) {
-        var method = 'initTrigger';
-
-        logger.info(method, 'create trigger', newTrigger.id, 'with the following args', newTrigger);
-
         var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
 
         var trigger = {
@@ -113,7 +100,8 @@ module.exports = function(
             maxTriggers: maxTriggers,
             triggersLeft: maxTriggers,
             filter: newTrigger.filter,
-            query_params: newTrigger.query_params
+            query_params: newTrigger.query_params,
+            monitor: newTrigger.monitor
         };
 
         return trigger;
@@ -124,6 +112,18 @@ module.exports = function(
             [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
     };
 
+    this.shouldFireTrigger = function(trigger) {
+        return trigger.monitor || utils.activeHost === utils.host;
+    };
+
+    this.hasTriggersRemaining = function(trigger) {
+        return !trigger.maxTriggers || trigger.maxTriggers === -1 || trigger.triggersLeft > 0;
+    };
+
+    this.isMonitoringTrigger = function(monitor, triggerIdentifier) {
+        return monitor && utils.monitorStatus.triggerName === utils.parseQName(triggerIdentifier).name;
+    };
+
     this.disableTrigger = function(id, statusCode, message) {
         var method = 'disableTrigger';
 
@@ -188,9 +188,12 @@ module.exports = function(
         utils.postTrigger(dataTrigger, form, uri, auth, 0)
         .then(triggerId => {
             logger.info(method, 'Trigger', triggerId, 'was successfully fired');
+            if (utils.isMonitoringTrigger(dataTrigger.monitor, triggerId)) {
+                utils.monitorStatus.triggerFired = "success";
+            }
             if (dataTrigger.triggersLeft === 0) {
-                utils.disableTrigger(dataTrigger.id, undefined, 'Automatically disabled after reaching max triggers');
-                logger.warn(method, 'no more triggers left, disabled', dataTrigger.id);
+                utils.disableTrigger(triggerId, undefined, 'Automatically disabled after reaching max triggers');
+                logger.warn(method, 'no more triggers left, disabled', triggerId);
             }
         })
         .catch(err => {
@@ -273,7 +276,7 @@ module.exports = function(
                     var triggerIdentifier = trigger.id;
                     var doc = trigger.doc;
 
-                    if (!(triggerIdentifier in utils.triggers)) {
+                    if (!(triggerIdentifier in utils.triggers) && !doc.monitor) {
                         //check if trigger still exists in whisk db
                         var triggerObj = utils.parseQName(triggerIdentifier);
                         var host = 'https://' + utils.routerHost + ':' + 443;
@@ -330,19 +333,23 @@ module.exports = function(
                 var triggerIdentifier = change.id;
                 var doc = change.doc;
 
-                logger.info(method, 'got change for trigger', triggerIdentifier);
-
                 if (utils.triggers[triggerIdentifier]) {
                     if (doc.status && doc.status.active === false) {
                         utils.deleteTrigger(triggerIdentifier);
+                        if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+                            utils.monitorStatus.triggerStopped = "success";
+                        }
                     }
                 }
                 else {
                     //ignore changes to disabled triggers
-                    if (!doc.status || doc.status.active === true) {
+                    if ((!doc.status || doc.status.active === true) && (!doc.monitor || doc.monitor === utils.host)) {
                         utils.createTrigger(utils.initTrigger(doc))
                         .then(triggerIdentifier => {
                             logger.info(method, triggerIdentifier, 'created successfully');
+                            if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+                                utils.monitorStatus.triggerStarted = "success";
+                            }
                         })
                         .catch(err => {
                             var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
@@ -368,7 +375,6 @@ module.exports = function(
         var method = 'authorize';
 
         if (utils.endpointAuth) {
-
             if (!req.headers.authorization) {
                 res.set('www-authenticate', 'Basic realm="Private"');
                 res.status(HttpStatus.UNAUTHORIZED);
@@ -388,9 +394,7 @@ module.exports = function(
 
             var uuid = auth[1];
             var key = auth[2];
-
             var endpointAuth = utils.endpointAuth.split(':');
-
             if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
                 next();
             }

-- 
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.