You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ea...@apache.org on 2016/10/24 19:22:01 UTC
[2/4] qpid-dispatch git commit: DISPATCH-536 Queue up large numbers of requests on topology page
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fc43944/console/stand-alone/plugin/js/qdrService.js
----------------------------------------------------------------------
diff --git a/console/stand-alone/plugin/js/qdrService.js b/console/stand-alone/plugin/js/qdrService.js
index 7854cec..68001aa 100644
--- a/console/stand-alone/plugin/js/qdrService.js
+++ b/console/stand-alone/plugin/js/qdrService.js
@@ -26,13 +26,38 @@ var QDR = (function(QDR) {
QDR.module.factory("QDRService", ['$rootScope', '$http', '$timeout', '$resource', '$location', function($rootScope, $http, $timeout, $resource, $location) {
var self = {
- rhea: require("rhea"),
+ rhea: require("rhea"),
- timeout: 10,
+ timeout: 4, // seconds to wait before assuming a request has failed
+ updateInterval: 2000, // milliseconds between background updates
connectActions: [],
disconnectActions: [],
updatedActions: {},
- stop: undefined, // update interval handle
+ updating: false, // are we updating the node list in the background
+ maxCorrelatorDepth: 10, // max number of outstanding requests to allow
+ /**
+ * @property options
+ * Holds a reference to the connection options when
+ * a connection is started
+ */
+ options: undefined,
+
+ /*
+ * @property message
+ * The proton message that is used to send commands
+ * and receive responses
+ */
+ sender: undefined,
+ receiver: undefined,
+ sendable: false,
+
+ schema: undefined,
+
+ toAddress: undefined,
+ connected: false,
+ gotTopology: false,
+ errorText: undefined,
+ connectionError: undefined,
addConnectAction: function(action) {
if (angular.isFunction(action)) {
@@ -46,162 +71,133 @@ var QDR = (function(QDR) {
},
addUpdatedAction: function(key, action) {
if (angular.isFunction(action)) {
- self.updatedActions[key] = action;
+ self.updatedActions[key] = action;
}
},
delUpdatedAction: function(key) {
if (key in self.updatedActions)
- delete self.updatedActions[key];
+ delete self.updatedActions[key];
},
executeConnectActions: function() {
self.connectActions.forEach(function(action) {
//QDR.log.debug("executing connect action " + action);
- try {
- action.apply();
- } catch (e) {
- // in case the page that registered the handler has been unloaded
- }
+ try {
+ action.apply();
+ } catch (e) {
+ // in case the page that registered the handler has been unloaded
+ }
});
self.connectActions = [];
- if ($location.path().startsWith(QDR.pluginRoot)) {
- $timeout( function () {
-
- var searchObject = $location.search();
- var goto = "overview";
- if (searchObject.org && searchObject.org !== "connect") {
- goto = searchObject.org;
- }
- $location.search('org', null)
- $location.path(QDR.pluginRoot +"/" + goto);
- })
- }
-
},
executeDisconnectActions: function() {
self.disconnectActions.forEach(function(action) {
- try {
- action.apply();
- } catch (e) {
- // in case the page that registered the handler has been unloaded
- }
+ try {
+ action.apply();
+ } catch (e) {
+ // in case the page that registered the handler has been unloaded
+ }
});
self.disconnectActions = [];
},
executeUpdatedActions: function() {
for (action in self.updatedActions) {
- try {
- self.updatedActions[action].apply();
- } catch (e) {
- delete self.updatedActions[action]
- }
+// try {
+ self.updatedActions[action].apply();
+/* } catch (e) {
+QDR.log.debug("caught error executing updated actions")
+console.dump(e)
+ delete self.updatedActions[action]
+ }
+ */
}
},
- redirectWhenConnected: function (org) {
- $location.path("/" + QDR.pluginName + "/connect")
- $location.search('org', org);
- },
+ redirectWhenConnected: function(org) {
+ $location.path("/" + QDR.pluginName + "/connect")
+ $location.search('org', org);
+ },
notifyTopologyDone: function() {
- //QDR.log.debug("got Toplogy done notice");
-
- if (!angular.isDefined(self.schema))
- return;
- else if (self.topology._gettingTopo)
- return;
if (!self.gotTopology) {
- QDR.log.debug("topology was just initialized");
- self.gotTopology = true;
- self.executeConnectActions();
- $rootScope.$apply();
+ QDR.log.debug("topology was just initialized");
+ console.dump(self.topology._nodeInfo)
+ self.gotTopology = true;
+ //$rootScope.$apply();
} else {
- QDR.log.debug("topology model was just updated");
- self.executeUpdatedActions();
+ QDR.log.debug("topology model was just updated");
}
+ self.executeUpdatedActions();
},
- /**
- * @property options
- * Holds a reference to the connection options when
- * a connection is started
- */
- options: undefined,
-
- /*
- * @property message
- * The proton message that is used to send commands
- * and receive responses
- */
- sender: undefined,
- receiver: undefined,
- sendable: false,
-
- schema: undefined,
-
- toAddress: undefined,
- connected: false,
- gotTopology: false,
- errorText: undefined,
- connectionError: undefined,
isConnected: function() {
return self.connected;
},
- correlator: {
+ correlator: {
_objects: {},
- _corremationID: 0,
+ _correlationID: 0,
- corr: function () {
- var id = ++this._corremationID + "";
- this._objects[id] = {resolver: null}
- return id;
+ corr: function() {
+ var id = ++this._correlationID + "";
+ this._objects[id] = {
+ resolver: null
+ }
+ return id;
},
request: function() {
- //QDR.log.debug("correlator:request");
- return this;
+ //QDR.log.debug("correlator:request");
+ return this;
},
then: function(id, resolver, error) {
- //QDR.log.debug("registered then resolver for correlationID: " + id);
- if (error) {
- delete this._objects[id];
- return;
- }
- this._objects[id].resolver = resolver;
+ //QDR.log.debug("registered then resolver for correlationID: " + id);
+ if (error) {
+ //QDR.log.debug("then received an error. deleting correlator")
+ delete this._objects[id];
+ return;
+ }
+ this._objects[id].resolver = resolver;
},
// called by receiver's on('message') handler when a response arrives
resolve: function(context) {
- var correlationID = context.message.properties.correlation_id;
- this._objects[correlationID].resolver(context.message.body, context);
- delete this._objects[correlationID];
+ var correlationID = context.message.properties.correlation_id;
+ this._objects[correlationID].resolver(context.message.body, context);
+ delete this._objects[correlationID];
+ },
+ depth: function () {
+ return Object.keys(this._objects).length
}
- },
-
- onSubscription: function() {
- self.getSchema();
- },
-
- startUpdating: function () {
- self.stopUpdating();
+ },
+
+ onSubscription: function() {
+ self.executeConnectActions();
+ //self.getSchema();
+ },
+
+ startUpdating: function() {
+ self.stopUpdating(true);
QDR.log.info("startUpdating called")
+ self.updating = true;
self.topology.get();
- self.stop = setInterval(function() {
- self.topology.get();
- }, 2000);
- },
- stopUpdating: function () {
- if (angular.isDefined(self.stop)) {
- QDR.log.info("stopUpdating called")
- clearInterval(self.stop);
- self.stop = undefined;
+ },
+ stopUpdating: function(silent) {
+ if (self.topology._getTimer) {
+ clearTimeout(self.topology._getTimer)
+ self.topology._getTimer = null;
+ }
+ if (self.topology._waitTimer) {
+ clearTimeout(self.topology._waitTimer)
+ self.topology._waitTimer = null;
}
- },
+ if (!silent)
+ QDR.log.info("stopUpdating called")
+ self.updating = false;
+ },
initProton: function() {
//QDR.log.debug("*************QDR init proton called ************");
},
- cleanUp: function() {
- },
+ cleanUp: function() {},
error: function(line) {
if (line.num) {
QDR.log.debug("error - num: ", line.num, " message: ", line.message);
@@ -214,144 +210,186 @@ var QDR = (function(QDR) {
self.executeDisconnectActions();
},
- nameFromId: function (id) {
- return id.split('/')[3];
+ nameFromId: function(id) {
+ return id.split('/')[3];
},
- humanify: function (s) {
- if (!s || s.length === 0)
- return s;
- var t = s.charAt(0).toUpperCase() + s.substr(1).replace(/[A-Z]/g, ' $&');
- return t.replace(".", " ");
+ humanify: function(s) {
+ if (!s || s.length === 0)
+ return s;
+ var t = s.charAt(0).toUpperCase() + s.substr(1).replace(/[A-Z]/g, ' $&');
+ return t.replace(".", " ");
+ },
+ pretty: function(v) {
+ var formatComma = d3.format(",");
+ if (!isNaN(parseFloat(v)) && isFinite(v))
+ return formatComma(v);
+ return v;
},
- pretty: function(v) {
- var formatComma = d3.format(",");
- if (!isNaN(parseFloat(v)) && isFinite(v))
- return formatComma(v);
- return v;
- },
nodeNameList: function() {
var nl = [];
- // if we are in the middel of updating the topology
- // then use the last known node info
- var ni = self.topology._nodeInfo;
- if (self.topology._gettingTopo)
- ni = self.topology._lastNodeInfo;
- for (var id in ni) {
- nl.push(self.nameFromId(id));
+ for (var id in self.topology._nodeInfo) {
+ nl.push(self.nameFromId(id));
}
return nl.sort();
},
nodeIdList: function() {
var nl = [];
- // if we are in the middel of updating the topology
- // then use the last known node info
- var ni = self.topology._nodeInfo;
- if (self.topology._gettingTopo)
- ni = self.topology._lastNodeInfo;
- for (var id in ni) {
- nl.push(id);
+ for (var id in self.topology._nodeInfo) {
+ nl.push(id);
}
return nl.sort();
},
- nodeList: function () {
+ nodeList: function() {
var nl = [];
- var ni = self.topology._nodeInfo;
- if (self.topology._gettingTopo)
- ni = self.topology._lastNodeInfo;
- for (var id in ni) {
- nl.push({name: self.nameFromId(id), id: id});
+ for (var id in self.topology._nodeInfo) {
+ nl.push({
+ name: self.nameFromId(id),
+ id: id
+ });
}
return nl;
},
// given an attribute name array, find the value at the same index in the values array
- valFor: function (aAr, vAr, key) {
- var idx = aAr.indexOf(key);
- if ((idx > -1) && (idx < vAr.length)) {
- return vAr[idx];
+ valFor: function(aAr, vAr, key) {
+ var idx = aAr.indexOf(key);
+ if ((idx > -1) && (idx < vAr.length)) {
+ return vAr[idx];
+ }
+ return null;
+ },
+
+ isArtemis: function(d) {
+ return d.nodeType === 'on-demand' && !d.properties.product;
+ },
+
+ isQpid: function(d) {
+ return d.nodeType === 'on-demand' && (d.properties && d.properties.product === 'qpid-cpp');
+ },
+
+ isAConsole: function(properties, connectionId, nodeType, key) {
+ return self.isConsole({
+ properties: properties,
+ connectionId: connectionId,
+ nodeType: nodeType,
+ key: key
+ })
+ },
+ isConsole: function(d) {
+ // use connection properties if available
+ if (d && d['properties'] && d['properties']['console_identifier'] == 'Dispatch console')
+ return true;
+ return false;
+ },
+
+ flatten: function(attributes, result) {
+ var flat = {}
+ attributes.forEach(function(attr, i) {
+ if (result && result.length > i)
+ flat[attr] = result[i]
+ })
+ return flat;
+ },
+ isConsoleLink: function(link) {
+ // find the connection for this link
+ var conns = self.topology.nodeInfo()[link.nodeId]['.connection']
+ var connIndex = conns.attributeNames.indexOf("identity")
+ var linkCons = conns.results.filter(function(conn) {
+ return conn[connIndex] === link.connectionId;
+ })
+ var conn = self.flatten(conns.attributeNames, linkCons[0]);
+
+ return self.isConsole(conn)
+ },
+
+ quiesceLink: function(nodeId, name) {
+ function gotMethodResponse(nodeName, entity, response, context) {
+ var statusCode = context.message.application_properties.statusCode;
+ if (statusCode < 200 || statusCode >= 300) {
+ Core.notification('error', context.message.application_properties.statusDescription);
+ }
+ }
+ var attributes = {
+ adminStatus: 'disabled',
+ name: name
+ };
+ self.sendMethod(nodeId, "router.link", attributes, "UPDATE", undefined, gotMethodResponse)
+ },
+ addr_text: function(addr) {
+ if (!addr)
+ return "-"
+ if (addr[0] == 'M')
+ return addr.substring(2)
+ else
+ return addr.substring(1)
+ },
+ addr_class: function(addr) {
+ if (!addr) return "-"
+ if (addr[0] == 'M') return "mobile"
+ if (addr[0] == 'R') return "router"
+ if (addr[0] == 'A') return "area"
+ if (addr[0] == 'L') return "local"
+ if (addr[0] == 'C') return "link-incoming"
+ if (addr[0] == 'D') return "link-outgoing"
+ if (addr[0] == 'T') return "topo"
+ return "unknown: " + addr[0]
+ },
+ identity_clean: function(identity) {
+ if (!identity)
+ return "-"
+ var pos = identity.indexOf('/')
+ if (pos >= 0)
+ return identity.substring(pos + 1)
+ return identity
+ },
+
+ // check if all nodes have this entity. if not, get them
+ initEntity: function (entity, callback) {
+ var callNeeded = Object.keys(self.topology._nodeInfo).some( function (node) {
+ return !angular.isDefined(self.topology._nodeInfo[node][entity])
+ })
+ if (callNeeded) {
+ self.loadEntity(entity, callback)
+ } else
+ callback()
+ },
+
+ // get/refresh entities for all nodes
+ loadEntity: function (entities, callback) {
+ if (Object.prototype.toString.call(entities) !== '[object Array]') {
+ entities = [entities]
+ }
+ var qdepth = self.maxCorrelatorDepth - self.correlator.depth()
+ if (qdepth <= 0)
+ qdepth = 1;
+ var q = queue(qdepth)
+ for (node in self.topology._nodeInfo) {
+ for (var i=0; i<entities.length; ++i) {
+ var entity = entities[i]
+ q.defer(self.ensureNodeInfo, node, entity, [], q)
}
- return null;
- },
-
- isArtemis: function (d) {
- return d.nodeType ==='on-demand' && !d.properties.product;
- },
-
- isQpid: function (d) {
- return d.nodeType ==='on-demand' && (d.properties && d.properties.product === 'qpid-cpp');
- },
-
- isAConsole: function (properties, connectionId, nodeType, key) {
- return self.isConsole({properties: properties, connectionId: connectionId, nodeType: nodeType, key: key})
- },
- isConsole: function (d) {
- // use connection properties if available
- if (d && d['properties'] && d['properties']['console_identifier'] == 'Dispatch console')
- return true;
- return false;
- },
-
- flatten: function (attributes, result) {
- var flat = {}
- attributes.forEach( function (attr, i) {
- if (result && result.length > i)
- flat[attr] = result[i]
- })
- return flat;
- },
- isConsoleLink: function (link) {
- // find the connection for this link
- var conns = self.topology.nodeInfo()[link.nodeId]['.connection']
- var connIndex = conns.attributeNames.indexOf("identity")
- var linkCons = conns.results.filter ( function (conn) {
- return conn[connIndex] === link.connectionId;
- })
- var conn = self.flatten(conns.attributeNames, linkCons[0]);
-
- return self.isConsole(conn)
- },
-
- quiesceLink: function (nodeId, name) {
- function gotMethodResponse (nodeName, entity, response, context) {
- var statusCode = context.message.application_properties.statusCode;
- if (statusCode < 200 || statusCode >= 300) {
- Core.notification('error', context.message.application_properties.statusDescription);
- }
- }
- var attributes = {adminStatus: 'disabled', name: name};
- self.sendMethod(nodeId, "router.link", attributes, "UPDATE", undefined, gotMethodResponse)
- },
- addr_text: function (addr) {
- if (!addr)
- return "-"
- if (addr[0] == 'M')
- return addr.substring(2)
- else
- return addr.substring(1)
- },
- addr_class: function (addr) {
- if (!addr) return "-"
- if (addr[0] == 'M') return "mobile"
- if (addr[0] == 'R') return "router"
- if (addr[0] == 'A') return "area"
- if (addr[0] == 'L') return "local"
- if (addr[0] == 'C') return "link-incoming"
- if (addr[0] == 'D') return "link-outgoing"
- if (addr[0] == 'T') return "topo"
- return "unknown: " + addr[0]
- },
- identity_clean: function (identity) {
- if (!identity)
- return "-"
- var pos = identity.indexOf('/')
- if (pos >= 0)
- return identity.substring(pos + 1)
- return identity
- },
+ }
+ q.await(function (error) {
+ callback();
+ })
+ },
+
+ setUpdateEntities: function (entities) {
+ self.topology._autoUpdatedEntities = entities
+ },
+ addUpdateEntity: function (entity) {
+ if (self.topology._autoUpdatedEntities.indexOf(entity) == -1)
+ self.topology._autoUpdatedEntities.push(entity)
+ },
+ delUpdateEntity: function (entity) {
+ var index = self.topology._autoUpdatedEntities.indexOf(entity)
+ if (index != -1)
+ self.topology._autoUpdatedEntities.splice(index, 1)
+ },
/*
* send the management messages that build up the topology
@@ -363,559 +401,609 @@ var QDR = (function(QDR) {
_nodeInfo: {},
_lastNodeInfo: {},
_expected: {},
- _timerHandle: null,
+ _waitTimer: null,
+ _getTimer: null,
+ _autoUpdatedEntities: [],
+ _lastRequestTime: null,
- nodeInfo: function () {
- return this._gettingTopo ? this._lastNodeInfo : this._nodeInfo;
+ nodeInfo: function() {
+ return self.topology._nodeInfo
},
- get: function () {
- if (this._gettingTopo)
- return;
- if (!self.connected) {
- QDR.log.debug("topology get failed because !self.connected")
- return;
- }
- this._lastNodeInfo = angular.copy(this._nodeInfo);
- this._gettingTopo = true;
-
- self.errorText = undefined;
- this.cleanUp(this._nodeInfo);
- this._nodeInfo = {};
- this._expected = {};
-
- // get the list of nodes to query.
- // once this completes, we will get the info for each node returned
- self.getRemoteNodeInfo( function (response, context) {
- //QDR.log.debug("got remote node list of ");
- //console.dump(response);
- if( Object.prototype.toString.call( response ) === '[object Array]' ) {
- if (response.length === 0) {
- // there is only one router, get its node id from the reeciiver
- //"amqp:/_topo/0/Router.A/temp.aSO3+WGaoNUgGVx"
- var address = context.receiver.remote.attach.source.address;
- var addrParts = address.split('/')
- addrParts.splice(addrParts.length-1, 1, '$management')
- response = [addrParts.join('/')]
- }
- // we expect a response for each of these nodes
- self.topology.wait(self.timeout);
- for (var i=0; i<response.length; ++i) {
- self.makeMgmtCalls(response[i]);
- }
- };
- });
+ get: function() {
+ if (self.topology._gettingTopo) {
+ QDR.log.debug("asked to get topology but was already getting it")
+ return;
+ }
+ if (!self.connected) {
+ QDR.log.debug("topology get failed because !self.connected")
+ return;
+ }
+ if (self.topology._getTimer) {
+ clearTimeout(self.topology._getTimer)
+ self.topology._getTimer = null
+ }
+
+ //QDR.log.debug("starting get topology with correlator.depth of " + self.correlator.depth())
+ //self.topology._lastNodeInfo = angular.copy(self.topology._nodeInfo);
+ self.topology._gettingTopo = true;
+
+ self.errorText = undefined;
+ //self.topology.cleanUp(self.topology._nodeInfo);
+ //self.topology._nodeInfo = {};
+ self.topology._expected = {};
+
+ // get the list of nodes to query.
+ // once this completes, we will get the info for each node returned
+ self.getRemoteNodeInfo(function(response, context) {
+ //QDR.log.debug("got remote node list of ");
+ if (Object.prototype.toString.call(response) === '[object Array]') {
+ // remove dropped nodes
+ var keys = Object.keys(self.topology._nodeInfo)
+ for (var i=0; i<keys.length; ++i) {
+ if (response.indexOf(keys[i]) < 0) {
+ delete self.topology._nodeInfo[keys[i]]
+ }
+ }
+ // add any new nodes
+ for (var i=0; i<response.length; ++i) {
+ if (!angular.isDefined(self.topology._nodeInfo[response[i]])) {
+ self.topology._nodeInfo[response[i]] = {};
+ }
+ }
+
+ // also refresh any entities that were requested
+ var qdepth = self.maxCorrelatorDepth - self.correlator.depth()
+ if (qdepth <= 0)
+ qdepth = 1;
+ q = queue(qdepth)
+ for (node in self.topology._nodeInfo) {
+ for (var i=0; i<self.topology._autoUpdatedEntities.length; ++i) {
+ var entity = self.topology._autoUpdatedEntities[i]
+ self.topology.expect(node, entity)
+ q.defer(self.ensureNodeInfo, node, entity, [], q)
+ }
+ }
+ self.topology._lastRequestTime = Date.now()
+ self.topology._waitTimer = setTimeout(self.topology.timedOut, self.timeout * 1000, q);
+ q.await(function (error) {
+QDR.log.debug("Done awaiting for topology. error is " + error)
+ if (!error)
+ self.topology.ondone();
+ })
+ };
+ });
},
- cleanUp: function (obj) {
-/*
- for (var o in obj) {
- QDR.log.debug("cleaning up");
- console.dump(obj[o]);
- if (isNaN(parseInt(o)))
- this.cleanUp(obj[o]);
- }
-*/
- if (obj)
- delete obj;
+ cleanUp: function(obj) {
+ if (obj)
+ delete obj;
},
- wait: function (timeout) {
- this.timerHandle = setTimeout(this.timedOut, timeout * 1000);
- },
- timedOut: function () {
- // a node dropped out. this happens when the get-mgmt-nodex
- // results contains more nodes than actually respond within
- // the timeout. However, if the responses we get don't contain
- // the missing node, assume we are done.
- QDR.log.info("timed out waiting for management responses");
- // note: can't use 'this' in a timeout handler
- self.topology.miniDump("state at timeout");
- // check if _nodeInfo is consistent
- if (self.topology.isConsistent()) {
- //TODO: notify controllers which node was dropped
- // so they can keep an event log
- self.topology.ondone();
- return;
- }
- self.topology.onerror(Error("Timed out waiting for management responses"));
+ timedOut: function(q) {
+ // A node dropped out. This happens when the GET-MGMT-NODES
+ // results contain more nodes than actually respond within
+ // the timeout. However, if the responses we get don't contain
+ // the missing node, assume we are done.
+ QDR.log.info("timed out waiting for management responses");
+ // note: can't use 'this' in a timeout handler
+ self.topology.miniDump("state at timeout");
+ // check if _nodeInfo is consistent
+ if (self.topology.isConsistent()) {
+ q.abort()
+ self.topology.ondone();
+ return;
+ }
+ self.topology.onerror(Error("management responses are not consistent"));
},
- isConsistent: function () {
- // see if the responses we have so far reference any nodes
- // for which we don't have a response
- var gotKeys = {};
- for (var id in this._nodeInfo) {
- var onode = this._nodeInfo[id];
- var conn = onode['.connection'];
- // get list of node names in the connection data
- if (conn) {
- var containerIndex = conn.attributeNames.indexOf('container');
- var connectionResults = conn.results;
- if (containerIndex >= 0)
- for (var j=0; j < connectionResults.length; ++j) {
- // inter-router connection to a valid dispatch connection name
- gotKeys[connectionResults[j][containerIndex]] = ""; // just add the key
- }
+ isConsistent: function() {
+ // see if the responses we have so far reference any nodes
+ // for which we don't have a response
+ var gotKeys = {};
+ for (var id in self.topology._nodeInfo) {
+ var onode = self.topology._nodeInfo[id];
+ var conn = onode['.connection'];
+ // get list of node names in the connection data
+ if (conn) {
+ var containerIndex = conn.attributeNames.indexOf('container');
+ var connectionResults = conn.results;
+ if (containerIndex >= 0)
+ for (var j = 0; j < connectionResults.length; ++j) {
+ // inter-router connection to a valid dispatch connection name
+ gotKeys[connectionResults[j][containerIndex]] = ""; // just add the key
}
}
- // gotKeys now contains all the container names that we have received
- // Are any of the keys that are still expected in the gotKeys list?
- var keys = Object.keys(gotKeys);
- for (var id in this._expected) {
- var key = self.nameFromId(id);
- if (key in keys)
- return false;
- }
- return true;
+ }
+ // gotKeys now contains all the container names that we have received
+ // Are any of the keys that are still expected in the gotKeys list?
+ var keys = Object.keys(gotKeys);
+ for (var id in self.topology._expected) {
+ var key = self.nameFromId(id);
+ if (key in keys)
+ return false;
+ }
+ return true;
},
-
- addNodeInfo: function (id, entity, values) {
- // save the results in the nodeInfo object
- if (id) {
- if (!(id in self.topology._nodeInfo)) {
- self.topology._nodeInfo[id] = {};
- }
- self.topology._nodeInfo[id][entity] = values;
+
+ addNodeInfo: function(id, entity, values) {
+ if (self.topology._waitTimer)
+ clearTimeout(self.topology._waitTimer)
+ self.topology._waitTimer = setTimeout(self.topology.timedOut, self.timeout * 1000, q);
+ // save the results in the nodeInfo object
+ if (id) {
+ if (!(id in self.topology._nodeInfo)) {
+ self.topology._nodeInfo[id] = {};
}
-
- // remove the id / entity from _expected
- if (id in self.topology._expected) {
- var entities = self.topology._expected[id];
- var idx = entities.indexOf(entity);
- if (idx > -1) {
- entities.splice(idx, 1);
- if (entities.length == 0)
- delete self.topology._expected[id];
- }
+ self.topology._nodeInfo[id][entity] = values;
+ }
+
+ // remove the id / entity from _expected
+ if (id in self.topology._expected) {
+ var entities = self.topology._expected[id];
+ var idx = entities.indexOf(entity);
+ if (idx > -1) {
+ entities.splice(idx, 1);
+ if (entities.length == 0)
+ delete self.topology._expected[id];
}
- // see if the expected obj is empty
- if (Object.getOwnPropertyNames(self.topology._expected).length == 0)
- self.topology.ondone();
- self.topology.cleanUp(values);
+ }
+ // see if the expected obj is empty
+ //if (Object.getOwnPropertyNames(self.topology._expected).length == 0)
+ // self.topology.ondone();
+ self.topology.cleanUp(values);
},
- expect: function (id, key) {
- if (!key || !id)
- return;
- if (!(id in this._expected))
- this._expected[id] = [];
- if (this._expected[id].indexOf(key) == -1)
- this._expected[id].push(key);
+ expect: function(id, key) {
+ if (!key || !id)
+ return;
+ if (!(id in self.topology._expected))
+ self.topology._expected[id] = [];
+ if (self.topology._expected[id].indexOf(key) == -1)
+ self.topology._expected[id].push(key);
},
- ondone: function () {
- clearTimeout(this.timerHandle);
- this._gettingTopo = false;
- //this.miniDump();
- //this.dump();
- self.notifyTopologyDone();
- },
- dump: function (prefix) {
- if (prefix)
- QDR.log.info(prefix);
- QDR.log.info("---");
- for (var key in this._nodeInfo) {
- QDR.log.info(key);
- console.dump(this._nodeInfo[key]);
- QDR.log.info("---");
- }
- QDR.log.debug("was still expecting:");
- console.dump(this._expected);
+ ondone: function() {
+ clearTimeout(self.topology._waitTimer);
+ self.topology._waitTimer = null;
+ self.topology._gettingTopo = false;
+ if (self.updating)
+ self.topology._getTimer = setTimeout(self.topology.get, self.updateInterval);
+ self.notifyTopologyDone();
},
- miniDump: function (prefix) {
- if (prefix)
- QDR.log.info(prefix);
- QDR.log.info("---");
- console.dump(Object.keys(this._nodeInfo));
+ dump: function(prefix) {
+ if (prefix)
+ QDR.log.info(prefix);
+ QDR.log.info("---");
+ for (var key in self.topology._nodeInfo) {
+ QDR.log.info(key);
+ console.dump(self.topology._nodeInfo[key]);
QDR.log.info("---");
+ }
+ QDR.log.debug("was still expecting:");
+ console.dump(self.topology._expected);
},
- onerror: function (err) {
- this._gettingTopo = false;
- QDR.log.debug("Err:" + err);
- self.executeDisconnectActions();
+ miniDump: function(prefix) {
+ if (prefix)
+ QDR.log.info(prefix);
+ QDR.log.info("---");
+ console.dump(Object.keys(self.topology._nodeInfo));
+ QDR.log.info("---");
+ },
+ onerror: function(err) {
+ self.topology._gettingTopo = false;
+ QDR.log.debug("Err:" + err);
+ self.executeDisconnectActions();
}
},
- getRemoteNodeInfo: function (callback) {
- //QDR.log.debug("getRemoteNodeInfo called");
+ getRemoteNodeInfo: function(callback) {
+ //QDR.log.debug("getRemoteNodeInfo called");
var ret;
// first get the list of remote node names
- self.correlator.request(
- ret = self.sendMgmtQuery('GET-MGMT-NODES')
- ).then(ret.id, function(response, context) {
- callback(response, context);
- self.topology.cleanUp(response);
- }, ret.error);
- },
-
- makeMgmtCalls: function (id) {
- var keys = [".router", ".connection", ".container", ".router.node", ".listener", ".router.link"];
- $.each(keys, function (i, key) {
- self.topology.expect(id, key);
- self.getNodeInfo(id, key, [], self.topology.addNodeInfo);
- });
- },
-
- getNodeInfo: function (nodeName, entity, attrs, callback) {
- //QDR.log.debug("getNodeInfo called with nodeName: " + nodeName + " and entity " + entity);
- var ret;
self.correlator.request(
- ret = self.sendQuery(nodeName, entity, attrs)
- ).then(ret.id, function(response) {
- callback(nodeName, entity, response);
- //self.topology.addNodeInfo(nodeName, entity, response);
- //self.topology.cleanUp(response);
+ ret = self.sendMgmtQuery('GET-MGMT-NODES')
+ ).then(ret.id, function(response, context) {
+ callback(response, context);
+ self.topology.cleanUp(response);
}, ret.error);
},
- getMultipleNodeInfo: function (nodeNames, entity, attrs, callback, selectedNodeId, aggregate) {
- if (!angular.isDefined(aggregate))
- aggregate = true;
- var responses = {};
- var gotNodesResult = function (nodeName, dotentity, response) {
- responses[nodeName] = response;
- if (Object.keys(responses).length == nodeNames.length) {
- if (aggregate)
- self.aggregateNodeInfo(nodeNames, entity, selectedNodeId, responses, callback);
- else {
- callback(nodeNames, entity, responses)
- }
- }
- }
-
- nodeNames.forEach( function (id) {
- self.getNodeInfo(id, '.'+entity, attrs, gotNodesResult);
- })
- //TODO: implement a timeout in case not all requests complete
- },
-
- aggregateNodeInfo: function (nodeNames, entity, selectedNodeId, responses, callback) {
- //QDR.log.debug("got all results for " + entity);
- // aggregate the responses
- var newResponse = {};
- var thisNode = responses[selectedNodeId];
- newResponse['attributeNames'] = thisNode.attributeNames;
- newResponse['results'] = thisNode.results;
- newResponse['aggregates'] = [];
- for (var i=0; i<thisNode.results.length; ++i) {
- var result = thisNode.results[i];
- var vals = [];
- result.forEach( function (val) {
- vals.push({sum: val, detail: []})
- })
- newResponse.aggregates.push(vals);
- }
- var nameIndex = thisNode.attributeNames.indexOf("name");
- var ent = self.schema.entityTypes[entity];
- var ids = Object.keys(responses);
- ids.sort();
- ids.forEach( function (id) {
- var response = responses[id];
- var results = response.results;
- results.forEach( function (result) {
- // find the matching result in the aggregates
- var found = newResponse.aggregates.some( function (aggregate, j) {
- if (aggregate[nameIndex].sum === result[nameIndex]) {
- // result and aggregate are now the same record, add the graphable values
- newResponse.attributeNames.forEach( function (key, i) {
- if (ent.attributes[key] && ent.attributes[key].graph) {
- if (id != selectedNodeId)
- aggregate[i].sum += result[i];
- }
- aggregate[i].detail.push({node: self.nameFromId(id)+':', val: result[i]})
- })
- return true; // stop looping
- }
- return false; // continute looking for the aggregate record
- })
- if (!found) {
- // this attribute was not found in the aggregates yet
- // because it was not in the selectedNodeId's results
- var vals = [];
- result.forEach( function (val) {
- vals.push({sum: val, detail: [{node: self.nameFromId(id), val: val}]})
- })
- newResponse.aggregates.push(vals)
- }
- })
- })
- callback(nodeNames, entity, newResponse);
- },
-
-
- getSchema: function () {
+ makeMgmtCalls: function(id) {
+QDR.log.debug("called makeMgmtCalls with id of " + id)
+ var keys = [".router", ".connection", ".container", ".router.node", ".listener", ".router.link"];
+ $.each(keys, function(i, key) {
+ self.topology.expect(id, key);
+ self.getNodeInfo(id, key, [], self.topology.addNodeInfo);
+ });
+ },
+
+ // should only be called from a q.defer() statement
+ ensureNodeInfo: function (nodeId, entity, attrs, q, callback) {
+ QDR.log.debug("queuing request for " + nodeId + " " + entity)
+ self.getNodeInfo(nodeId, entity, attrs, function (nodeName, dotentity, response) {
+ QDR.log.debug("got response for " + nodeId + " " + entity)
+ self.topology.addNodeInfo(nodeName, dotentity, response)
+ //self.topology._nodeInfo[nodeName][dotentity] = response
+ callback(null)
+ })
+ return {
+ abort: function() {
+ delete self.topology._nodeInfo[nodeId]
+ //self.topology._nodeInfo[nodeId][entity] = {attributeNames: [], results: [[]]};
+ }
+ }
+ },
+
+ getMultipleNodeInfo: function(nodeNames, entity, attrs, callback, selectedNodeId, aggregate) {
+ if (!angular.isDefined(aggregate))
+ aggregate = true;
+ var responses = {};
+ var gotNodesResult = function(nodeName, dotentity, response) {
+ responses[nodeName] = response;
+ if (Object.keys(responses).length == nodeNames.length) {
+ if (aggregate)
+ self.aggregateNodeInfo(nodeNames, entity, selectedNodeId, responses, callback);
+ else {
+ callback(nodeNames, entity, responses)
+ }
+ }
+ }
+
+ nodeNames.forEach(function(id) {
+ self.getNodeInfo(id, '.' + entity, attrs, gotNodesResult);
+ })
+ //TODO: implement a timeout in case not all requests complete
+ },
+
+ aggregateNodeInfo: function(nodeNames, entity, selectedNodeId, responses, callback) {
+ //QDR.log.debug("got all results for " + entity);
+ // aggregate the responses
+ var newResponse = {};
+ var thisNode = responses[selectedNodeId];
+ newResponse['attributeNames'] = thisNode.attributeNames;
+ newResponse['results'] = thisNode.results;
+ newResponse['aggregates'] = [];
+ for (var i = 0; i < thisNode.results.length; ++i) {
+ var result = thisNode.results[i];
+ var vals = [];
+ result.forEach(function(val) {
+ vals.push({
+ sum: val,
+ detail: []
+ })
+ })
+ newResponse.aggregates.push(vals);
+ }
+ var nameIndex = thisNode.attributeNames.indexOf("name");
+ var ent = self.schema.entityTypes[entity];
+ var ids = Object.keys(responses);
+ ids.sort();
+ ids.forEach(function(id) {
+ var response = responses[id];
+ var results = response.results;
+ results.forEach(function(result) {
+ // find the matching result in the aggregates
+ var found = newResponse.aggregates.some(function(aggregate, j) {
+ if (aggregate[nameIndex].sum === result[nameIndex]) {
+ // result and aggregate are now the same record, add the graphable values
+ newResponse.attributeNames.forEach(function(key, i) {
+ if (ent.attributes[key] && ent.attributes[key].graph) {
+ if (id != selectedNodeId)
+ aggregate[i].sum += result[i];
+ }
+ aggregate[i].detail.push({
+ node: self.nameFromId(id) + ':',
+ val: result[i]
+ })
+ })
+ return true; // stop looping
+ }
+ return false; // continue looking for the aggregate record
+ })
+ if (!found) {
+ // this attribute was not found in the aggregates yet
+ // because it was not in the selectedNodeId's results
+ var vals = [];
+ result.forEach(function(val) {
+ vals.push({
+ sum: val,
+ detail: [{
+ node: self.nameFromId(id),
+ val: val
+ }]
+ })
+ })
+ newResponse.aggregates.push(vals)
+ }
+ })
+ })
+ callback(nodeNames, entity, newResponse);
+ },
+
+
+ // Sends a GET-SCHEMA management request. When the response arrives, deprecated
+ // entity types and deprecated attributes are deleted from it, the result is
+ // stored in self.schema, and then callback() is invoked with no arguments.
+ // callback is not invoked if the request fails (ret.error path).
+ getSchema: function(callback) {
//QDR.log.debug("getting schema");
var ret;
self.correlator.request(
- ret = self.sendMgmtQuery('GET-SCHEMA')
+ ret = self.sendMgmtQuery('GET-SCHEMA')
).then(ret.id, function(response) {
- //QDR.log.debug("Got schema response");
- // remove deprecated
- for (var entityName in response.entityTypes) {
- var entity = response.entityTypes[entityName]
- if (entity.deprecated) {
- // deprecated entity
- delete response.entityTypes[entityName]
- } else {
- for (var attributeName in entity.attributes) {
- var attribute = entity.attributes[attributeName]
- if (attribute.deprecated) {
- // deprecated attribute
- delete response.entityTypes[entityName].attributes[attributeName]
- }
- }
- }
- }
- self.schema = response;
- self.topology.get();
+ //QDR.log.debug("Got schema response");
+ // remove deprecated
+ for (var entityName in response.entityTypes) {
+ var entity = response.entityTypes[entityName]
+ if (entity.deprecated) {
+ // deprecated entity
+ delete response.entityTypes[entityName]
+ } else {
+ for (var attributeName in entity.attributes) {
+ var attribute = entity.attributes[attributeName]
+ if (attribute.deprecated) {
+ // deprecated attribute
+ delete response.entityTypes[entityName].attributes[attributeName]
+ }
+ }
+ }
+ }
+ self.schema = response;
+ // notify the caller only now that self.schema has been populated;
+ // calling it outside this handler would run before the response arrives
+ callback()
}, ret.error);
},
- getNodeInfo: function (nodeName, entity, attrs, callback) {
+ getNodeInfo: function(nodeName, entity, attrs, callback) {
//QDR.log.debug("getNodeInfo called with nodeName: " + nodeName + " and entity " + entity);
var ret;
self.correlator.request(
- ret = self.sendQuery(nodeName, entity, attrs)
+ ret = self.sendQuery(nodeName, entity, attrs)
).then(ret.id, function(response) {
- callback(nodeName, entity, response);
- //self.topology.addNodeInfo(nodeName, entity, response);
- //self.topology.cleanUp(response);
+ callback(nodeName, entity, response);
}, ret.error);
},
- sendMethod: function (nodeId, entity, attrs, operation, props, callback) {
- var ret;
- self.correlator.request(
- ret = self._sendMethod(nodeId, entity, attrs, operation, props)
- ).then(ret.id, function (response, context) {
- callback(nodeId, entity, response, context);
- }, ret.error);
- },
+ sendMethod: function(nodeId, entity, attrs, operation, props, callback) {
+ var ret;
+ self.correlator.request(
+ ret = self._sendMethod(nodeId, entity, attrs, operation, props)
+ ).then(ret.id, function(response, context) {
+ callback(nodeId, entity, response, context);
+ }, ret.error);
+ },
- _fullAddr: function (toAddr) {
+ _fullAddr: function(toAddr) {
var toAddrParts = toAddr.split('/');
if (toAddrParts.shift() != "amqp:") {
- self.topology.error(Error("unexpected format for router address: " + toAddr));
- return;
+ self.topology.error(Error("unexpected format for router address: " + toAddr));
+ return;
}
//var fullAddr = self.toAddress + "/" + toAddrParts.join('/');
- var fullAddr = toAddrParts.join('/');
- return fullAddr;
- },
-
- _sendMethod: function (toAddr, entity, attrs, operation, props) {
- var fullAddr = self._fullAddr(toAddr);
- var ret = {id: self.correlator.corr()};
- if (!self.sender || !self.sendable) {
- ret.error = "no sender"
- return ret;
- }
- try {
- var application_properties = {
- operation: operation
- }
- if (entity) {
- var ent = self.schema.entityTypes[entity];
- var fullyQualifiedType = ent ? ent.fullyQualifiedType : entity;
- application_properties.type = fullyQualifiedType || entity;
- }
- if (attrs.name)
- application_properties.name = attrs.name;
- if (props) {
- jQuery.extend(application_properties, props);
- }
- var msg = {
- body: attrs,
- properties: {
- to: fullAddr,
- reply_to: self.receiver.remote.attach.source.address,
- correlation_id: ret.id
- },
- application_properties: application_properties
- }
- self.sender.send( msg );
- console.dump("------- method called -------")
- console.dump (msg)
- }
- catch (e) {
- error = "error sending: " + e;
- QDR.log.error(error)
- ret.error = error;
- }
- return ret;
- },
-
- sendQuery: function(toAddr, entity, attrs, operation) {
+ var fullAddr = toAddrParts.join('/');
+ return fullAddr;
+ },
+
+ // Builds and sends a management method request to the router at toAddr.
+ // entity: entity type name; resolved to its fullyQualifiedType via self.schema when known
+ // attrs: message body; attrs.name, if set, is copied into the application properties
+ // operation: management operation name placed in the application properties
+ // props: optional extra application properties merged in with jQuery.extend
+ // Returns {id: <correlation id>} and, on failure, an additional error string.
+ _sendMethod: function(toAddr, entity, attrs, operation, props) {
+ var ret = {
+ id: self.correlator.corr()
+ };
+ var fullAddr = self._fullAddr(toAddr);
+ if (!self.sender || !self.sendable) {
+ ret.error = "no sender"
+ return ret;
+ }
+ try {
+ var application_properties = {
+ operation: operation
+ }
+ if (entity) {
+ var ent = self.schema.entityTypes[entity];
+ var fullyQualifiedType = ent ? ent.fullyQualifiedType : entity;
+ application_properties.type = fullyQualifiedType || entity;
+ }
+ if (attrs.name)
+ application_properties.name = attrs.name;
+ if (props) {
+ jQuery.extend(application_properties, props);
+ }
+ var msg = {
+ body: attrs,
+ properties: {
+ to: fullAddr,
+ reply_to: self.receiver.remote.attach.source.address,
+ correlation_id: ret.id
+ },
+ application_properties: application_properties
+ }
+ self.sender.send(msg);
+ console.dump("------- method called -------")
+ console.dump(msg)
+ } catch (e) {
+ // declared locally so the message does not leak into an implicit global
+ var error = "error sending: " + e;
+ QDR.log.error(error)
+ ret.error = error;
+ }
+ return ret;
+ },
+
+ sendQuery: function(toAddr, entity, attrs, operation) {
operation = operation || "QUERY"
- var fullAddr = self._fullAddr(toAddr);
+ var fullAddr = self._fullAddr(toAddr);
- var body;
- if (attrs)
- body = {
- "attributeNames": attrs,
- }
- else
- body = {
- "attributeNames": [],
- }
- if (entity[0] === '.')
- entity = entity.substr(1, entity.length-1)
- var prefix = "org.apache.qpid.dispatch."
- var configs = ["address", "autoLink", "linkRoute"]
- if (configs.indexOf(entity) > -1)
- prefix += "router.config."
- return self._send(body, fullAddr, operation, prefix + entity);
- },
-
- sendMgmtQuery: function (operation) {
- return self._send([], "/$management", operation);
- },
-
- _send: function (body, to, operation, entityType) {
- var ret = {id: self.correlator.corr()};
- if (!self.sender || !self.sendable) {
- ret.error = "no sender"
- return ret;
- }
- try {
- var application_properties = {
- operation: operation,
- type: "org.amqp.management",
- name: "self"
- };
- if (entityType)
- application_properties.entityType = entityType;
-
- self.sender.send({
- body: body,
- properties: {
- to: to,
- reply_to: self.receiver.remote.attach.source.address,
- correlation_id: ret.id
- },
- application_properties: application_properties
- })
- }
- catch (e) {
- error = "error sending: " + e;
- QDR.log.error(error)
- ret.error = error;
- }
- return ret;
- },
+ var body;
+ if (attrs) {
+ body = {
+ "attributeNames": attrs,
+ }
+ } else {
+ body = {
+ "attributeNames": [],
+ }
+ }
+ if (entity[0] === '.')
+ entity = entity.substr(1, entity.length - 1)
+ var prefix = "org.apache.qpid.dispatch."
+ var configs = ["address", "autoLink", "linkRoute"]
+ if (configs.indexOf(entity) > -1)
+ prefix += "router.config."
+ return self._send(body, fullAddr, operation, prefix + entity);
+ },
+
+ sendMgmtQuery: function(operation) {
+ return self._send([], "/$management", operation);
+ },
+
+ // Sends a management request message with the given body to address `to`.
+ // operation: management operation name placed in the application properties
+ // entityType: optional; when given it is added as application_properties.entityType
+ // Returns {id: <correlation id>} and, on failure, an additional error string.
+ _send: function(body, to, operation, entityType) {
+ var ret = {
+ id: self.correlator.corr()
+ };
+ if (!self.sender || !self.sendable) {
+ ret.error = "no sender"
+ return ret;
+ }
+ try {
+ var application_properties = {
+ operation: operation,
+ type: "org.amqp.management",
+ name: "self"
+ };
+ if (entityType)
+ application_properties.entityType = entityType;
+
+ self.sender.send({
+ body: body,
+ properties: {
+ to: to,
+ reply_to: self.receiver.remote.attach.source.address,
+ correlation_id: ret.id
+ },
+ application_properties: application_properties
+ })
+ } catch (e) {
+ // declared locally so the message does not leak into an implicit global
+ var error = "error sending: " + e;
+ QDR.log.error(error)
+ ret.error = error;
+ }
+ return ret;
+ },
disconnect: function() {
self.connection.close();
- self.errorText = "Disconnected."
+ self.errorText = "Disconnected."
},
connect: function(options) {
self.options = options;
self.topologyInitialized = false;
- if (!self.connected) {
- var okay = {connection: false, sender: false, receiver: false}
- var port = options.port || 5673;
- var baseAddress = options.address + ':' + port;
- var ws = self.rhea.websocket_connect(WebSocket);
- self.toAddress = "amqp://" + baseAddress;
- self.connectionError = undefined;
-
- var stop = function (context) {
- //self.stopUpdating();
- okay.sender = false;
- okay.receiver = false;
- okay.connected = false;
- self.connected = false;
- self.sender = null;
- self.receiver = null;
- self.sendable = false;
- self.gotTopology = false;
- }
- var maybeStart = function () {
- if (okay.connection && okay.sender && okay.receiver && self.sendable && !self.connected) {
- QDR.log.info("okay to start")
- self.connected = true;
- self.connection = connection;
- self.sender = sender;
- self.receiver = receiver;
- self.onSubscription();
- self.gotTopology = false;
- }
- }
- var onDisconnect = function () {
- //QDR.log.warn("Disconnected");
- self.connectionError = true;
- stop();
- self.executeDisconnectActions();
- }
-
- QDR.log.debug("****** calling rhea.connect ********")
- var connection;
- try {
- connection = self.rhea.connect({
- connection_details:ws('ws://' + baseAddress, ["binary", "base64", "AMQWSB10"]),
- reconnect:true,
- properties: {console_identifier: 'Dispatch console'}
- });
- }
- catch (e) {
- QDR.log.debug("exception caught on connect")
- self.errorText = "Connection failed"
- onDisconnect();
- }
- if (!self.connectionError) {
- connection.on('connection_open', function (context) {
- QDR.log.debug("connection_opened")
- okay.connection = true;
- okay.receiver = false;
- okay.sender = false;
- })
- connection.on('disconnected', function (context) {
- QDR.log.debug("connection disconnected")
- self.errorText = "Unable to connect"
- onDisconnect();
- })
- connection.on('connection_close', function (context) {
- QDR.log.debug("connection closed")
- self.errorText = "Disconnected"
- onDisconnect();
- })
-
- var sender = connection.open_sender();
- sender.on('sender_open', function (context) {
- QDR.log.debug("sender_opened")
- okay.sender = true
- maybeStart()
- })
- sender.on('sendable', function (context) {
- //QDR.log.debug("sendable")
- self.sendable = true;
- maybeStart();
- })
-
- var receiver = connection.open_receiver({source: {dynamic: true}});
- receiver.on('receiver_open', function (context) {
- QDR.log.debug("receiver_opened")
- okay.receiver = true;
- maybeStart()
- })
- receiver.on("message", function (context) {
- self.correlator.resolve(context);
- });
- }
- }
+ if (!self.connected) {
+ var okay = {
+ connection: false,
+ sender: false,
+ receiver: false
+ }
+ var port = options.port || 5673;
+ var baseAddress = options.address + ':' + port;
+ var ws = self.rhea.websocket_connect(WebSocket);
+ self.toAddress = "amqp://" + baseAddress;
+ self.connectionError = undefined;
+
+ var stop = function(context) {
+ //self.stopUpdating();
+ okay.sender = false;
+ okay.receiver = false;
+ okay.connected = false;
+ self.connected = false;
+ self.sender = null;
+ self.receiver = null;
+ self.sendable = false;
+ self.gotTopology = false;
+ }
+ var maybeStart = function() {
+ if (okay.connection && okay.sender && okay.receiver && self.sendable && !self.connected) {
+ QDR.log.info("okay to start")
+ self.connected = true;
+ self.connection = connection;
+ self.sender = sender;
+ self.receiver = receiver;
+ self.onSubscription();
+ self.gotTopology = false;
+ }
+ }
+ var onDisconnect = function() {
+ //QDR.log.warn("Disconnected");
+ self.connectionError = true;
+ stop();
+ self.executeDisconnectActions();
+ }
+
+ QDR.log.debug("****** calling rhea.connect ********")
+ var connection;
+ try {
+ connection = self.rhea.connect({
+ connection_details: ws('ws://' + baseAddress, ["binary", "base64", "AMQWSB10"]),
+ reconnect: true,
+ properties: {
+ console_identifier: 'Dispatch console'
+ }
+ });
+ } catch (e) {
+ QDR.log.debug("exception caught on connect")
+ self.errorText = "Connection failed"
+ onDisconnect();
+ }
+ if (!self.connectionError) {
+ connection.on('connection_open', function(context) {
+ QDR.log.debug("connection_opened")
+ okay.connection = true;
+ okay.receiver = false;
+ okay.sender = false;
+ })
+ connection.on('disconnected', function(context) {
+ QDR.log.debug("connection disconnected")
+ self.errorText = "Unable to connect"
+ onDisconnect();
+ })
+ connection.on('connection_close', function(context) {
+ QDR.log.debug("connection closed")
+ self.errorText = "Disconnected"
+ onDisconnect();
+ })
+
+ var sender = connection.open_sender();
+ sender.on('sender_open', function(context) {
+ QDR.log.debug("sender_opened")
+ okay.sender = true
+ maybeStart()
+ })
+ sender.on('sendable', function(context) {
+ //QDR.log.debug("sendable")
+ self.sendable = true;
+ maybeStart();
+ })
+
+ var receiver = connection.open_receiver({
+ source: {
+ dynamic: true
+ }
+ });
+ receiver.on('receiver_open', function(context) {
+ QDR.log.debug("receiver_opened")
+ okay.receiver = true;
+ maybeStart()
+ })
+ receiver.on("message", function(context) {
+ self.correlator.resolve(context);
+ });
+ }
+ }
}
}
- return self;
+ return self;
}]);
return QDR;
}(QDR || {}));
(function() {
- console.dump = function(object) {
- if (window.JSON && window.JSON.stringify)
- QDR.log.info(JSON.stringify(object,undefined,2));
- else
- console.log(object);
- };
+ console.dump = function(object) {
+ if (window.JSON && window.JSON.stringify)
+ QDR.log.info(JSON.stringify(object, undefined, 2));
+ else
+ console.log(object);
+ };
})();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fc43944/console/stand-alone/plugin/js/qdrSettings.js
----------------------------------------------------------------------
diff --git a/console/stand-alone/plugin/js/qdrSettings.js b/console/stand-alone/plugin/js/qdrSettings.js
index e81463a..f81818b 100644
--- a/console/stand-alone/plugin/js/qdrSettings.js
+++ b/console/stand-alone/plugin/js/qdrSettings.js
@@ -19,7 +19,7 @@ under the License.
/**
* @module QDR
*/
-var QDR = (function (QDR) {
+var QDR = (function(QDR) {
/**
* @method SettingsController
@@ -36,7 +36,13 @@ var QDR = (function (QDR) {
$scope.connectionErrorText = undefined;
$scope.forms = {};
- $scope.formEntity = angular.fromJson(localStorage[QDR.SETTINGS_KEY]) || {address: '', port: '', username: '', password: '', autostart: false};
+ $scope.formEntity = angular.fromJson(localStorage[QDR.SETTINGS_KEY]) || {
+ address: '',
+ port: '',
+ username: '',
+ password: '',
+ autostart: false
+ };
$scope.$watch('formEntity', function(newValue, oldValue) {
if (newValue !== oldValue) {
@@ -53,99 +59,118 @@ var QDR = (function (QDR) {
};
$scope.connect = function() {
- if (QDRService.connected) {
- QDRService.disconnect();
- return;
- }
-
- if ($scope.settings.$valid) {
- $scope.connectionError = false;
- $scope.connecting = true;
- $timeout( doConnect ) // timeout so connecting animation can display
- }
- }
-
- var doConnect = function () {
- if (!$scope.formEntity.address)
- $scope.formEntity.address = "localhost"
-
- console.log("attempting to connect to " + $scope.formEntity.address + ':' + $scope.formEntity.port);
- QDRService.addDisconnectAction(function () {
- $timeout( function () {
- QDR.log.debug("disconnect action called");
- $scope.connecting = false;
- $scope.connectionErrorText = QDRService.errorText;
- $scope.connectionError = true;
- })
- });
- QDRService.addConnectAction(function() {
- //QDR.log.debug("got connection notification");
- $timeout( function () {
- $scope.connecting = false;
- })
- });
- QDRService.connect($scope.formEntity);
+ if (QDRService.connected) {
+ QDRService.disconnect();
+ return;
+ }
+
+ if ($scope.settings.$valid) {
+ $scope.connectionError = false;
+ $scope.connecting = true;
+ $timeout(doConnect) // timeout so connecting animation can display
}
+ }
+
+ var doConnect = function() {
+ if (!$scope.formEntity.address)
+ $scope.formEntity.address = "localhost"
+
+ console.log("attempting to connect to " + $scope.formEntity.address + ':' + $scope.formEntity.port);
+ QDRService.addDisconnectAction(function() {
+ $timeout(function() {
+ QDR.log.debug("disconnect action called");
+ $scope.connecting = false;
+ $scope.connectionErrorText = QDRService.errorText;
+ $scope.connectionError = true;
+ })
+ });
+ QDRService.addConnectAction(function() {
+ QDRService.getSchema(function () {
+ QDR.log.debug("got schema after connection")
+ QDRService.addUpdatedAction("initialized", function () {
+ QDRService.delUpdatedAction("initialized")
+ QDR.log.debug("got initial topology")
+ $timeout(function() {
+QDR.log.debug("changing location to ")
+ $scope.connecting = false;
+ if ($location.path().startsWith(QDR.pluginRoot)) {
+ var searchObject = $location.search();
+ var goto = "overview";
+ if (searchObject.org && searchObject.org !== "connect") {
+ goto = searchObject.org;
+ }
+ $location.search('org', null)
+ $location.path(QDR.pluginRoot + "/" + goto);
+QDR.log.debug(QDR.pluginRoot + "/" + goto)
+ }
+ })
+ })
+ QDR.log.debug("requesting a topology")
+ QDRService.topology.get()
+ })
+ });
+ QDRService.connect($scope.formEntity);
+ }
}]);
-QDR.module.directive('posint', function (){
- return {
- require: 'ngModel',
-
- link: function(scope, elem, attr, ctrl) {
- // input type number allows + and - but we don't want them so filter them out
- elem.bind('keypress', function (event) {
- var nkey = !event.charCode ? event.which : event.charCode;
- var skey = String.fromCharCode(nkey);
- var nono = "-+.,"
- if (nono.indexOf(skey) >= 0) {
- event.preventDefault();
- return false;
- }
- // firefox doesn't filter out non-numeric input. it just sets the ctrl to invalid
- if (/[\!\@\#\$\%^&*\(\)]/.test(skey) && event.shiftKey || // prevent shift numbers
- !( // prevent all but the following
- nkey <= 0 || // arrows
- nkey == 8 || // delete|backspace
- nkey == 13 || // enter
- (nkey >= 37 && nkey <=40) || // arrows
- event.ctrlKey || event.altKey || // ctrl-v, etc.
- /[0-9]/.test(skey)) // numbers
- ) {
- event.preventDefault();
- return false;
- }
- })
- // check the current value of input
- var _isPortInvalid = function (value) {
- var port = value + ''
- var isErrRange = false;
- // empty string is valid
- if (port.length !== 0) {
- var n = ~~Number(port);
- if (n < 1 || n > 65535) {
- isErrRange = true;
- }
- }
- ctrl.$setValidity('range', !isErrRange)
- return isErrRange;
- }
-
- //For DOM -> model validation
- ctrl.$parsers.unshift(function(value) {
- return _isPortInvalid(value) ? undefined : value;
- });
-
- //For model -> DOM validation
- ctrl.$formatters.unshift(function(value) {
- _isPortInvalid(value);
- return value;
- });
+ QDR.module.directive('posint', function() {
+ return {
+ require: 'ngModel',
+
+ link: function(scope, elem, attr, ctrl) {
+ // input type number allows + and - but we don't want them so filter them out
+ elem.bind('keypress', function(event) {
+ var nkey = !event.charCode ? event.which : event.charCode;
+ var skey = String.fromCharCode(nkey);
+ var nono = "-+.,"
+ if (nono.indexOf(skey) >= 0) {
+ event.preventDefault();
+ return false;
+ }
+ // firefox doesn't filter out non-numeric input. it just sets the ctrl to invalid
+ if (/[\!\@\#\$\%^&*\(\)]/.test(skey) && event.shiftKey || // prevent shift numbers
+ !( // prevent all but the following
+ nkey <= 0 || // arrows
+ nkey == 8 || // delete|backspace
+ nkey == 13 || // enter
+ (nkey >= 37 && nkey <= 40) || // arrows
+ event.ctrlKey || event.altKey || // ctrl-v, etc.
+ /[0-9]/.test(skey)) // numbers
+ ) {
+ event.preventDefault();
+ return false;
+ }
+ })
+ // check the current value of input
+ var _isPortInvalid = function(value) {
+ var port = value + ''
+ var isErrRange = false;
+ // empty string is valid
+ if (port.length !== 0) {
+ var n = ~~Number(port);
+ if (n < 1 || n > 65535) {
+ isErrRange = true;
+ }
+ }
+ ctrl.$setValidity('range', !isErrRange)
+ return isErrRange;
+ }
+
+ //For DOM -> model validation
+ ctrl.$parsers.unshift(function(value) {
+ return _isPortInvalid(value) ? undefined : value;
+ });
+
+ //For model -> DOM validation
+ ctrl.$formatters.unshift(function(value) {
+ _isPortInvalid(value);
+ return value;
+ });
}
- };
-});
+ };
+ });
return QDR;
}(QDR || {}));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org