You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/27 12:13:43 UTC
svn commit: r1627942 [2/4] - in
/qpid/proton/branches/fadams-javascript-binding:
examples/messenger/javascript/ proton-c/bindings/javascript/
tests/javascript/
Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js?rev=1627942&r1=1627941&r2=1627942&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js Sat Sep 27 10:13:43 2014
@@ -37,1472 +37,1475 @@
*/
// Check if the environment is Node.js and if not log an error and exit.
-if (!exports) {
- console.error("qpid-config.js should be run in Node.js");
- return;
-}
-
-var qmf = {}; // Create qmf namespace object.
-qmf.Console = function() { // qmf.Console Constructor.
- var proton = require("qpid-proton");
- var message = new proton.Message();
- var messenger = new proton.Messenger();
-
- var brokerAddress = '';
- var replyTo = '';
+if (typeof process === 'object' && typeof require === 'function') {
- /**
- * The correlator object is a mechanism used to correlate requests with their
- * aynchronous responses. It might possible be better to make use of Promises
- * to implement part of this behaviour but a mechanism would still be needed to
- * correlate a request with its response callback in order to wrap things up in
- * a Promise, so much of the behaviour of this object would still be required.
- * In addition it seemed to make sense to make this QMF2 implementation fairly
- * free of dependencies and using Promises would require external libraries.
- * Instead the correlator implements "Promise-like" semantics, you might call it
- * a broken Promise :-)
- * <p>
- * in particular the request method behaves a *bit* like Promise.all() though it
- * is mostly fake and takes an array of functions that call the add() method
- * which is really the method used to associate response objects by correlationID.
- * The then method is used to register a listener that will be called when all
- * the requests that have been registered have received responses.
- * TODO error/timeout handling.
- */
- var correlator = {
- _resolve: null,
- _objects: {},
- add: function(id) {
- this._objects[id] = {complete: false, list: null};
- },
- request: function() {
- this._resolve = function() {console.log("Warning: No resolver has been set")};
- return this;
- },
- then: function(resolver) {
- this._resolve = resolver ? resolver : this._resolve;
- },
- resolve: function() {
- var opcode = message.properties['qmf.opcode'];
- var correlationID = message.getCorrelationID();
- var resp = this._objects[correlationID];
- if (opcode === '_query_response') {
- if (resp.list) {
- Array.prototype.push.apply(resp.list, message.body); // This is faster than concat.
- } else {
+ var qmf = {}; // Create qmf namespace object.
+ qmf.Console = function() { // qmf.Console Constructor.
+ var proton = require("qpid-proton");
+ var message = new proton.Message();
+ var messenger = new proton.Messenger();
+
+ var brokerAddress = '';
+ var replyTo = '';
+
+ /**
+ * The correlator object is a mechanism used to correlate requests with
+ * their aynchronous responses. It might possible be better to make use
+ * of Promises to implement part of this behaviour but a mechanism would
+ * still be needed to correlate a request with its response callback in
+ * order to wrap things up in a Promise, so much of the behaviour of this
+ * object would still be required. In addition it seemed to make sense to
+ * make this QMF2 implementation fairly free of dependencies and using
+ * Promises would require external libraries. Instead the correlator
+ * implements "Promise-like" semantics, you might say a broken Promise :-)
+ * <p>
+ * in particular the request method behaves a *bit* like Promise.all()
+ * though it is mostly fake and takes an array of functions that call
+ * the add() method which is really the method used to associate response
+ * objects by correlationID. The then method is used to register a
+ * listener that will be called when all the requests that have been
+ * registered have received responses.
+ * TODO error/timeout handling.
+ */
+ var correlator = {
+ _resolve: null,
+ _objects: {},
+ add: function(id) {
+ this._objects[id] = {complete: false, list: null};
+ },
+ request: function() {
+ this._resolve = function() {console.log("Warning: No resolver has been set")};
+ return this;
+ },
+ then: function(resolver) {
+ this._resolve = resolver ? resolver : this._resolve;
+ },
+ resolve: function() {
+ var opcode = message.properties['qmf.opcode'];
+ var correlationID = message.getCorrelationID();
+ var resp = this._objects[correlationID];
+ if (opcode === '_query_response') {
+ if (resp.list) {
+ Array.prototype.push.apply(resp.list, message.body); // This is faster than concat.
+ } else {
+ resp.list = message.body;
+ }
+
+ var partial = message.properties['partial'];
+ if (!partial) {
+ resp.complete = true;
+ }
+
+ this._objects[correlationID] = resp;
+ this._checkComplete();
+ } else if (opcode === '_method_response' || opcode === '_exception') {
resp.list = message.body;
- }
-
- var partial = message.properties['partial'];
- if (!partial) {
resp.complete = true;
+ this._objects[correlationID] = resp;
+ this._checkComplete();
+ } else {
+ console.error("Bad Message response, qmf.opcode = " + opcode);
+ }
+ },
+ _checkComplete: function() {
+ var response = {};
+ for (var id in this._objects) {
+ var object = this._objects[id];
+ if (object.complete) {
+ response[id] = object.list;
+ } else {
+ return;
+ }
}
- this._objects[correlationID] = resp;
- this._checkComplete();
- } else if (opcode === '_method_response' || opcode === '_exception') {
- resp.list = message.body;
- resp.complete = true;
- this._objects[correlationID] = resp;
- this._checkComplete();
- } else {
- console.error("Bad Message response, qmf.opcode = " + opcode);
+ this._objects = {}; // Clear state ready for next call.
+ this._resolve(response.method ? response.method : response);
}
- },
- _checkComplete: function() {
- var response = {};
- for (var id in this._objects) {
- var object = this._objects[id];
- if (object.complete) {
- response[id] = object.list;
- } else {
- return;
- }
+ }; // End of correlator object definition.
+
+ var pumpData = function() {
+ while (messenger.incoming()) {
+ // The second parameter forces Binary payloads to be decoded as
+ // strings this is useful because the broker QMF Agent encodes
+ // strings as AMQP binary unfortunately.
+ var t = messenger.get(message, true);
+ correlator.resolve();
+ messenger.accept(t);
}
- this._objects = {}; // Clear state ready for next call.
- this._resolve(response.method ? response.method : response);
- }
- };
-
- var pumpData = function() {
- while (messenger.incoming()) {
- // The second parameter forces Binary payloads to be decoded as strings
- // this is useful because the broker QMF Agent encodes strings as AMQP
- // binary, which is a right pain from an interoperability perspective.
- var t = messenger.get(message, true);
- correlator.resolve();
- messenger.accept(t);
- }
-
- if (messenger.isStopped()) {
- message.free();
- messenger.free();
- }
- };
-
- this.getObjects = function(packageName, className) {
- message.setAddress(brokerAddress);
- message.setSubject('broker');
- message.setReplyTo(replyTo);
- message.setCorrelationID(className);
- message.properties = {
- "routing-key": "broker", // Added for Java Broker
- "x-amqp-0-10.app-id": "qmf2",
- "method": "request",
- "qmf.opcode": "_query_request",
- };
- message.body = {
- "_what": "OBJECT",
- "_schema_id": {
- "_package_name": packageName,
- "_class_name": className
+ if (messenger.isStopped()) {
+ message.free();
+ messenger.free();
}
};
-
- correlator.add(className);
- messenger.put(message);
- };
-
- this.invokeMethod = function(object, method, arguments) {
- var correlationID = 'method';
- message.setAddress(brokerAddress);
- message.setSubject('broker');
- message.setReplyTo(replyTo);
- message.setCorrelationID(correlationID);
- message.properties = {
- "routing-key": "broker", // Added for Java Broker
- "x-amqp-0-10.app-id": "qmf2",
- "method": "request",
- "qmf.opcode": "_method_request",
- };
- message.body = {
- "_object_id": object._object_id,
- "_method_name" : method,
- "_arguments" : arguments
+
+ this.getObjects = function(packageName, className) {
+ message.setAddress(brokerAddress);
+ message.setSubject('broker');
+ message.setReplyTo(replyTo);
+ message.setCorrelationID(className);
+ message.properties = {
+ "routing-key": "broker", // Added for Java Broker
+ "x-amqp-0-10.app-id": "qmf2",
+ "method": "request",
+ "qmf.opcode": "_query_request",
+ };
+ message.body = {
+ "_what": "OBJECT",
+ "_schema_id": {
+ "_package_name": packageName,
+ "_class_name": className
+ }
+ };
+
+ correlator.add(className);
+ messenger.put(message);
};
- correlator.add(correlationID);
- messenger.put(message);
- };
-
- this.addConnection = function(addr, callback) {
- brokerAddress = addr + '/qmf.default.direct';
- var replyAddress = addr + '/#';
-
- messenger.on('subscription', function(subscription) {
- var subscriptionAddress = subscription.getAddress();
- var splitAddress = subscriptionAddress.split('/');
- replyTo = splitAddress[splitAddress.length - 1];
- callback();
- });
-
- messenger.subscribe(replyAddress);
- }
-
- this.destroy = function() {
- messenger.stop();
- }
-
- this.request = function() {return correlator.request();}
-
- messenger.on('error', function(error) {console.log(error);});
- messenger.on('work', pumpData);
- messenger.setOutgoingWindow(1024);
- messenger.recv(); // Receive as many messages as messenger can buffer.
- messenger.start();
-};
-
-
-/************************* qpid-config business logic ************************/
-
-var brokerAgent = new qmf.Console();
-
-var _usage =
-'Usage: qpid-config [OPTIONS]\n' +
-' qpid-config [OPTIONS] exchanges [filter-string]\n' +
-' qpid-config [OPTIONS] queues [filter-string]\n' +
-' qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n' +
-' qpid-config [OPTIONS] del exchange <name>\n' +
-' qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n' +
-' qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n' +
-' qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n' +
-' <for type xml> [-f -|filename]\n' +
-' <for type header> [all|any] k1=v1 [, k2=v2...]\n' +
-' qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n' +
-' qpid-config [OPTIONS] reload-acl\n' +
-' qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]\n' +
-' qpid-config [OPTIONS] del <type> <name>\n' +
-' qpid-config [OPTIONS] list <type> [--show-property <property-name>]\n';
-
-var usage = function() {
- console.log(_usage);
- process.exit(-1);
-};
-
-var _description =
-'Examples:\n' +
-'\n' +
-'$ qpid-config add queue q\n' +
-'$ qpid-config add exchange direct d -a localhost:5672\n' +
-'$ qpid-config exchanges -b 10.1.1.7:10000\n' +
-'$ qpid-config queues -b guest/guest@broker-host:10000\n' +
-'\n' +
-'Add Exchange <type> values:\n' +
-'\n' +
-' direct Direct exchange for point-to-point communication\n' +
-' fanout Fanout exchange for broadcast communication\n' +
-' topic Topic exchange that routes messages using binding keys with wildcards\n' +
-' headers Headers exchange that matches header fields against the binding keys\n' +
-' xml XML Exchange - allows content filtering using an XQuery\n' +
-'\n' +
-'\n' +
-'Queue Limit Actions:\n' +
-'\n' +
-' none (default) - Use broker\'s default policy\n' +
-' reject - Reject enqueued messages\n' +
-' ring - Replace oldest unacquired message with new\n' +
-'\n' +
-'Replication levels:\n' +
-'\n' +
-' none - no replication\n' +
-' configuration - replicate queue and exchange existence and bindings, but not messages.\n' +
-' all - replicate configuration and messages\n';
-
-var _options =
-'Options:\n' +
-' -h, --help show this help message and exit\n' +
-'\n' +
-' General Options:\n' +
-' -t <secs>, --timeout=<secs>\n' +
-' Maximum time to wait for broker connection (in\n' +
-' seconds)\n' +
-' -r, --recursive Show bindings in queue or exchange list\n' +
-' -b <address>, --broker=<address>\n' +
-' Address of qpidd broker with syntax:\n' +
-' [username/password@] hostname | ip-address [:<port>]\n' +
-' -a <address>, --broker-addr=<address>\n' +
-/* TODO Connection options
-' --sasl-mechanism=<mech>\n' +
-' SASL mechanism for authentication (e.g. EXTERNAL,\n' +
-' ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n' +
-' automatically picks the most secure available\n' +
-' mechanism - use this option to override.\n' +
-' --ssl-certificate=<cert>\n' +
-' Client SSL certificate (PEM Format)\n' +
-' --ssl-key=<key> Client SSL private key (PEM Format)\n' +
-' --ha-admin Allow connection to a HA backup broker.\n' +
-*/
-'\n' +
-' Options for Listing Exchanges and Queues:\n' +
-' --ignore-default Ignore the default exchange in exchange or queue list\n' +
-'\n' +
-' Options for Adding Exchanges and Queues:\n' +
-' --alternate-exchange=<aexname>\n' +
-' Name of the alternate-exchange for the new queue or\n' +
-' exchange. Exchanges route messages to the alternate\n' +
-' exchange if they are unable to route them elsewhere.\n' +
-' Queues route messages to the alternate exchange if\n' +
-' they are rejected by a subscriber or orphaned by queue\n' +
-' deletion.\n' +
-' --durable The new queue or exchange is durable.\n' +
-' --replicate=<level>\n' +
-' Enable automatic replication in a HA cluster. <level>\n' +
-' is \'none\', \'configuration\' or \'all\').\n' +
-'\n' +
-' Options for Adding Queues:\n' +
-' --file-count=<n> Number of files in queue\'s persistence journal\n' +
-' --file-size=<n> File size in pages (64KiB/page)\n' +
-' --max-queue-size=<n>\n' +
-' Maximum in-memory queue size as bytes\n' +
-' --max-queue-count=<n>\n' +
-' Maximum in-memory queue size as a number of messages\n' +
-' --limit-policy=<policy>\n' +
-' Action to take when queue limit is reached\n' +
-' --lvq-key=<key> Last Value Queue key\n' +
-' --generate-queue-events=<n>\n' +
-' If set to 1, every enqueue will generate an event that\n' +
-' can be processed by registered listeners (e.g. for\n' +
-' replication). If set to 2, events will be generated\n' +
-' for enqueues and dequeues.\n' +
-' --flow-stop-size=<n>\n' +
-' Turn on sender flow control when the number of queued\n' +
-' bytes exceeds this value.\n' +
-' --flow-resume-size=<n>\n' +
-' Turn off sender flow control when the number of queued\n' +
-' bytes drops below this value.\n' +
-' --flow-stop-count=<n>\n' +
-' Turn on sender flow control when the number of queued\n' +
-' messages exceeds this value.\n' +
-' --flow-resume-count=<n>\n' +
-' Turn off sender flow control when the number of queued\n' +
-' messages drops below this value.\n' +
-' --group-header=<header-name>\n' +
-' Enable message groups. Specify name of header that\n' +
-' holds group identifier.\n' +
-' --shared-groups Allow message group consumption across multiple\n' +
-' consumers.\n' +
-' --argument=<NAME=VALUE>\n' +
-' Specify a key-value pair to add to queue arguments\n' +
-' --start-replica=<broker-url>\n' +
-' Start replication from the same-named queue at\n' +
-' <broker-url>\n' +
-'\n' +
-' Options for Adding Exchanges:\n' +
-' --sequence Exchange will insert a \'qpid.msg_sequence\' field in\n' +
-' the message header\n' +
-' --ive Exchange will behave as an \'initial-value-exchange\',\n' +
-' keeping a reference to the last message forwarded and\n' +
-' enqueuing that message to newly bound queues.\n' +
-'\n' +
-' Options for Deleting Queues:\n' +
-' --force Force delete of queue even if it\'s currently used or\n' +
-' it\'s not empty\n' +
-' --force-if-not-empty\n' +
-' Force delete of queue even if it\'s not empty\n' +
-' --force-if-used Force delete of queue even if it\'s currently used\n' +
-'\n' +
-' Options for Declaring Bindings:\n' +
-' -f <file.xq>, --file=<file.xq>\n' +
-' For XML Exchange bindings - specifies the name of a\n' +
-' file containing an XQuery.\n' +
-'\n' +
-' Formatting options for \'list\' action:\n' +
-' --show-property=<property-name>\n' +
-' Specify a property of an object to be included in\n' +
-' output\n';
+ this.invokeMethod = function(object, method, arguments) {
+ var correlationID = 'method';
+ message.setAddress(brokerAddress);
+ message.setSubject('broker');
+ message.setReplyTo(replyTo);
+ message.setCorrelationID(correlationID);
+ message.properties = {
+ "routing-key": "broker", // Added for Java Broker
+ "x-amqp-0-10.app-id": "qmf2",
+ "method": "request",
+ "qmf.opcode": "_method_request",
+ };
+ message.body = {
+ "_object_id": object._object_id,
+ "_method_name" : method,
+ "_arguments" : arguments
+ };
+
+ correlator.add(correlationID);
+ messenger.put(message);
+ };
-var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true};
-var DEFAULT_PROPERTIES = {"exchange": {"name": true, "type": true, "durable": true},
- "queue": {"name": true, "durable": true, "autoDelete": true}};
+ this.addConnection = function(addr, callback) {
+ brokerAddress = addr + '/qmf.default.direct';
+ var replyAddress = addr + '/#';
+
+ messenger.on('subscription', function(subscription) {
+ var subscriptionAddress = subscription.getAddress();
+ var splitAddress = subscriptionAddress.split('/');
+ replyTo = splitAddress[splitAddress.length - 1];
+ callback();
+ });
-var getValue = function(r) {
- var value = null;
- if (r.length === 2) {
- value = r[1];
- if (!isNaN(value)) {
- value = parseInt(value);
+ messenger.subscribe(replyAddress);
}
- }
-
- return value;
-};
-
-var config = {
- _recursive : false,
- _host : 'guest:guest@localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
- _connTimeout : 10,
- _ignoreDefault : false,
- _altern_ex : null,
- _durable : false,
- _replicate : null,
- _if_empty : true,
- _if_unused : true,
- _fileCount : null,
- _fileSize : null,
- _maxQueueSize : null,
- _maxQueueCount : null,
- _limitPolicy : null,
- _msgSequence : false,
- _lvq_key : null,
- _ive : null,
- _eventGeneration: null,
- _file : null,
- _flowStopCount : null,
- _flowResumeCount: null,
- _flowStopSize : null,
- _flowResumeSize : null,
- _msgGroupHeader : null,
- _sharedMsgGroup : false,
- _extra_arguments: [],
- _start_replica : null,
- _returnCode : 0,
- _list_properties: null,
- getOptions: function() {
- var options = {};
- for (var a = 0; a < this._extra_arguments.length; a++) {
- var r = this._extra_arguments[a].split('=');
- options[r[0]] = getValue(r);
+ this.destroy = function() {
+ messenger.stop();
}
- return options;
- }
-};
-var FILECOUNT = 'qpid.file_count';
-var FILESIZE = 'qpid.file_size';
-var MAX_QUEUE_SIZE = 'qpid.max_size';
-var MAX_QUEUE_COUNT = 'qpid.max_count';
-var POLICY_TYPE = 'qpid.policy_type';
-var LVQ_KEY = 'qpid.last_value_queue_key';
-var MSG_SEQUENCE = 'qpid.msg_sequence';
-var IVE = 'qpid.ive';
-var QUEUE_EVENT_GENERATION = 'qpid.queue_event_generation';
-var FLOW_STOP_COUNT = 'qpid.flow_stop_count';
-var FLOW_RESUME_COUNT = 'qpid.flow_resume_count';
-var FLOW_STOP_SIZE = 'qpid.flow_stop_size';
-var FLOW_RESUME_SIZE = 'qpid.flow_resume_size';
-var MSG_GROUP_HDR_KEY = 'qpid.group_header_key';
-var SHARED_MSG_GROUP = 'qpid.shared_msg_group';
-var REPLICATE = 'qpid.replicate';
+ this.request = function() {return correlator.request();}
-/**
- * There are various arguments to declare that have specific program
- * options in this utility. However there is now a generic mechanism for
- * passing arguments as well. The SPECIAL_ARGS list contains the
- * arguments for which there are specific program options defined
- * i.e. the arguments for which there is special processing on add and
- * list
-*/
-var SPECIAL_ARGS={};
-SPECIAL_ARGS[FILECOUNT] = true;
-SPECIAL_ARGS[FILESIZE] = true;
-SPECIAL_ARGS[MAX_QUEUE_SIZE] = true;
-SPECIAL_ARGS[MAX_QUEUE_COUNT] = true;
-SPECIAL_ARGS[POLICY_TYPE] = true;
-SPECIAL_ARGS[LVQ_KEY] = true;
-SPECIAL_ARGS[MSG_SEQUENCE] = true;
-SPECIAL_ARGS[IVE] = true;
-SPECIAL_ARGS[QUEUE_EVENT_GENERATION] = true;
-SPECIAL_ARGS[FLOW_STOP_COUNT] = true;
-SPECIAL_ARGS[FLOW_RESUME_COUNT] = true;
-SPECIAL_ARGS[FLOW_STOP_SIZE] = true;
-SPECIAL_ARGS[FLOW_RESUME_SIZE] = true;
-SPECIAL_ARGS[MSG_GROUP_HDR_KEY] = true;
-SPECIAL_ARGS[SHARED_MSG_GROUP] = true;
-SPECIAL_ARGS[REPLICATE] = true;
+ messenger.on('error', function(error) {console.log(error);});
+ messenger.on('work', pumpData);
+ messenger.setOutgoingWindow(1024);
+ messenger.recv(); // Receive as many messages as messenger can buffer.
+ messenger.start();
+ }; // End of qmf.Console
-var oid = function(id) {
- return id._agent_epoch + ':' + id._object_name
-};
+/************************* qpid-config business logic ************************/
-var filterMatch = function(name, filter) {
- if (filter === '') {
- return true;
- }
- if (name.indexOf(filter) === -1) {
- return false;
- }
- return true;
-};
+ var brokerAgent = new qmf.Console();
-var idMap = function(list) {
- var map = {};
- for (var i = 0; i < list.length; i++) {
- var item = list[i];
- map[oid(item._object_id)] = item;
- }
- return map;
-};
+ var _usage =
+ 'Usage: qpid-config [OPTIONS]\n' +
+ ' qpid-config [OPTIONS] exchanges [filter-string]\n' +
+ ' qpid-config [OPTIONS] queues [filter-string]\n' +
+ ' qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n' +
+ ' qpid-config [OPTIONS] del exchange <name>\n' +
+ ' qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n' +
+ ' qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n' +
+ ' qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n' +
+ ' <for type xml> [-f -|filename]\n' +
+ ' <for type header> [all|any] k1=v1 [, k2=v2...]\n' +
+ ' qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n' +
+ ' qpid-config [OPTIONS] reload-acl\n' +
+ ' qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]\n' +
+ ' qpid-config [OPTIONS] del <type> <name>\n' +
+ ' qpid-config [OPTIONS] list <type> [--show-property <property-name>]\n';
-var renderObject = function(obj, list) {
- if (!obj) {
- return '';
- }
- var string = '';
- var addComma = false;
- for (var prop in obj) {
- if (addComma) {
- string += ', ';
- }
- if (obj.hasOwnProperty(prop)) {
- if (list) {
- if (SPECIAL_ARGS[prop]) continue;
- string += " --argument " + prop + "=" + obj[prop];
- } else {
- string += "'" + prop + "'" + ": '" + obj[prop] + "'";
- addComma = true;
- }
- }
- }
+ var usage = function() {
+ console.log(_usage);
+ process.exit(-1);
+ };
- if (addComma) {
- return '{' + string + '}';
- } else {
- if (list) {
- return string;
- } else {
- return '';
+ var _description =
+ 'Examples:\n' +
+ '\n' +
+ '$ qpid-config add queue q\n' +
+ '$ qpid-config add exchange direct d -a localhost:5672\n' +
+ '$ qpid-config exchanges -b 10.1.1.7:10000\n' +
+ '$ qpid-config queues -b guest/guest@broker-host:10000\n' +
+ '\n' +
+ 'Add Exchange <type> values:\n' +
+ '\n' +
+ ' direct Direct exchange for point-to-point communication\n' +
+ ' fanout Fanout exchange for broadcast communication\n' +
+ ' topic Topic exchange that routes messages using binding keys with wildcards\n' +
+ ' headers Headers exchange that matches header fields against the binding keys\n' +
+ ' xml XML Exchange - allows content filtering using an XQuery\n' +
+ '\n' +
+ '\n' +
+ 'Queue Limit Actions:\n' +
+ '\n' +
+ ' none (default) - Use broker\'s default policy\n' +
+ ' reject - Reject enqueued messages\n' +
+ ' ring - Replace oldest unacquired message with new\n' +
+ '\n' +
+ 'Replication levels:\n' +
+ '\n' +
+ ' none - no replication\n' +
+ ' configuration - replicate queue and exchange existence and bindings, but not messages.\n' +
+ ' all - replicate configuration and messages\n';
+
+ var _options =
+ 'Options:\n' +
+ ' -h, --help show this help message and exit\n' +
+ '\n' +
+ ' General Options:\n' +
+ ' -t <secs>, --timeout=<secs>\n' +
+ ' Maximum time to wait for broker connection (in\n' +
+ ' seconds)\n' +
+ ' -r, --recursive Show bindings in queue or exchange list\n' +
+ ' -b <address>, --broker=<address>\n' +
+ ' Address of qpidd broker with syntax:\n' +
+ ' [username/password@] hostname | ip-address [:<port>]\n' +
+ ' -a <address>, --broker-addr=<address>\n' +
+ /* TODO Connection options
+ ' --sasl-mechanism=<mech>\n' +
+ ' SASL mechanism for authentication (e.g. EXTERNAL,\n' +
+ ' ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n' +
+ ' automatically picks the most secure available\n' +
+ ' mechanism - use this option to override.\n' +
+ ' --ssl-certificate=<cert>\n' +
+ ' Client SSL certificate (PEM Format)\n' +
+ ' --ssl-key=<key> Client SSL private key (PEM Format)\n' +
+ ' --ha-admin Allow connection to a HA backup broker.\n' +
+ */
+ '\n' +
+ ' Options for Listing Exchanges and Queues:\n' +
+ ' --ignore-default Ignore the default exchange in exchange or queue list\n' +
+ '\n' +
+ ' Options for Adding Exchanges and Queues:\n' +
+ ' --alternate-exchange=<aexname>\n' +
+ ' Name of the alternate-exchange for the new queue or\n' +
+ ' exchange. Exchanges route messages to the alternate\n' +
+ ' exchange if they are unable to route them elsewhere.\n' +
+ ' Queues route messages to the alternate exchange if\n' +
+ ' they are rejected by a subscriber or orphaned by queue\n' +
+ ' deletion.\n' +
+ ' --durable The new queue or exchange is durable.\n' +
+ ' --replicate=<level>\n' +
+ ' Enable automatic replication in a HA cluster. <level>\n' +
+ ' is \'none\', \'configuration\' or \'all\').\n' +
+ '\n' +
+ ' Options for Adding Queues:\n' +
+ ' --file-count=<n> Number of files in queue\'s persistence journal\n' +
+ ' --file-size=<n> File size in pages (64KiB/page)\n' +
+ ' --max-queue-size=<n>\n' +
+ ' Maximum in-memory queue size as bytes\n' +
+ ' --max-queue-count=<n>\n' +
+ ' Maximum in-memory queue size as a number of messages\n' +
+ ' --limit-policy=<policy>\n' +
+ ' Action to take when queue limit is reached\n' +
+ ' --lvq-key=<key> Last Value Queue key\n' +
+ ' --generate-queue-events=<n>\n' +
+ ' If set to 1, every enqueue will generate an event that\n' +
+ ' can be processed by registered listeners (e.g. for\n' +
+ ' replication). If set to 2, events will be generated\n' +
+ ' for enqueues and dequeues.\n' +
+ ' --flow-stop-size=<n>\n' +
+ ' Turn on sender flow control when the number of queued\n' +
+ ' bytes exceeds this value.\n' +
+ ' --flow-resume-size=<n>\n' +
+ ' Turn off sender flow control when the number of queued\n' +
+ ' bytes drops below this value.\n' +
+ ' --flow-stop-count=<n>\n' +
+ ' Turn on sender flow control when the number of queued\n' +
+ ' messages exceeds this value.\n' +
+ ' --flow-resume-count=<n>\n' +
+ ' Turn off sender flow control when the number of queued\n' +
+ ' messages drops below this value.\n' +
+ ' --group-header=<header-name>\n' +
+ ' Enable message groups. Specify name of header that\n' +
+ ' holds group identifier.\n' +
+ ' --shared-groups Allow message group consumption across multiple\n' +
+ ' consumers.\n' +
+ ' --argument=<NAME=VALUE>\n' +
+ ' Specify a key-value pair to add to queue arguments\n' +
+ ' --start-replica=<broker-url>\n' +
+ ' Start replication from the same-named queue at\n' +
+ ' <broker-url>\n' +
+ '\n' +
+ ' Options for Adding Exchanges:\n' +
+ ' --sequence Exchange will insert a \'qpid.msg_sequence\' field in\n' +
+ ' the message header\n' +
+ ' --ive Exchange will behave as an \'initial-value-exchange\',\n' +
+ ' keeping a reference to the last message forwarded and\n' +
+ ' enqueuing that message to newly bound queues.\n' +
+ '\n' +
+ ' Options for Deleting Queues:\n' +
+ ' --force Force delete of queue even if it\'s currently used or\n' +
+ ' it\'s not empty\n' +
+ ' --force-if-not-empty\n' +
+ ' Force delete of queue even if it\'s not empty\n' +
+ ' --force-if-used Force delete of queue even if it\'s currently used\n' +
+ '\n' +
+ ' Options for Declaring Bindings:\n' +
+ ' -f <file.xq>, --file=<file.xq>\n' +
+ ' For XML Exchange bindings - specifies the name of a\n' +
+ ' file containing an XQuery.\n' +
+ '\n' +
+ ' Formatting options for \'list\' action:\n' +
+ ' --show-property=<property-name>\n' +
+ ' Specify a property of an object to be included in\n' +
+ ' output\n';
+
+ var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true};
+ var DEFAULT_PROPERTIES = {"exchange": {"name": true, "type": true, "durable": true},
+ "queue": {"name": true, "durable": true, "autoDelete": true}};
+
+ var getValue = function(r) {
+ var value = null;
+ if (r.length === 2) {
+ value = r[1];
+ if (!isNaN(value)) {
+ value = parseInt(value);
+ }
}
- }
-};
-
-/**
- * The following methods illustrate the QMF2 class query mechanism which returns
- * the list of QMF Objects for the specified class that are currently present
- * on the Broker. The Schema <qpid>/cpp/src/qpid/broker/management-schema.xml
- * describes the properties and statistics of each Management Object.
- * <p>
- * One slightly subtle part of QMF is that certain Objects are associated via
- * references, for example Binding contains queueRef and exchangeRef, which lets
- * Objects link to each other using their _object_id property.
- * <p>
- * The implementation of these methods attempts to follow the same general flow
- * as the equivalent method in the "canonical" python based qpid-config version
- * but has the added complication that JavaScript is entirely asynchronous.
- * The approach that has been taken is to use the correlator object that lets a
- * callback function be registered via the "then" method and actually calls the
- * callback when all of the requests specified in the request method have
- * returned their results (which get passed as the callback parameter).
- */
-
-var overview = function() {
- brokerAgent.request(
- // Send the QMF query requests for the specified classes.
- brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
- ).then(function(objects) {
- var exchanges = objects.exchange;
- var queues = objects.queue;
- console.log("Total Exchanges: " + exchanges.length);
- var etype = {};
- for (var i = 0; i < exchanges.length; i++) {
- var exchange = exchanges[i]._values;
- if (!etype[exchange.type]) {
- etype[exchange.type] = 1;
- } else {
- etype[exchange.type]++;
+
+ return value;
+ };
+
+ var config = {
+ _recursive : false,
+ _host : 'guest:guest@localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
+ _connTimeout : 10,
+ _ignoreDefault : false,
+ _altern_ex : null,
+ _durable : false,
+ _replicate : null,
+ _if_empty : true,
+ _if_unused : true,
+ _fileCount : null,
+ _fileSize : null,
+ _maxQueueSize : null,
+ _maxQueueCount : null,
+ _limitPolicy : null,
+ _msgSequence : false,
+ _lvq_key : null,
+ _ive : null,
+ _eventGeneration: null,
+ _file : null,
+ _flowStopCount : null,
+ _flowResumeCount: null,
+ _flowStopSize : null,
+ _flowResumeSize : null,
+ _msgGroupHeader : null,
+ _sharedMsgGroup : false,
+ _extra_arguments: [],
+ _start_replica : null,
+ _returnCode : 0,
+ _list_properties: null,
+
+ getOptions: function() {
+ var options = {};
+ for (var a = 0; a < this._extra_arguments.length; a++) {
+ var r = this._extra_arguments[a].split('=');
+ options[r[0]] = getValue(r);
}
+ return options;
}
- for (var typ in etype) {
- var pad = Array(16 - typ.length).join(' ');
- console.log(pad + typ + ": " + etype[typ]);
+ };
+
+ var FILECOUNT = 'qpid.file_count';
+ var FILESIZE = 'qpid.file_size';
+ var MAX_QUEUE_SIZE = 'qpid.max_size';
+ var MAX_QUEUE_COUNT = 'qpid.max_count';
+ var POLICY_TYPE = 'qpid.policy_type';
+ var LVQ_KEY = 'qpid.last_value_queue_key';
+ var MSG_SEQUENCE = 'qpid.msg_sequence';
+ var IVE = 'qpid.ive';
+ var QUEUE_EVENT_GENERATION = 'qpid.queue_event_generation';
+ var FLOW_STOP_COUNT = 'qpid.flow_stop_count';
+ var FLOW_RESUME_COUNT = 'qpid.flow_resume_count';
+ var FLOW_STOP_SIZE = 'qpid.flow_stop_size';
+ var FLOW_RESUME_SIZE = 'qpid.flow_resume_size';
+ var MSG_GROUP_HDR_KEY = 'qpid.group_header_key';
+ var SHARED_MSG_GROUP = 'qpid.shared_msg_group';
+ var REPLICATE = 'qpid.replicate';
+
+ /**
+ * There are various arguments to declare that have specific program
+ * options in this utility. However there is now a generic mechanism for
+ * passing arguments as well. The SPECIAL_ARGS list contains the
+ * arguments for which there are specific program options defined
+ * i.e. the arguments for which there is special processing on add and
+ * list
+ */
+ var SPECIAL_ARGS={};
+ SPECIAL_ARGS[FILECOUNT] = true;
+ SPECIAL_ARGS[FILESIZE] = true;
+ SPECIAL_ARGS[MAX_QUEUE_SIZE] = true;
+ SPECIAL_ARGS[MAX_QUEUE_COUNT] = true;
+ SPECIAL_ARGS[POLICY_TYPE] = true;
+ SPECIAL_ARGS[LVQ_KEY] = true;
+ SPECIAL_ARGS[MSG_SEQUENCE] = true;
+ SPECIAL_ARGS[IVE] = true;
+ SPECIAL_ARGS[QUEUE_EVENT_GENERATION] = true;
+ SPECIAL_ARGS[FLOW_STOP_COUNT] = true;
+ SPECIAL_ARGS[FLOW_RESUME_COUNT] = true;
+ SPECIAL_ARGS[FLOW_STOP_SIZE] = true;
+ SPECIAL_ARGS[FLOW_RESUME_SIZE] = true;
+ SPECIAL_ARGS[MSG_GROUP_HDR_KEY] = true;
+ SPECIAL_ARGS[SHARED_MSG_GROUP] = true;
+ SPECIAL_ARGS[REPLICATE] = true;
+
+ // Returns a String representation of an ObjectID.
+ var oid = function(id) {
+ return id._agent_epoch + ':' + id._object_name
+ };
+
+ // Check if the supplied name contains the supplied filter String.
+ var filterMatch = function(name, filter) {
+ if (filter === '') {
+ return true;
}
-
- console.log("\n Total Queues: " + queues.length);
- var durable = 0;
- for (var i = 0; i < queues.length; i++) {
- var queue = queues[i]._values;
- if (queue.durable) {
- durable++;
- }
+ if (name.indexOf(filter) === -1) {
+ return false;
}
- console.log(" durable: " + durable);
- console.log(" non-durable: " + (queues.length - durable));
- brokerAgent.destroy();
- });
-};
-
-var exchangeList = function(filter) {
- brokerAgent.request(
- // Send the QMF query requests for the specified classes.
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
- ).then(function(objects) {
- var exchanges = objects.exchange;
- var exMap = idMap(exchanges);
- var caption1 = "Type ";
- var caption2 = "Exchange Name";
- var maxNameLen = caption2.length;
- var found = false;
- for (var i = 0; i < exchanges.length; i++) {
- var exchange = exchanges[i]._values;
- if (filterMatch(exchange.name, filter)) {
- if (exchange.name.length > maxNameLen) {
- maxNameLen = exchange.name.length;
- }
- found = true;
- }
+ return true;
+ };
+
+ // Take the supplied List of QMF2 Objects and return a Map keyed by ObjectID.
+ var idMap = function(list) {
+ var map = {};
+ for (var i = 0; i < list.length; i++) {
+ var item = list[i];
+ map[oid(item._object_id)] = item;
}
- if (!found) {
- config._returnCode = 1;
- return;
+ return map;
+ };
+
+ // Pretty-print the supplied Object.
+ var renderObject = function(obj, list) {
+ if (!obj) {
+ return '';
}
-
- var pad = Array(maxNameLen + 1 - caption2.length).join(' ');
- console.log(caption1 + caption2 + pad + " Attributes");
- console.log(Array(maxNameLen + caption1.length + 13).join('='));
-
- for (var i = 0; i < exchanges.length; i++) {
- var exchange = exchanges[i]._values;
- if (config._ignoreDefault && !exchange.name) continue;
- if (filterMatch(exchange.name, filter)) {
- var pad1 = Array(11 - exchange.type.length).join(' ');
- var pad2 = Array(maxNameLen + 2 - exchange.name.length).join(' ');
- var string = exchange.type + pad1 + exchange.name + pad2;
- var args = exchange.arguments ? exchange.arguments : {};
- if (exchange.durable) {
- string += ' --durable';
- }
- if (args[REPLICATE]) {
- string += ' --replicate=' + args[REPLICATE];
- }
- if (args[MSG_SEQUENCE]) {
- string += ' --sequence';
- }
- if (args[IVE]) {
- string += ' --ive';
- }
- if (exchange.altExchange) {
- string += ' --alternate-exchange=' + exMap[oid(exchange.altExchange)]._values.name;
+ var string = '';
+ var addComma = false;
+ for (var prop in obj) {
+ if (addComma) {
+ string += ', ';
+ }
+ if (obj.hasOwnProperty(prop)) {
+ if (list) {
+ if (SPECIAL_ARGS[prop]) continue;
+ string += " --argument " + prop + "=" + obj[prop];
+ } else {
+ string += "'" + prop + "'" + ": '" + obj[prop] + "'";
+ addComma = true;
}
- console.log(string);
}
}
- brokerAgent.destroy();
- });
-};
-
-var exchangeListRecurse = function(filter) {
- brokerAgent.request(
- // Send the QMF query requests for the specified classes.
- brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
- ).then(function(objects) {
- var exchanges = objects.exchange;
- var bindings = objects.binding;
- var queues = idMap(objects.queue);
-
- for (var i = 0; i < exchanges.length; i++) {
- var exchange = exchanges[i];
- var exchangeId = oid(exchange._object_id);
- exchange = exchange._values;
-
- if (config._ignoreDefault && !exchange.name) continue;
- if (filterMatch(exchange.name, filter)) {
- console.log("Exchange '" + exchange.name + "' (" + exchange.type + ")");
- for (var j = 0; j < bindings.length; j++) {
- var bind = bindings[j]._values;
- var exchangeRef = oid(bind.exchangeRef);
-
- if (exchangeRef === exchangeId) {
- var queue = queues[oid(bind.queueRef)];
- var queueName = queue ? queue._values.name : "<unknown>";
- console.log(" bind [" + bind.bindingKey + "] => " + queueName +
- " " + renderObject(bind.arguments));
- }
- }
+
+ if (addComma) {
+ return '{' + string + '}';
+ } else {
+ if (list) {
+ return string;
+ } else {
+ return '';
}
}
- brokerAgent.destroy();
- });
-};
-
-var queueList = function(filter) {
- brokerAgent.request(
- // Send the QMF query requests for the specified classes.
- brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
- ).then(function(objects) {
- var queues = objects.queue;
- var exMap = idMap(objects.exchange);
- var caption = "Queue Name";
- var maxNameLen = caption.length;
- var found = false;
- for (var i = 0; i < queues.length; i++) {
- var queue = queues[i]._values;
- if (filterMatch(queue.name, filter)) {
- if (queue.name.length > maxNameLen) {
- maxNameLen = queue.name.length;
+ };
+
+ /**
+ * The following methods illustrate the QMF2 class query mechanism which returns
+ * the list of QMF Objects for the specified class that are currently present
+ * on the Broker. The Schema <qpid>/cpp/src/qpid/broker/management-schema.xml
+ * describes the properties and statistics of each Management Object.
+ * <p>
+ * One slightly subtle part of QMF is that certain Objects are associated via
+ * references, for example Binding contains queueRef and exchangeRef, which lets
+ * Objects link to each other using their _object_id property.
+ * <p>
+ * The implementation of these methods attempts to follow the same general flow
+ * as the equivalent method in the "canonical" python based qpid-config version
+ * but has the added complication that JavaScript is entirely asynchronous.
+ * The approach that has been taken is to use the correlator object that lets a
+ * callback function be registered via the "then" method and actually calls the
+ * callback when all of the requests specified in the request method have
+ * returned their results (which get passed as the callback parameter).
+ */
+
+ var overview = function() {
+ brokerAgent.request(
+ // Send the QMF query requests for the specified classes.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
+ ).then(function(objects) {
+ var exchanges = objects.exchange;
+ var queues = objects.queue;
+ console.log("Total Exchanges: " + exchanges.length);
+ var etype = {};
+ for (var i = 0; i < exchanges.length; i++) {
+ var exchange = exchanges[i]._values;
+ if (!etype[exchange.type]) {
+ etype[exchange.type] = 1;
+ } else {
+ etype[exchange.type]++;
}
- found = true;
}
- }
- if (!found) {
- config._returnCode = 1;
- return;
- }
-
- var pad = Array(maxNameLen + 1 - caption.length).join(' ');
- console.log(caption + pad + " Attributes");
- console.log(Array(maxNameLen + caption.length + 3).join('='));
-
- for (var i = 0; i < queues.length; i++) {
- var queue = queues[i]._values;
- if (filterMatch(queue.name, filter)) {
- var pad2 = Array(maxNameLen + 2 - queue.name.length).join(' ');
- var string = queue.name + pad2;
- var args = queue.arguments ? queue.arguments : {};
+ for (var typ in etype) {
+ var pad = Array(16 - typ.length).join(' ');
+ console.log(pad + typ + ": " + etype[typ]);
+ }
+
+ console.log("\n Total Queues: " + queues.length);
+ var durable = 0;
+ for (var i = 0; i < queues.length; i++) {
+ var queue = queues[i]._values;
if (queue.durable) {
- string += ' --durable';
- }
- if (args[REPLICATE]) {
- string += ' --replicate=' + args[REPLICATE];
- }
- if (queue.autoDelete) {
- string += ' auto-del';
+ durable++;
}
- if (queue.exclusive) {
- string += ' excl';
- }
- if (args[FILESIZE]) {
- string += ' --file-size=' + args[FILESIZE];
- }
- if (args[FILECOUNT]) {
- string += ' --file-count=' + args[FILECOUNT];
- }
- if (args[MAX_QUEUE_SIZE]) {
- string += ' --max-queue-size=' + args[MAX_QUEUE_SIZE];
- }
- if (args[MAX_QUEUE_COUNT]) {
- string += ' --max-queue-count=' + args[MAX_QUEUE_COUNT];
- }
- if (args[POLICY_TYPE]) {
- string += ' --limit-policy=' + args[POLICY_TYPE].replace("_", "-");
- }
- if (args[LVQ_KEY]) {
- string += ' --lvq-key=' + args[LVQ_KEY];
- }
- if (args[QUEUE_EVENT_GENERATION]) {
- string += ' --generate-queue-events=' + args[QUEUE_EVENT_GENERATION];
- }
- if (queue.altExchange) {
- string += ' --alternate-exchange=' + exMap[oid(queue.altExchange)]._values.name;
- }
- if (args[FLOW_STOP_SIZE]) {
- string += ' --flow-stop-size=' + args[FLOW_STOP_SIZE];
- }
- if (args[FLOW_RESUME_SIZE]) {
- string += ' --flow-resume-size=' + args[FLOW_RESUME_SIZE];
+ }
+ console.log(" durable: " + durable);
+ console.log(" non-durable: " + (queues.length - durable));
+ brokerAgent.destroy();
+ });
+ };
+
+ var exchangeList = function(filter) {
+ brokerAgent.request(
+ // Send the QMF query requests for the specified classes.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
+ ).then(function(objects) {
+ var exchanges = objects.exchange;
+ var exMap = idMap(exchanges);
+ var caption1 = "Type ";
+ var caption2 = "Exchange Name";
+ var maxNameLen = caption2.length;
+ var found = false;
+ for (var i = 0; i < exchanges.length; i++) {
+ var exchange = exchanges[i]._values;
+ if (filterMatch(exchange.name, filter)) {
+ if (exchange.name.length > maxNameLen) {
+ maxNameLen = exchange.name.length;
+ }
+ found = true;
}
- if (args[FLOW_STOP_COUNT]) {
- string += ' --flow-stop-count=' + args[FLOW_STOP_COUNT];
+ }
+ if (!found) {
+ config._returnCode = 1;
+ return;
+ }
+
+ var pad = Array(maxNameLen + 1 - caption2.length).join(' ');
+ console.log(caption1 + caption2 + pad + " Attributes");
+ console.log(Array(maxNameLen + caption1.length + 13).join('='));
+
+ for (var i = 0; i < exchanges.length; i++) {
+ var exchange = exchanges[i]._values;
+ if (config._ignoreDefault && !exchange.name) continue;
+ if (filterMatch(exchange.name, filter)) {
+ var pad1 = Array(11 - exchange.type.length).join(' ');
+ var pad2 = Array(maxNameLen + 2 - exchange.name.length).join(' ');
+ var string = exchange.type + pad1 + exchange.name + pad2;
+ var args = exchange.arguments ? exchange.arguments : {};
+ if (exchange.durable) {
+ string += ' --durable';
+ }
+ if (args[REPLICATE]) {
+ string += ' --replicate=' + args[REPLICATE];
+ }
+ if (args[MSG_SEQUENCE]) {
+ string += ' --sequence';
+ }
+ if (args[IVE]) {
+ string += ' --ive';
+ }
+ if (exchange.altExchange) {
+ string += ' --alternate-exchange=' + exMap[oid(exchange.altExchange)]._values.name;
+ }
+ console.log(string);
}
- if (args[FLOW_RESUME_COUNT]) {
- string += ' --flow-resume-count=' + args[FLOW_RESUME_COUNT];
+ }
+ brokerAgent.destroy();
+ });
+ };
+
+ var exchangeListRecurse = function(filter) {
+ brokerAgent.request(
+ // Send the QMF query requests for the specified classes.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
+ ).then(function(objects) {
+ var exchanges = objects.exchange;
+ var bindings = objects.binding;
+ var queues = idMap(objects.queue);
+
+ for (var i = 0; i < exchanges.length; i++) {
+ var exchange = exchanges[i];
+ var exchangeId = oid(exchange._object_id);
+ exchange = exchange._values;
+
+ if (config._ignoreDefault && !exchange.name) continue;
+ if (filterMatch(exchange.name, filter)) {
+ console.log("Exchange '" + exchange.name + "' (" + exchange.type + ")");
+ for (var j = 0; j < bindings.length; j++) {
+ var bind = bindings[j]._values;
+ var exchangeRef = oid(bind.exchangeRef);
+
+ if (exchangeRef === exchangeId) {
+ var queue = queues[oid(bind.queueRef)];
+ var queueName = queue ? queue._values.name : "<unknown>";
+ console.log(" bind [" + bind.bindingKey + "] => " + queueName +
+ " " + renderObject(bind.arguments));
+ }
+ }
}
- if (args[MSG_GROUP_HDR_KEY]) {
- string += ' --group-header=' + args[MSG_GROUP_HDR_KEY];
+ }
+ brokerAgent.destroy();
+ });
+ };
+
+ var queueList = function(filter) {
+ brokerAgent.request(
+ // Send the QMF query requests for the specified classes.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
+ ).then(function(objects) {
+ var queues = objects.queue;
+ var exMap = idMap(objects.exchange);
+ var caption = "Queue Name";
+ var maxNameLen = caption.length;
+ var found = false;
+ for (var i = 0; i < queues.length; i++) {
+ var queue = queues[i]._values;
+ if (filterMatch(queue.name, filter)) {
+ if (queue.name.length > maxNameLen) {
+ maxNameLen = queue.name.length;
+ }
+ found = true;
}
- if (args[SHARED_MSG_GROUP] === 1) {
- string += ' --shared-groups';
+ }
+ if (!found) {
+ config._returnCode = 1;
+ return;
+ }
+
+ var pad = Array(maxNameLen + 1 - caption.length).join(' ');
+ console.log(caption + pad + " Attributes");
+ console.log(Array(maxNameLen + caption.length + 3).join('='));
+
+ for (var i = 0; i < queues.length; i++) {
+ var queue = queues[i]._values;
+ if (filterMatch(queue.name, filter)) {
+ var pad2 = Array(maxNameLen + 2 - queue.name.length).join(' ');
+ var string = queue.name + pad2;
+ var args = queue.arguments ? queue.arguments : {};
+ if (queue.durable) {
+ string += ' --durable';
+ }
+ if (args[REPLICATE]) {
+ string += ' --replicate=' + args[REPLICATE];
+ }
+ if (queue.autoDelete) {
+ string += ' auto-del';
+ }
+ if (queue.exclusive) {
+ string += ' excl';
+ }
+ if (args[FILESIZE]) {
+ string += ' --file-size=' + args[FILESIZE];
+ }
+ if (args[FILECOUNT]) {
+ string += ' --file-count=' + args[FILECOUNT];
+ }
+ if (args[MAX_QUEUE_SIZE]) {
+ string += ' --max-queue-size=' + args[MAX_QUEUE_SIZE];
+ }
+ if (args[MAX_QUEUE_COUNT]) {
+ string += ' --max-queue-count=' + args[MAX_QUEUE_COUNT];
+ }
+ if (args[POLICY_TYPE]) {
+ string += ' --limit-policy=' + args[POLICY_TYPE].replace("_", "-");
+ }
+ if (args[LVQ_KEY]) {
+ string += ' --lvq-key=' + args[LVQ_KEY];
+ }
+ if (args[QUEUE_EVENT_GENERATION]) {
+ string += ' --generate-queue-events=' + args[QUEUE_EVENT_GENERATION];
+ }
+ if (queue.altExchange) {
+ string += ' --alternate-exchange=' + exMap[oid(queue.altExchange)]._values.name;
+ }
+ if (args[FLOW_STOP_SIZE]) {
+ string += ' --flow-stop-size=' + args[FLOW_STOP_SIZE];
+ }
+ if (args[FLOW_RESUME_SIZE]) {
+ string += ' --flow-resume-size=' + args[FLOW_RESUME_SIZE];
+ }
+ if (args[FLOW_STOP_COUNT]) {
+ string += ' --flow-stop-count=' + args[FLOW_STOP_COUNT];
+ }
+ if (args[FLOW_RESUME_COUNT]) {
+ string += ' --flow-resume-count=' + args[FLOW_RESUME_COUNT];
+ }
+ if (args[MSG_GROUP_HDR_KEY]) {
+ string += ' --group-header=' + args[MSG_GROUP_HDR_KEY];
+ }
+ if (args[SHARED_MSG_GROUP] === 1) {
+ string += ' --shared-groups';
+ }
+ string += ' ' + renderObject(args, true);
+ console.log(string);
}
- string += ' ' + renderObject(args, true);
- console.log(string);
}
- }
- brokerAgent.destroy();
- });
-};
-
-var queueListRecurse = function(filter) {
- brokerAgent.request(
- // Send the QMF query requests for the specified classes.
- brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
- ).then(function(objects) {
- var queues = objects.queue;
- var bindings = objects.binding;
- var exchanges = idMap(objects.exchange);
-
- for (var i = 0; i < queues.length; i++) {
- var queue = queues[i];
- var queueId = oid(queue._object_id);
- queue = queue._values;
-
- if (filterMatch(queue.name, filter)) {
- console.log("Queue '" + queue.name + "'");
- for (var j = 0; j < bindings.length; j++) {
- var bind = bindings[j]._values;
- var queueRef = oid(bind.queueRef);
-
- if (queueRef === queueId) {
- var exchange = exchanges[oid(bind.exchangeRef)];
- var exchangeName = "<unknown>";
- if (exchange) {
- exchangeName = exchange._values.name;
- if (exchangeName === '') {
- if (config._ignoreDefault) continue;
- exchangeName = "''";
+ brokerAgent.destroy();
+ });
+ };
+
+ var queueListRecurse = function(filter) {
+ brokerAgent.request(
+ // Send the QMF query requests for the specified classes.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
+ brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
+ ).then(function(objects) {
+ var queues = objects.queue;
+ var bindings = objects.binding;
+ var exchanges = idMap(objects.exchange);
+
+ for (var i = 0; i < queues.length; i++) {
+ var queue = queues[i];
+ var queueId = oid(queue._object_id);
+ queue = queue._values;
+
+ if (filterMatch(queue.name, filter)) {
+ console.log("Queue '" + queue.name + "'");
+ for (var j = 0; j < bindings.length; j++) {
+ var bind = bindings[j]._values;
+ var queueRef = oid(bind.queueRef);
+
+ if (queueRef === queueId) {
+ var exchange = exchanges[oid(bind.exchangeRef)];
+ var exchangeName = "<unknown>";
+ if (exchange) {
+ exchangeName = exchange._values.name;
+ if (exchangeName === '') {
+ if (config._ignoreDefault) continue;
+ exchangeName = "''";
+ }
}
- }
-
- console.log(" bind [" + bind.bindingKey + "] => " + exchangeName +
- " " + renderObject(bind.arguments));
- }
+
+ console.log(" bind [" + bind.bindingKey + "] => " + exchangeName +
+ " " + renderObject(bind.arguments));
+ }
+ }
}
}
+ brokerAgent.destroy();
+ });
+ };
+
+ /**
+ * The following methods implement adding and deleting various Broker Management
+ * Objects via QMF. Although <qpid>/cpp/src/qpid/broker/management-schema.xml
+ * describes the basic method schema, for example:
+ * <method name="create" desc="Create an object of the specified type">
+ * <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
+ * <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
+ * <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
+ * <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/ >
+ * </method>
+ *
+ * <method name="delete" desc="Delete an object of the specified type">
+ * <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
+ * <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
+ * <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
+ * </method>
+ *
+ * What the schema doesn't do however is to explain what the properties/options
+ * Map values actually mean, unfortunately these aren't documented anywhere so
+ * the only option is to look in the code, the best place to look is in:
+ * <qpid>/cpp/src/qpid/broker/Broker.cpp, the method Broker::ManagementMethod is
+ * the best place to start, then Broker::createObject and Broker::deleteObject
+ * even then it's pretty hard to figure out all that is possible.
+ */
+
+ var handleMethodResponse = function(response, dontStop) {
+ if (response._arguments) {
+ //console.log(response._arguments);
+ } if (response._values) {
+ console.error("Exception from Agent: " + renderObject(response._values));
+ }
+ // Mostly we want to stop the Messenger Event loop and exit when a QMF method
+ // returns, but sometimes we don't, the dontStop flag prevents this behaviour.
+ if (!dontStop) {
+ brokerAgent.destroy();
}
- brokerAgent.destroy();
- });
-};
-
-/**
- * The following methods implement adding and deleting various Broker Management
- * Objects via QMF. Although <qpid>/cpp/src/qpid/broker/management-schema.xml
- * describes the basic method schema, for example:
- * <method name="create" desc="Create an object of the specified type">
- * <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
- * <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
- * <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
- * <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/>
- * </method>
- *
- * <method name="delete" desc="Delete an object of the specified type">
- * <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
- * <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
- * <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
- * </method>
- *
- * What the schema doesn't do however is to explain what the properties/options
- * Map values actually mean, unfortunately these aren't documented anywhere so
- * the only option is to look in the code, the best place to look is in:
- * <qpid>/cpp/src/qpid/broker/Broker.cpp, the method Broker::ManagementMethod is
- * the best place to start, then Broker::createObject and Broker::deleteObject
- * even then it's pretty hard to figure out all that is possible.
- */
-
-var handleMethodResponse = function(response, dontStop) {
- if (response._arguments) {
- //console.log(response._arguments);
- } if (response._values) {
- console.error("Exception from Agent: " + renderObject(response._values));
- }
- // Mostly we want to stop the Messenger Event loop and exit when a QMF method
- // returns, but sometimes we don't, the dontStop flag prevents this behaviour.
- if (!dontStop) {
- brokerAgent.destroy();
- }
-}
-
-var addExchange = function(args) {
- if (args.length < 2) {
- usage();
- }
-
- var etype = args[0];
- var ename = args[1];
- var declArgs = {};
-
- declArgs['exchange-type'] = etype;
-
- for (var a = 0; a < config._extra_arguments.length; a++) {
- var r = config._extra_arguments[a].split('=');
- declArgs[r[0]] = getValue(r);
- }
-
- if (config._msgSequence) {
- declArgs[MSG_SEQUENCE] = 1;
- }
-
- if (config._ive) {
- declArgs[IVE] = 1;
- }
-
- if (config._altern_ex) {
- declArgs['alternate-exchange'] = config._altern_ex;
- }
-
- if (config._durable) {
- declArgs['durable'] = 1;
- }
-
- if (config._replicate) {
- declArgs[REPLICATE] = config._replicate;
}
-
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
- brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'create', {
- "type": "exchange",
- "name": ename,
- "properties": declArgs,
- "strict": true})
- ).then(handleMethodResponse);
- });
-};
-
-var delExchange = function(args) {
- if (args.length < 1) {
- usage();
- }
-
- var ename = args[0];
-
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
- brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'delete', {
- "type": "exchange",
- "name": ename})
- ).then(handleMethodResponse);
- });
-};
-
-var addQueue = function(args) {
- if (args.length < 1) {
- usage();
- }
-
- var qname = args[0];
- var declArgs = {};
-
- for (var a = 0; a < config._extra_arguments.length; a++) {
- var r = config._extra_arguments[a].split('=');
- declArgs[r[0]] = getValue(r);
- }
-
- if (config._durable) {
- // allow the default fileCount and fileSize specified
- // in qpid config file to take prededence
- if (config._fileCount) {
- declArgs[FILECOUNT] = config._fileCount;
+
+ var addExchange = function(args) {
+ if (args.length < 2) {
+ usage();
}
- if (config._fileSize) {
- declArgs[FILESIZE] = config._fileSize;
+
+ var etype = args[0];
+ var ename = args[1];
+ var declArgs = {};
+
+ declArgs['exchange-type'] = etype;
+
+ for (var a = 0; a < config._extra_arguments.length; a++) {
+ var r = config._extra_arguments[a].split('=');
+ declArgs[r[0]] = getValue(r);
}
- }
-
- if (config._maxQueueSize != null) {
- declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize;
- }
-
- if (config._maxQueueCount != null) {
- declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount;
- }
- if (config._limitPolicy) {
- if (config._limitPolicy === 'none') {
- } else if (config._limitPolicy === 'reject') {
- declArgs[POLICY_TYPE] = 'reject';
- } else if (config._limitPolicy === 'ring') {
- declArgs[POLICY_TYPE] = 'ring';
+ if (config._msgSequence) {
+ declArgs[MSG_SEQUENCE] = 1;
}
- }
-
- if (config._lvq_key) {
- declArgs[LVQ_KEY] = config._lvq_key;
- }
-
- if (config._eventGeneration) {
- declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration;
- }
-
- if (config._flowStopSize != null) {
- declArgs[FLOW_STOP_SIZE] = config._flowStopSize;
- }
-
- if (config._flowResumeSize != null) {
- declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize;
- }
-
- if (config._flowStopCount != null) {
- declArgs[FLOW_STOP_COUNT] = config._flowStopCount;
- }
-
- if (config._flowResumeCount != null) {
- declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount;
- }
-
- if (config._msgGroupHeader) {
- declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader;
- }
-
- if (config._sharedMsgGroup) {
- declArgs[SHARED_MSG_GROUP] = 1;
- }
-
- if (config._altern_ex) {
- declArgs['alternate-exchange'] = config._altern_ex;
- }
-
- if (config._durable) {
- declArgs['durable'] = 1;
- }
-
- if (config._replicate) {
- declArgs[REPLICATE] = config._replicate;
- }
-
- // This block is a little complex and untidy, the real issue is that the
- // correlator object isn't as good as a real Promise and doesn't support
- // chaining of "then" calls, so where we have complex dependencies we still
- // get somewhat into "callback hell". TODO improve the correlator.
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
+
+ if (config._ive) {
+ declArgs[IVE] = 1;
+ }
+
+ if (config._altern_ex) {
+ declArgs['alternate-exchange'] = config._altern_ex;
+ }
+
+ if (config._durable) {
+ declArgs['durable'] = 1;
+ }
+
+ if (config._replicate) {
+ declArgs[REPLICATE] = config._replicate;
+ }
+
brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'create', {
- "type": "queue",
- "name": qname,
- "properties": declArgs,
- "strict": true})
- ).then(function(response) {
- if (config._start_replica) {
- handleMethodResponse(response, true); // The second parameter prevents exiting.
- // TODO test this stuff!
- brokerAgent.request(
- brokerAgent.getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct
- ).then(function(objects) {
- if (objects.habroker.length > 0) {
- var habroker = objects.habroker[0];
- brokerAgent.request(
- brokerAgent.invokeMethod(habroker, 'replicate', {
- "broker": config._start_replica,
- "queue": qname})
- ).then(handleMethodResponse);
- } else {
- brokerAgent.destroy();
- }
- });
- } else {
- handleMethodResponse(response);
- }
+ // We invoke the CRUD methods on the broker object.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
+ ).then(function(objects) {
+ var broker = objects.broker[0];
+ brokerAgent.request(
+ brokerAgent.invokeMethod(broker, 'create', {
+ "type": "exchange",
+ "name": ename,
+ "properties": declArgs,
+ "strict": true})
+ ).then(handleMethodResponse);
});
- });
-};
-
-var delQueue = function(args) {
- if (args.length < 1) {
- usage();
- }
-
- var qname = args[0];
-
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
+ };
+
+ var delExchange = function(args) {
+ if (args.length < 1) {
+ usage();
+ }
+
+ var ename = args[0];
+
brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'delete', {
- "type": "queue",
- "name": qname,
- "options": {"if_empty": config._if_empty,
- "if_unused": config._if_unused}})
- ).then(handleMethodResponse);
- });
-};
-
-var snarf_header_args = function(args) {
- if (args.length < 2) {
- console.log("Invalid args to bind headers: need 'any'/'all' plus conditions");
- return false;
- }
-
- var op = args[0];
- if (op === 'all' || op === 'any') {
- kv = {};
- var bindings = Array.prototype.slice.apply(args, [1]);
- for (var i = 0; i < bindings.length; i++) {
- var binding = bindings[i];
- binding = binding.split(",")[0];
- binding = binding.split("=");
- kv[binding[0]] = binding[1];
- }
- kv['x-match'] = op;
- return kv;
- } else {
- console.log("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'");
- return false;
- }
-};
-
-var bind = function(args) {
- if (args.length < 2) {
- usage();
- }
-
- var ename = args[0];
- var qname = args[1];
- var key = '';
-
- if (args.length > 2) {
- key = args[2];
- }
-
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker'),
- brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type.
- ).then(function(objects) {
- var exchanges = objects.exchange;
-
- var etype = '';
- for (var i = 0; i < exchanges.length; i++) {
- var exchange = exchanges[i]._values;
- if (exchange.name === ename) {
- etype = exchange.type;
- break;
- }
+ // We invoke the CRUD methods on the broker object.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
+ ).then(function(objects) {
+ var broker = objects.broker[0];
+ brokerAgent.request(
+ brokerAgent.invokeMethod(broker, 'delete', {
+ "type": "exchange",
+ "name": ename})
+ ).then(handleMethodResponse);
+ });
+ };
+
+ var addQueue = function(args) {
+ if (args.length < 1) {
+ usage();
}
-
- // type of the xchg determines the processing of the rest of
- // argv. if it's an xml xchg, we want to find a file
- // containing an x-query, and pass that. if it's a headers
- // exchange, we need to pass either "any" or all, followed by a
- // map containing key/value pairs. if neither of those, extra
- // args are ignored.
+
+ var qname = args[0];
var declArgs = {};
- if (etype === 'xml') {
-
-
- } else if (etype === 'headers') {
- declArgs = snarf_header_args(Array.prototype.slice.apply(args, [3]));
+
+ for (var a = 0; a < config._extra_arguments.length; a++) {
+ var r = config._extra_arguments[a].split('=');
+ declArgs[r[0]] = getValue(r);
}
-//console.log(declArgs);
-
- if (typeof declArgs !== 'object') {
- process.exit(1);
+
+ if (config._durable) {
+ // allow the default fileCount and fileSize specified
+ // in qpid config file to take prededence
+ if (config._fileCount) {
+ declArgs[FILECOUNT] = config._fileCount;
+ }
+ if (config._fileSize) {
+ declArgs[FILESIZE] = config._fileSize;
+ }
}
-
- var broker = objects.broker[0];
- brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'create', {
- "type": "binding",
- "name": ename + '/' + qname + '/' + key,
- "properties": declArgs,
- "strict": true})
- ).then(handleMethodResponse);
- });
-
-/*
-
- ok = True
- _args = {}
- if not res:
- pass
- elif res.type == "xml":
- # this checks/imports the -f arg
- [ok, xquery] = snarf_xquery_args()
- _args = { "xquery" : xquery }
- else:
- if res.type == "headers":
- [ok, op, kv] = snarf_header_args(args[3:])
- _args = kv
- _args["x-match"] = op
-
- if not ok:
- sys.exit(1)
-
- self.broker.bind(ename, qname, key, _args)
-*/
-
-};
-
-var unbind = function(args) {
- if (args.length < 2) {
- usage();
- }
-
- var ename = args[0];
- var qname = args[1];
- var key = '';
-
- if (args.length > 2) {
- key = args[2];
- }
-
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
- brokerAgent.request(
- brokerAgent.invokeMethod(broker, 'delete', {
- "type": "binding",
- "name": ename + '/' + qname + '/' + key})
- ).then(handleMethodResponse);
- });
-};
-
-/**
- * The following methods are "generic" create and delete methods to for arbitrary
- * Management Objects e.g. Incoming, Outgoing, Domain, Topic, QueuePolicy,
- * TopicPolicy etc. use --argument k1=v1 --argument k2=v2 --argument k3=v3 to
- * pass arbitrary arguments as key/value pairs to the Object being created/deleted,
- * for example to add a topic object that uses the fanout exchange:
- * ./qpid-config.js add topic fanout --argument exchange=amq.fanout \
- * --argument qpid.max_size=1000000 --argument qpid.policy_type=ring
- */
-
-var createObject = function(type, name, args) {
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
- brokerAgent.request(
- // Create an object of the specified type.
- brokerAgent.invokeMethod(broker, 'create', {
- "type": type,
- "name": name,
- "properties": args,
- "strict": true})
- ).then(handleMethodResponse);
- });
-};
-
-var deleteObject = function(type, name, args) {
- brokerAgent.request(
- // We invoke the CRUD methods on the broker object.
- brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
- ).then(function(objects) {
- var broker = objects.broker[0];
+
+ if (config._maxQueueSize != null) {
+ declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize;
+ }
+
+ if (config._maxQueueCount != null) {
+ declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount;
+ }
+
+ if (config._limitPolicy) {
+ if (config._limitPolicy === 'none') {
+ } else if (config._limitPolicy === 'reject') {
+ declArgs[POLICY_TYPE] = 'reject';
+ } else if (config._limitPolicy === 'ring') {
+ declArgs[POLICY_TYPE] = 'ring';
+ }
+ }
+
+ if (config._lvq_key) {
+ declArgs[LVQ_KEY] = config._lvq_key;
+ }
+
+ if (config._eventGeneration) {
+ declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration;
+ }
+
+ if (config._flowStopSize != null) {
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize;
+ }
+
+ if (config._flowResumeSize != null) {
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize;
+ }
+
+ if (config._flowStopCount != null) {
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount;
+ }
+
+ if (config._flowResumeCount != null) {
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount;
+ }
+
+ if (config._msgGroupHeader) {
+ declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader;
+ }
+
+ if (config._sharedMsgGroup) {
+ declArgs[SHARED_MSG_GROUP] = 1;
+ }
+
+ if (config._altern_ex) {
+ declArgs['alternate-exchange'] = config._altern_ex;
+ }
+
+ if (config._durable) {
+ declArgs['durable'] = 1;
+ }
+
+ if (config._replicate) {
+ declArgs[REPLICATE] = config._replicate;
+ }
+
+ // This block is a little complex and untidy, the real issue is that the
+ // correlator object isn't as good as a real Promise and doesn't support
+ // chaining of "then" calls, so where we have complex dependencies we still
+ // get somewhat into "callback hell". TODO improve the correlator.
brokerAgent.request(
- // Create an object of the specified type and name.
- brokerAgent.invokeMethod(broker, 'delete', {
- "type": type,
- "name": name,
- "options": args})
- ).then(handleMethodResponse);
- });
-};
-
-/**
- * This is a "generic" mechanism for listing arbitrary Management Objects.
- */
-var listObjects = function(type) {
- brokerAgent.request(
- brokerAgent.getObjects('org.apache.qpid.broker', type)
- ).then(function(objects) {
- // The correlator passes an object containing responses for all of the
- // supplied requests so we index it by the supplied type to get our response.
- objects = objects[type];
-
- // Collect available attributes, stringify the values and compute the max
- // length of the value of each attribute so that we can later create a table.
- var attributes = {};
- var lengths = {};
- for (var i = 0; i < objects.length; i++) {
- var object = objects[i];
- object = object._values;
- for (var prop in object) {
- if (typeof object[prop] === 'object') { // Stringify Object properties.
- // Check if property is an ObjectID (reference property),
- // if so replace with the "name" part of the OID.
- if (object[prop]['_object_name']) {
- var parts = object[prop]['_object_name'].split(':');
- object[prop] = parts[parts.length - 1];
- } else {
- // Stringify general Object properties.
- object[prop] = renderObject(object[prop]);
- }
+ // We invoke the CRUD methods on the broker object.
+ brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
+ ).then(function(objects) {
+ var broker = objects.broker[0];
+ brokerAgent.request(
+ brokerAgent.invokeMethod(broker, 'create', {
+ "type": "queue",
+ "name": qname,
+ "properties": declArgs,
+ "strict": true})
+ ).then(function(response) {
+ if (config._start_replica) {
+ handleMethodResponse(response, true); // The second parameter prevents exiting.
+ // TODO test this stuff!
+ brokerAgent.request(
[... 680 lines stripped ...]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org