You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/08/07 18:51:27 UTC

[incubator-openwhisk-package-alarms] branch master updated: Applying filter on all db changes since 0 takes too long (#82)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git


The following commit(s) were added to refs/heads/master by this push:
     new d36f0a4  Applying filter on all db changes since 0 takes too long (#82)
d36f0a4 is described below

commit d36f0a4e755bda0c617a49f532a821577b3426fd
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Mon Aug 7 14:51:25 2017 -0400

    Applying filter on all db changes since 0 takes too long (#82)
---
 provider/app.js           | 114 ++++++++++++++++++++++++++++++----------------
 provider/lib/constants.js |  10 ++--
 provider/lib/utils.js     | 111 +++++++++++++++++++++-----------------------
 3 files changed, 132 insertions(+), 103 deletions(-)

diff --git a/provider/app.js b/provider/app.js
index 63d2079..2746a54 100755
--- a/provider/app.js
+++ b/provider/app.js
@@ -23,7 +23,7 @@ app.use(bodyParser.urlencoded({ extended: false }));
 app.set('port', process.env.PORT || 8080);
 
 // Allow invoking servers with self-signed certificates.
-process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
+process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
 
 // If it does not already exist, create the triggers database.  This is the database that will
 // store the managed triggers.
@@ -34,7 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
 var redisUrl = process.env.REDIS_URL;
-var ddname = '_design/' + constants.DESIGN_DOC_NAME;
+var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
+var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
 
 // Create the Provider Server
 var server = http.createServer(app);
@@ -42,56 +43,88 @@ server.listen(app.get('port'), function() {
     logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
 });
 
-function createDatabase(nanop) {
+function createDatabase() {
     var method = 'createDatabase';
     logger.info(method, 'creating the trigger database');
 
-    return new Promise(function(resolve, reject) {
-        nanop.db.create(databaseName, function (err, body) {
-            if (!err) {
-                logger.info(method, 'created trigger database:', databaseName);
-            }
-            else if (err.statusCode !== 412) {
-                logger.info(method, 'failed to create trigger database:', databaseName, err);
-            }
-            var db = nanop.db.use(databaseName);
+    var nano = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
 
-            var only_triggers_by_worker = function(doc, req) {
-                return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') || (doc.worker === req.query.worker));
-            }.toString();
+    if (nano !== null) {
+        return new Promise(function (resolve, reject) {
+            nano.db.create(databaseName, function (err, body) {
+                if (!err) {
+                    logger.info(method, 'created trigger database:', databaseName);
+                }
+                else if (err.statusCode !== 412) {
+                    logger.info(method, 'failed to create trigger database:', databaseName, err);
+                }
+
+                var viewDD = {
+                    views: {
+                        triggers_by_worker: {
+                            map: function (doc) {
+                                if (doc.maxTriggers) {
+                                    emit(doc.worker || 'worker0', 1);
+                                }
+                            }.toString(),
+                            reduce: '_count'
+                        }
+                    }
+                };
 
-            db.get(ddname, function (error, body) {
-                if (error) {
-                    //new design doc
-                    db.insert({
+                createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
+                .then((db) => {
+                    var filterDD = {
                         filters: {
-                            only_triggers_by_worker: only_triggers_by_worker
-                        },
-                    }, ddname, function (error, body) {
-                        if (error && error.statusCode !== 409) {
-                            reject("filter could not be created: " + error);
+                            triggers_by_worker:
+                                function (doc, req) {
+                                    return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') ||
+                                            (doc.worker === req.query.worker));
+                                }.toString()
                         }
-                        resolve(db);
-                    });
-                }
-                else {
+                    };
+                    return createDesignDoc(db, filterDDName, filterDD);
+                })
+                .then((db) => {
                     resolve(db);
-                }
+                })
+                .catch(err => {
+                    reject(err);
+                });
+
             });
         });
-    });
-}
-
-function createTriggerDb() {
-    var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
-    if (nanop !== null) {
-        return createDatabase(nanop);
     }
     else {
         Promise.reject('nano provider did not get created.  check db URL: ' + dbHost);
     }
 }
 
+function createDesignDoc(db, ddName, designDoc) {
+    var method = 'createDesignDoc';
+
+    return new Promise(function(resolve, reject) {
+
+        db.get(ddName, function (error, body) {
+            if (error) {
+                //new design doc
+                db.insert(designDoc, ddName, function (error, body) {
+                    if (error && error.statusCode !== 409) {
+                        logger.error(method, error);
+                        reject('design doc could not be created: ' + error);
+                    }
+                    else {
+                        resolve(db);
+                    }
+                });
+            }
+            else {
+                resolve(db);
+            }
+        });
+    });
+}
+
 function createRedisClient() {
     var method = 'createRedisClient';
 
@@ -101,11 +134,11 @@ function createRedisClient() {
             bluebird.promisifyAll(redis.RedisClient.prototype);
             var client = redis.createClient(redisUrl);
 
-            client.on("connect", function () {
+            client.on('connect', function () {
                 resolve(client);
             });
 
-            client.on("error", function (err) {
+            client.on('error', function (err) {
                 logger.error(method, 'Error connecting to redis', err);
                 reject(err);
             });
@@ -130,7 +163,7 @@ function init(server) {
         }
     }
 
-    createTriggerDb()
+    createDatabase()
     .then(db => {
         nanoDb = db;
         return createRedisClient();
@@ -154,7 +187,8 @@ function init(server) {
         app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
 
         providerUtils.initAllTriggers();
-    }).catch(err => {
+    })
+    .catch(err => {
         logger.error(method, 'an error occurred creating database:', err);
     });
 
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index e45e204..f5d7915 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -3,8 +3,9 @@ const DEFAULT_MAX_TRIGGERS = -1;
 const RETRY_ATTEMPTS = 10;
 const RETRY_DELAY = 1000; //in milliseconds
 const REDIS_KEY = 'active';
-const DESIGN_DOC_NAME = 'triggers';
-const FILTER_FUNCTION = 'only_triggers_by_worker';
+const FILTERS_DESIGN_DOC = 'triggerFilters';
+const VIEWS_DESIGN_DOC = 'triggerViews';
+const TRIGGERS_BY_WORKER = 'triggers_by_worker';
 
 
 module.exports = {
@@ -13,6 +14,7 @@ module.exports = {
     RETRY_ATTEMPTS: RETRY_ATTEMPTS,
     RETRY_DELAY: RETRY_DELAY,
     REDIS_KEY: REDIS_KEY,
-    DESIGN_DOC_NAME: DESIGN_DOC_NAME,
-    FILTER_FUNCTION: FILTER_FUNCTION
+    FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
+    VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
+    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
 };
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 359126e..847b4cf 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -14,7 +14,7 @@ module.exports = function(
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
-    this.worker = process.env.WORKER || "worker0";
+    this.worker = process.env.WORKER || 'worker0';
     this.host = process.env.HOST_INDEX || 'host0';
     this.hostMachine = process.env.HOST_MACHINE;
     this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
@@ -24,55 +24,47 @@ module.exports = function(
 
     var retryDelay = constants.RETRY_DELAY;
     var retryAttempts = constants.RETRY_ATTEMPTS;
-    var ddname = constants.DESIGN_DOC_NAME;
-    var filter = constants.FILTER_FUNCTION;
+    var filterDDName = constants.FILTERS_DESIGN_DOC;
+    var viewDDName = constants.VIEWS_DESIGN_DOC;
+    var triggersByWorker = constants.TRIGGERS_BY_WORKER;
     var utils = this;
 
     this.createTrigger = function(triggerIdentifier, newTrigger) {
         var method = 'createTrigger';
 
         try {
-            var cronHandle;
             return new Promise(function(resolve, reject) {
 
-                // to avoid multiple cron jobs for the same trigger we will only create a cron job if
-                // the trigger is not already in the list of identified triggers
-                if (!(triggerIdentifier in utils.triggers)) {
-                    cronHandle = new CronJob(newTrigger.cron,
-                        function onTick() {
-                            if (utils.activeHost === utils.host) {
-                                var triggerHandle = utils.triggers[triggerIdentifier];
-                                if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
-                                    try {
-                                        utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
-                                    } catch (e) {
-                                        logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
-                                    }
+                var cronHandle = new CronJob(newTrigger.cron,
+                    function onTick() {
+                        if (utils.activeHost === utils.host) {
+                            var triggerHandle = utils.triggers[triggerIdentifier];
+                            if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
+                                try {
+                                    utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+                                } catch (e) {
+                                    logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
                                 }
                             }
                         }
-                    );
-                    logger.info(method, triggerIdentifier, 'starting cron job');
-                    cronHandle.start();
-
-                    var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
-
-                    utils.triggers[triggerIdentifier] = {
-                        cron: newTrigger.cron,
-                        cronHandle: cronHandle,
-                        triggersLeft: maxTriggers,
-                        maxTriggers: maxTriggers,
-                        apikey: newTrigger.apikey,
-                        name: newTrigger.name,
-                        namespace: newTrigger.namespace
-                    };
-                }
-                else {
-                    logger.info(method, triggerIdentifier, 'already exists');
-                }
+                    }
+                );
+                logger.info(method, triggerIdentifier, 'starting cron job');
+                cronHandle.start();
+
+                var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
+
+                utils.triggers[triggerIdentifier] = {
+                    cron: newTrigger.cron,
+                    cronHandle: cronHandle,
+                    triggersLeft: maxTriggers,
+                    maxTriggers: maxTriggers,
+                    apikey: newTrigger.apikey,
+                    name: newTrigger.name,
+                    namespace: newTrigger.namespace
+                };
                 resolve(triggerIdentifier);
             });
-
         } catch (err) {
             return Promise.reject(err);
         }
@@ -82,7 +74,7 @@ module.exports = function(
         var method = 'fireTrigger';
 
         var triggerIdentifier = utils.getTriggerIdentifier(apikey, namespace, name);
-        var host = "https://" + utils.routerHost + ":443";
+        var host = 'https://' + utils.routerHost + ':443';
         var auth = apikey.split(':');
         var dataTrigger = utils.triggers[triggerIdentifier];
         var uri = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
@@ -95,7 +87,8 @@ module.exports = function(
                 utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
                 logger.error(method, 'no more triggers left, disabled', triggerIdentifier);
             }
-        }).catch(err => {
+        })
+        .catch(err => {
             logger.error(method, err);
         });
     };
@@ -133,7 +126,8 @@ module.exports = function(
                                     utils.postTrigger(dataTrigger, payload, uri, auth, (retryCount + 1))
                                     .then(triggerId => {
                                         resolve(triggerId);
-                                    }).catch(err => {
+                                    })
+                                    .catch(err => {
                                         reject(err);
                                     });
                                 }, retryDelay);
@@ -216,25 +210,26 @@ module.exports = function(
     this.initAllTriggers = function() {
         var method = 'initAllTriggers';
 
-        logger.info(method, 'resetting system from last state');
+        //follow the trigger DB
+        utils.setupFollow('now');
 
-        triggerDB.changes({ since: 0, include_docs: true, filter: ddname + '/' + filter, worker: utils.worker }, (err, changes) => {
+        logger.info(method, 'resetting system from last state');
+        triggerDB.view(viewDDName, triggersByWorker, {reduce: false, include_docs: true, key: utils.worker}, function(err, body) {
             if (!err) {
-                changes.results.forEach(function (change) {
-                    var triggerIdentifier = change.id;
-                    var doc = change.doc;
+                body.rows.forEach(function (trigger) {
+                    var triggerIdentifier = trigger.id;
+                    var doc = trigger.doc;
 
-                    if (!doc.status || doc.status.active === true) {
+                    if ((!doc.status || doc.status.active === true) && !(triggerIdentifier in utils.triggers)) {
                         //check if trigger still exists in whisk db
                         var namespace = doc.namespace;
                         var name = doc.name;
                         var apikey = doc.apikey;
-                        logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists');
-
                         var host = 'https://' + utils.routerHost + ':' + 443;
                         var triggerURL = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
                         var auth = apikey.split(':');
 
+                        logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists');
                         request({
                             method: 'get',
                             url: triggerURL,
@@ -247,13 +242,14 @@ module.exports = function(
                             if (!error && utils.shouldDisableTrigger(response.statusCode)) {
                                 var message = 'Automatically disabled after receiving a ' + response.statusCode + ' status code on init trigger';
                                 utils.disableTrigger(triggerIdentifier, response.statusCode, message);
-                                logger.error(method, 'trigger', triggerIdentifier, 'has been disabled due to status code', response.statusCode);
+                                logger.error(method, 'trigger', triggerIdentifier, 'has been disabled due to status code:', response.statusCode);
                             }
                             else {
                                 utils.createTrigger(triggerIdentifier, doc)
                                 .then(triggerIdentifier => {
                                     logger.info(method, triggerIdentifier, 'created successfully');
-                                }).catch(err => {
+                                })
+                                .catch(err => {
                                     var message = 'Automatically disabled after receiving exception on init trigger: ' + err;
                                     utils.disableTrigger(triggerIdentifier, undefined, message);
                                     logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -261,11 +257,7 @@ module.exports = function(
                             }
                         });
                     }
-                    else {
-                        logger.info(method, 'ignoring trigger', triggerIdentifier, 'since it is disabled.');
-                    }
                 });
-                utils.setupFollow(changes.last_seq);
             } else {
                 logger.error(method, 'could not get latest state from database', err);
             }
@@ -279,7 +271,7 @@ module.exports = function(
             var feed = triggerDB.follow({
                 since: seq,
                 include_docs: true,
-                filter: ddname + '/' + filter,
+                filter: filterDDName + '/' + triggersByWorker,
                 query_params: {worker: utils.worker}
             });
 
@@ -300,7 +292,8 @@ module.exports = function(
                         utils.createTrigger(triggerIdentifier, doc)
                         .then(triggerIdentifier => {
                             logger.info(method, triggerIdentifier, 'created successfully');
-                        }).catch(err => {
+                        })
+                        .catch(err => {
                             var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
                             utils.disableTrigger(triggerIdentifier, undefined, message);
                             logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -372,14 +365,14 @@ module.exports = function(
                 var subscriber = redisClient.duplicate();
 
                 //create a subscriber client that listens for requests to perform swap
-                subscriber.on("message", function (channel, message) {
+                subscriber.on('message', function (channel, message) {
                     if (message === 'host0' || message === 'host1') {
-                        logger.info(method, message, "set to active host in channel", channel);
+                        logger.info(method, message, 'set to active host in channel', channel);
                         utils.activeHost = message;
                     }
                 });
 
-                subscriber.on("error", function (err) {
+                subscriber.on('error', function (err) {
                     logger.error(method, 'Error connecting to redis', err);
                     reject(err);
                 });

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].