You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/08/07 18:55:21 UTC
[incubator-openwhisk-package-cloudant] branch master updated:
Applying filter on all db changes since 0 takes too long (#121)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-cloudant.git
The following commit(s) were added to refs/heads/master by this push:
new 5da607a Applying filter on all db changes since 0 takes too long (#121)
5da607a is described below
commit 5da607ae78082acb0732a0387a3f15e01a032226
Author: Jason Peterson <ja...@us.ibm.com>
AuthorDate: Mon Aug 7 14:55:20 2017 -0400
Applying filter on all db changes since 0 takes too long (#121)
---
provider/app.js | 160 ++++++++++++++++++++++++++++------------------
provider/lib/constants.js | 10 +--
provider/lib/utils.js | 52 ++++++++-------
3 files changed, 131 insertions(+), 91 deletions(-)
diff --git a/provider/app.js b/provider/app.js
index 4e37f85..2746a54 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -23,7 +23,7 @@ app.use(bodyParser.urlencoded({ extended: false }));
app.set('port', process.env.PORT || 8080);
// Allow invoking servers with self-signed certificates.
-process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
+process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
// If it does not already exist, create the triggers database. This is the database that will
// store the managed triggers.
@@ -34,7 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
var dbPrefix = process.env.DB_PREFIX;
var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
var redisUrl = process.env.REDIS_URL;
-var ddname = '_design/' + constants.DESIGN_DOC_NAME;
+var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
+var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
// Create the Provider Server
var server = http.createServer(app);
@@ -42,56 +43,88 @@ server.listen(app.get('port'), function() {
logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
});
-function createDatabase(nanop) {
+function createDatabase() {
var method = 'createDatabase';
logger.info(method, 'creating the trigger database');
- return new Promise(function(resolve, reject) {
- nanop.db.create(databaseName, function (err, body) {
- if (!err) {
- logger.info(method, 'created trigger database:', databaseName);
- }
- else if (err.statusCode !== 412) {
- logger.info(method, 'failed to create trigger database:', databaseName, err);
- }
- var db = nanop.db.use(databaseName);
+ var nano = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
+
+ if (nano !== null) {
+ return new Promise(function (resolve, reject) {
+ nano.db.create(databaseName, function (err, body) {
+ if (!err) {
+ logger.info(method, 'created trigger database:', databaseName);
+ }
+ else if (err.statusCode !== 412) {
+ logger.info(method, 'failed to create trigger database:', databaseName, err);
+ }
- var only_triggers_by_worker = function(doc, req) {
- return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') || (doc.worker === req.query.worker));
- }.toString();
+ var viewDD = {
+ views: {
+ triggers_by_worker: {
+ map: function (doc) {
+ if (doc.maxTriggers) {
+ emit(doc.worker || 'worker0', 1);
+ }
+ }.toString(),
+ reduce: '_count'
+ }
+ }
+ };
- db.get(ddname, function (error, body) {
- if (error) {
- //new design doc
- db.insert({
+ createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
+ .then((db) => {
+ var filterDD = {
filters: {
- only_triggers_by_worker: only_triggers_by_worker
- },
- }, ddname, function (error, body) {
- if (error && error.statusCode !== 409) {
- reject("filter could not be created: " + error);
+ triggers_by_worker:
+ function (doc, req) {
+ return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') ||
+ (doc.worker === req.query.worker));
+ }.toString()
}
- resolve(db);
- });
- }
- else {
+ };
+ return createDesignDoc(db, filterDDName, filterDD);
+ })
+ .then((db) => {
resolve(db);
- }
+ })
+ .catch(err => {
+ reject(err);
+ });
+
});
});
- });
-}
-
-function createTriggerDb() {
- var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- if (nanop !== null) {
- return createDatabase(nanop);
}
else {
Promise.reject('nano provider did not get created. check db URL: ' + dbHost);
}
}
+function createDesignDoc(db, ddName, designDoc) {
+ var method = 'createDesignDoc';
+
+ return new Promise(function(resolve, reject) {
+
+ db.get(ddName, function (error, body) {
+ if (error) {
+ //new design doc
+ db.insert(designDoc, ddName, function (error, body) {
+ if (error && error.statusCode !== 409) {
+ logger.error(method, error);
+ reject('design doc could not be created: ' + error);
+ }
+ else {
+ resolve(db);
+ }
+ });
+ }
+ else {
+ resolve(db);
+ }
+ });
+ });
+}
+
function createRedisClient() {
var method = 'createRedisClient';
@@ -101,11 +134,11 @@ function createRedisClient() {
bluebird.promisifyAll(redis.RedisClient.prototype);
var client = redis.createClient(redisUrl);
- client.on("connect", function () {
+ client.on('connect', function () {
resolve(client);
});
- client.on("error", function (err) {
+ client.on('error', function (err) {
logger.error(method, 'Error connecting to redis', err);
reject(err);
});
@@ -130,31 +163,32 @@ function init(server) {
}
}
- createTriggerDb()
- .then(db => {
- nanoDb = db;
- return createRedisClient();
- })
- .then(client => {
- providerUtils = new ProviderUtils(logger, nanoDb, client);
- return providerUtils.initRedis();
- })
- .then(() => {
- var providerRAS = new ProviderRAS();
- var providerHealth = new ProviderHealth(providerUtils);
- var providerActivation = new ProviderActivation(logger, providerUtils);
-
- // RAS Endpoint
- app.get(providerRAS.endPoint, providerRAS.ras);
-
- // Health Endpoint
- app.get(providerHealth.endPoint, providerUtils.authorize, providerHealth.health);
-
- // Activation Endpoint
- app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
-
- providerUtils.initAllTriggers();
- }).catch(err => {
+ createDatabase()
+ .then(db => {
+ nanoDb = db;
+ return createRedisClient();
+ })
+ .then(client => {
+ providerUtils = new ProviderUtils(logger, nanoDb, client);
+ return providerUtils.initRedis();
+ })
+ .then(() => {
+ var providerRAS = new ProviderRAS();
+ var providerHealth = new ProviderHealth(providerUtils);
+ var providerActivation = new ProviderActivation(logger, providerUtils);
+
+ // RAS Endpoint
+ app.get(providerRAS.endPoint, providerRAS.ras);
+
+ // Health Endpoint
+ app.get(providerHealth.endPoint, providerUtils.authorize, providerHealth.health);
+
+ // Activation Endpoint
+ app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
+
+ providerUtils.initAllTriggers();
+ })
+ .catch(err => {
logger.error(method, 'an error occurred creating database:', err);
});
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 2f2eca1..0203027 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -3,8 +3,9 @@ const DEFAULT_MAX_TRIGGERS = -1;
const RETRY_ATTEMPTS = 12;
const RETRY_DELAY = 1000; //in milliseconds
const REDIS_KEY = 'active';
-const DESIGN_DOC_NAME = 'triggers';
-const FILTER_FUNCTION = 'only_triggers_by_worker';
+const FILTERS_DESIGN_DOC = 'triggerFilters';
+const VIEWS_DESIGN_DOC = 'triggerViews';
+const TRIGGERS_BY_WORKER = 'triggers_by_worker';
module.exports = {
@@ -13,6 +14,7 @@ module.exports = {
RETRY_ATTEMPTS: RETRY_ATTEMPTS,
RETRY_DELAY: RETRY_DELAY,
REDIS_KEY: REDIS_KEY,
- DESIGN_DOC_NAME: DESIGN_DOC_NAME,
- FILTER_FUNCTION: FILTER_FUNCTION
+ FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
+ VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
+ TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 31ba55b..5c631ff 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -3,6 +3,7 @@ var request = require('request');
var HttpStatus = require('http-status-codes');
var constants = require('./constants.js');
+
module.exports = function(
logger,
triggerDB,
@@ -12,7 +13,7 @@ module.exports = function(
this.triggers = {};
this.endpointAuth = process.env.ENDPOINT_AUTH;
this.routerHost = process.env.ROUTER_HOST || 'localhost';
- this.worker = process.env.WORKER || "worker0";
+ this.worker = process.env.WORKER || 'worker0';
this.host = process.env.HOST_INDEX || 'host0';
this.hostMachine = process.env.HOST_MACHINE;
this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
@@ -21,8 +22,9 @@ module.exports = function(
this.redisKey = constants.REDIS_KEY;
var retryAttempts = constants.RETRY_ATTEMPTS;
- var ddname = constants.DESIGN_DOC_NAME;
- var filter = constants.FILTER_FUNCTION;
+ var filterDDName = constants.FILTERS_DESIGN_DOC;
+ var viewDDName = constants.VIEWS_DESIGN_DOC;
+ var triggersByWorker = constants.TRIGGERS_BY_WORKER;
var utils = this;
// Add a trigger: listen for changes and dispatch.
@@ -102,12 +104,12 @@ module.exports = function(
id: newTrigger.id,
host: newTrigger.host,
port: newTrigger.port,
- protocol: newTrigger.protocol || "https",
+ protocol: newTrigger.protocol || 'https',
dbname: newTrigger.dbname,
user: newTrigger.user,
pass: newTrigger.pass,
apikey: newTrigger.apikey,
- since: newTrigger.since || "now",
+ since: newTrigger.since || 'now',
maxTriggers: maxTriggers,
triggersLeft: maxTriggers,
filter: newTrigger.filter,
@@ -193,7 +195,8 @@ module.exports = function(
utils.disableTrigger(dataTrigger.id, undefined, 'Automatically disabled after reaching max triggers');
logger.error(method, 'no more triggers left, disabled', dataTrigger.id);
}
- }).catch(err => {
+ })
+ .catch(err => {
logger.error(method, err);
});
};
@@ -230,7 +233,8 @@ module.exports = function(
utils.postTrigger(dataTrigger, form, uri, auth, (retryCount + 1))
.then(triggerId => {
resolve(triggerId);
- }).catch(err => {
+ })
+ .catch(err => {
reject(err);
});
}, timeout);
@@ -257,15 +261,17 @@ module.exports = function(
this.initAllTriggers = function() {
var method = 'initAllTriggers';
- logger.info(method, 'resetting system from last state');
+ //follow the trigger DB
+ utils.setupFollow('now');
- triggerDB.changes({ since: 0, include_docs: true, filter: ddname + '/' + filter, worker: utils.worker }, (err, changes) => {
+ logger.info(method, 'resetting system from last state');
+ triggerDB.view(viewDDName, triggersByWorker, {reduce: false, include_docs: true, key: utils.worker}, function(err, body) {
if (!err) {
- changes.results.forEach(function (change) {
- var triggerIdentifier = change.id;
- var doc = change.doc;
+ body.rows.forEach(function (trigger) {
+ var triggerIdentifier = trigger.id;
+ var doc = trigger.doc;
- if (!doc.status || doc.status.active === true) {
+ if ((!doc.status || doc.status.active === true) && !(triggerIdentifier in utils.triggers)) {
//check if trigger still exists in whisk db
var triggerObj = utils.parseQName(triggerIdentifier);
var host = 'https://' + utils.routerHost + ':' + 443;
@@ -290,8 +296,9 @@ module.exports = function(
else {
utils.createTrigger(utils.initTrigger(doc))
.then(triggerIdentifier => {
- logger.info(method, 'Trigger was added.', triggerIdentifier);
- }).catch(err => {
+ logger.info(method, triggerIdentifier, 'created successfully');
+ })
+ .catch(err => {
var message = 'Automatically disabled after receiving exception on init trigger: ' + err;
utils.disableTrigger(triggerIdentifier, undefined, message);
logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -299,11 +306,7 @@ module.exports = function(
}
});
}
- else {
- logger.info(method, 'ignoring trigger', triggerIdentifier, 'since it is disabled.');
- }
});
- utils.setupFollow(changes.last_seq);
} else {
logger.error(method, 'could not get latest state from database', err);
}
@@ -317,7 +320,7 @@ module.exports = function(
var feed = triggerDB.follow({
since: seq,
include_docs: true,
- filter: ddname + '/' + filter,
+ filter: filterDDName + '/' + triggersByWorker,
query_params: {worker: utils.worker}
});
@@ -338,7 +341,8 @@ module.exports = function(
utils.createTrigger(utils.initTrigger(doc))
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created successfully');
- }).catch(err => {
+ })
+ .catch(err => {
var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
utils.disableTrigger(triggerIdentifier, undefined, message);
logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -425,14 +429,14 @@ module.exports = function(
var subscriber = redisClient.duplicate();
//create a subscriber client that listens for requests to perform swap
- subscriber.on("message", function (channel, message) {
+ subscriber.on('message', function (channel, message) {
if (message === 'host0' || message === 'host1') {
- logger.info(method, message, "set to active host in channel", channel);
+ logger.info(method, message, 'set to active host in channel', channel);
utils.activeHost = message;
}
});
- subscriber.on("error", function (err) {
+ subscriber.on('error', function (err) {
logger.error(method, 'Error connecting to redis', err);
reject(err);
});
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.