You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/13 15:46:29 UTC

svn commit: r1624737 - in /qpid/proton/branches/fadams-javascript-binding: examples/messenger/javascript/ proton-c/bindings/javascript/ tests/javascript/

Author: fadams
Date: Sat Sep 13 13:46:29 2014
New Revision: 1624737

URL: http://svn.apache.org/r1624737
Log:
Improve tests and demos, in particular improve the encapsulation around the QMF handling in qpid-config.js to make it potentially reusable

Modified:
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js
    qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js
    qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js
    qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js
    qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js
    qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js Sat Sep 13 13:46:29 2014
@@ -21,11 +21,14 @@
 
 // Simple client for use with server.js illustrating request/response
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("client.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 var address = "amqp://0.0.0.0";
 var subject = "UK.WEATHER";
 var replyTo = "~/replies";

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js Sat Sep 13 13:46:29 2014
@@ -19,11 +19,14 @@
  *
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("drain.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 console.log("drain not implemented yet");
 process.exit(0);
 

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js Sat Sep 13 13:46:29 2014
@@ -35,6 +35,12 @@
  * @file
  */
 
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("proxy.js should be run in Node.js");
+    return;
+}
+
 var proxy = require('./ws2tcp.js');
 
 var lport = 5673;

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js Sat Sep 13 13:46:29 2014
@@ -20,7 +20,7 @@
  */
 
 /**
- * Port of qpid-config to JavaScript for node.js, mainly intended as a demo to
+ * Port of qpid-config to JavaScript for Node.js, mainly intended as a demo to
  * illustrate using QMF2 in JavaScript using the proton.Messenger JS binding.
  * It illustrates a few things including how to use Messenger completely
  * asynchronously including using an async request/response pattern with
@@ -36,180 +36,188 @@
  * for complication best illustrated by the need for the correlator object.
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("qpid-config.js should be run in Node.js");
+    return;
 }
 
-var addr = 'guest:guest@localhost:5673';
-//var addr = 'localhost:5673';
-var address = 'amqp://' + addr + '/qmf.default.direct';
-console.log(address);
-
-var replyTo = '';
-var subscription;
-var subscribed = false;
-
-var message = new proton.Message();
-var messenger = new proton.Messenger();
-
-/**
- * The correlator object is a mechanism used to correlate requests with their
- * aynchronous responses. It might possible be better to make use of Promises
- * to implement part of this behaviour but a mechanism would still be needed to
- * correlate a request with its response callback in order to wrap things up in
- * a Promise, so much of the behaviour of this object would still be required.
- * In addition it seemed to make sense to make this QMF2 implementation fairly
- * free of dependencies and using Promises would require external libraries.
- * Instead the correlator implements "Promise-like" semantics, you might call it
- * a broken Promise :-)
- * <p>
- * in particular the request method behaves a *bit* like Promise.all() though it
- * is mostly fake and takes an array of functions that call the add() method
- * which is really the method used to associate response objects by correlationID.
- * The then method is used to register a listener that will be called when all
- * the requests that have been registered have received responses.
- * TODO error/timeout handling.
- */
-var correlator = {
-    _resolve: null,
-    _objects: {},
-    add: function(id) {
-        this._objects[id] = {complete: false, list: null};
-    },
-    request: function() {
-        this._resolve = function() {console.log("Warning: No resolver has been set")};
-        return this;
-    },
-    then: function(resolver) {
-        this._resolve = resolver ? resolver : this._resolve;
-    },
-    resolve: function() {
-        var opcode = message.properties['qmf.opcode'];
-        var correlationID = message.getCorrelationID();
-        var resp = this._objects[correlationID];
-        if (opcode === '_query_response') {
-            if (resp.list) {
-                Array.prototype.push.apply(resp.list, message.body); // This is faster than concat.
-            } else {
+var qmf = {}; // Create qmf namespace object.
+qmf.Console = function() { // qmf.Console Constructor.
+    var proton = require("qpid-proton");
+    var message = new proton.Message();
+    var messenger = new proton.Messenger();
+
+    var brokerAddress = '';
+    var replyTo = '';
+
+    /**
+     * The correlator object is a mechanism used to correlate requests with their
+     * asynchronous responses. It might possibly be better to make use of Promises
+     * to implement part of this behaviour but a mechanism would still be needed to
+     * correlate a request with its response callback in order to wrap things up in
+     * a Promise, so much of the behaviour of this object would still be required.
+     * In addition it seemed to make sense to make this QMF2 implementation fairly
+     * free of dependencies and using Promises would require external libraries.
+     * Instead the correlator implements "Promise-like" semantics, you might call it
+     * a broken Promise :-)
+     * <p>
+     * In particular the request method behaves a *bit* like Promise.all() though it
+     * is mostly fake and takes an array of functions that call the add() method
+     * which is really the method used to associate response objects by correlationID.
+     * The then method is used to register a listener that will be called when all
+     * the requests that have been registered have received responses.
+     * TODO error/timeout handling.
+     */
+    var correlator = {
+        _resolve: null,
+        _objects: {},
+        add: function(id) {
+            this._objects[id] = {complete: false, list: null};
+        },
+        request: function() {
+            this._resolve = function() {console.log("Warning: No resolver has been set")};
+            return this;
+        },
+        then: function(resolver) {
+            this._resolve = resolver ? resolver : this._resolve;
+        },
+        resolve: function() {
+            var opcode = message.properties['qmf.opcode'];
+            var correlationID = message.getCorrelationID();
+            var resp = this._objects[correlationID];
+            if (opcode === '_query_response') {
+                if (resp.list) {
+                    Array.prototype.push.apply(resp.list, message.body); // This is faster than concat.
+                } else {
+                    resp.list = message.body;
+                }
+    
+                var partial = message.properties['partial'];
+                if (!partial) {
+                    resp.complete = true;
+                }
+    
+                this._objects[correlationID] = resp;
+                this._checkComplete();
+            } else if (opcode === '_method_response' || opcode === '_exception') {
                 resp.list = message.body;
-            }
-
-            var partial = message.properties['partial'];
-            if (!partial) {
                 resp.complete = true;
+                this._objects[correlationID] = resp;
+                this._checkComplete();
+            } else {
+                console.error("Bad Message response, qmf.opcode = " + opcode);
+            }
+        },
+        _checkComplete: function() {
+            var response = {};
+            for (var id in this._objects) {
+                var object = this._objects[id];
+                if (object.complete) {
+                    response[id] = object.list;
+                } else {
+                    return;
+                }
             }
+    
+            this._objects = {}; // Clear state ready for next call.
+            this._resolve(response.method ? response.method : response);
+        }
+    };  
 
-            this._objects[correlationID] = resp;
-            this._checkComplete();
-        } else if (opcode === '_method_response' || opcode === '_exception') {
-            resp.list = message.body;
-            resp.complete = true;
-            this._objects[correlationID] = resp;
-            this._checkComplete();
-        } else {
-            console.error("Bad Message response, qmf.opcode = " + opcode);
+    var pumpData = function() {
+        while (messenger.incoming()) {
+            // The second parameter forces Binary payloads to be decoded as strings
+            // this is useful because the broker QMF Agent encodes strings as AMQP
+            // binary, which is a right pain from an interoperability perspective.
+            var t = messenger.get(message, true);
+            correlator.resolve();
+            messenger.accept(t);
+        }
+
+        if (messenger.isStopped()) {
+            message.free();
+            messenger.free();
         }
-    },
-    _checkComplete: function() {
-        var response = {};
-        for (var id in this._objects) {
-            var object = this._objects[id];
-            if (object.complete) {
-                response[id] = object.list;
-            } else {
-                return;
+    };
+
+    this.getObjects = function(packageName, className) {
+        message.setAddress(brokerAddress);
+        message.setSubject('broker');
+        message.setReplyTo(replyTo);
+        message.setCorrelationID(className);
+        message.properties = {
+            "routing-key": "broker", // Added for Java Broker
+            "x-amqp-0-10.app-id": "qmf2",
+            "method": "request",
+            "qmf.opcode": "_query_request",
+        };
+        message.body = {
+            "_what": "OBJECT",
+            "_schema_id": {
+                "_package_name": packageName,
+                "_class_name": className
             }
-        }
+        };
 
-        this._objects = {}; // Clear state ready for next call.
-        this._resolve(response.method ? response.method : response);
-    }
-};
+        correlator.add(className);
+        messenger.put(message);
+    };
+
+    this.invokeMethod = function(object, method, arguments) {
+        var correlationID = 'method';
+        message.setAddress(brokerAddress);
+        message.setSubject('broker');
+        message.setReplyTo(replyTo);
+        message.setCorrelationID(correlationID);
+        message.properties = {
+            "routing-key": "broker", // Added for Java Broker
+            "x-amqp-0-10.app-id": "qmf2",
+            "method": "request",
+            "qmf.opcode": "_method_request",
+        };
+        message.body = {
+            "_object_id": object._object_id,
+            "_method_name" : method,
+            "_arguments"   : arguments
+        };
+
+        correlator.add(correlationID);
+        messenger.put(message);
+    };
 
-var pumpData = function() {
-    if (!subscribed) {
-        var subscriptionAddress = subscription.getAddress();
-        if (subscriptionAddress) {
-            subscribed = true;
+    this.addConnection = function(addr, callback) {
+        brokerAddress = addr + '/qmf.default.direct';
+        var replyAddress = addr + '/#';
+
+        messenger.on('subscription', function(subscription) {
+            var subscriptionAddress = subscription.getAddress();
             var splitAddress = subscriptionAddress.split('/');
             replyTo = splitAddress[splitAddress.length - 1];
+            callback();
+        });
 
-            onSubscription();
-        }
-    }
-
-    while (messenger.incoming()) {
-        // The second parameter forces Binary payloads to be decoded as strings
-        // this is useful because the broker QMF Agent encodes strings as AMQP
-        // binary, which is a right pain from an interoperability perspective.
-        var t = messenger.get(message, true);
-        correlator.resolve();
-        messenger.accept(t);
+        messenger.subscribe(replyAddress);
     }
 
-    if (messenger.isStopped()) {
-        message.free();
-        messenger.free();
+    this.destroy = function() {
+        messenger.stop(); 
     }
-};
 
-var getObjects = function(packageName, className) {
-    message.setAddress(address);
-    message.setSubject('broker');
-    message.setReplyTo(replyTo);
-    message.setCorrelationID(className);
-    message.properties = {
-        "routing-key": "broker", // Added for Java Broker
-        "x-amqp-0-10.app-id": "qmf2",
-        "method": "request",
-        "qmf.opcode": "_query_request",
-    };
-    message.body = {
-        "_what": "OBJECT",
-        "_schema_id": {
-            "_package_name": packageName,
-            "_class_name": className
-        }
-    };
+    this.request = function() {return correlator.request();}
 
-    correlator.add(className);
-    messenger.put(message);
+    messenger.on('error', function(error) {console.log(error);});
+    messenger.on('work', pumpData);
+    messenger.setOutgoingWindow(1024);
+    messenger.recv(); // Receive as many messages as messenger can buffer.
+    messenger.start();
 };
 
-var invokeMethod = function(object, method, arguments) {
-    var correlationID = 'method';
-    message.setAddress(address);
-    message.setSubject('broker');
-    message.setReplyTo(replyTo);
-    message.setCorrelationID(correlationID);
-    message.properties = {
-        "routing-key": "broker", // Added for Java Broker
-        "x-amqp-0-10.app-id": "qmf2",
-        "method": "request",
-        "qmf.opcode": "_method_request",
-    };
-    message.body = {
-        "_object_id": object._object_id,
-        "_method_name" : method,
-        "_arguments"   : arguments
-    };
-
-    correlator.add(correlationID);
-    messenger.put(message);
-};
-
-messenger.on('error', function(error) {console.log(error);});
-messenger.on('work', pumpData);
-messenger.setOutgoingWindow(1024);
-messenger.start();
-
-subscription = messenger.subscribe('amqp://' + addr + '/#');
-messenger.recv(); // Receive as many messages as messenger can buffer.
-
 
 /************************* qpid-config business logic ************************/
 
+var brokerAgent = new qmf.Console();
+
 var _usage =
 'Usage:  qpid-config [OPTIONS]\n' +
 '        qpid-config [OPTIONS] exchanges [filter-string]\n' +
@@ -382,7 +390,7 @@ var getValue = function(r) {
 
 var config = {
     _recursive      : false,
-    _host           : 'localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
+    _host           : 'guest:guest@localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
     _connTimeout    : 10,
     _ignoreDefault  : false,
     _altern_ex      : null,
@@ -539,10 +547,10 @@ var renderObject = function(obj, list) {
  */
 
 var overview = function() {
-    correlator.request(
+    brokerAgent.request(
         // Send the QMF query requests for the specified classes.
-        getObjects('org.apache.qpid.broker', 'queue'),
-        getObjects('org.apache.qpid.broker', 'exchange')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
     ).then(function(objects) {
         var exchanges = objects.exchange;
         var queues = objects.queue;
@@ -571,14 +579,14 @@ var overview = function() {
         }
         console.log("        durable: " + durable);
         console.log("    non-durable: " + (queues.length - durable));
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
 var exchangeList = function(filter) {
-    correlator.request(
+    brokerAgent.request(
         // Send the QMF query requests for the specified classes.
-        getObjects('org.apache.qpid.broker', 'exchange')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
     ).then(function(objects) {
         var exchanges = objects.exchange;
         var exMap = idMap(exchanges);
@@ -630,16 +638,16 @@ var exchangeList = function(filter) {
                 console.log(string);
             }
         }
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
 var exchangeListRecurse = function(filter) {
-    correlator.request(
+    brokerAgent.request(
         // Send the QMF query requests for the specified classes.
-        getObjects('org.apache.qpid.broker', 'queue'),
-        getObjects('org.apache.qpid.broker', 'exchange'),
-        getObjects('org.apache.qpid.broker', 'binding')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
     ).then(function(objects) {
         var exchanges = objects.exchange;
         var bindings = objects.binding;
@@ -666,15 +674,15 @@ var exchangeListRecurse = function(filte
                 }
             }
         }
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
 var queueList = function(filter) {
-    correlator.request(
+    brokerAgent.request(
         // Send the QMF query requests for the specified classes.
-        getObjects('org.apache.qpid.broker', 'queue'),
-        getObjects('org.apache.qpid.broker', 'exchange')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
     ).then(function(objects) {
         var queues = objects.queue;
         var exMap = idMap(objects.exchange);
@@ -763,16 +771,16 @@ var queueList = function(filter) {
                 console.log(string);
             }
         }
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
 var queueListRecurse = function(filter) {
-    correlator.request(
+    brokerAgent.request(
         // Send the QMF query requests for the specified classes.
-        getObjects('org.apache.qpid.broker', 'queue'),
-        getObjects('org.apache.qpid.broker', 'exchange'),
-        getObjects('org.apache.qpid.broker', 'binding')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
     ).then(function(objects) {
         var queues = objects.queue;
         var bindings = objects.binding;
@@ -806,7 +814,7 @@ var queueListRecurse = function(filter) 
                 }
             }
         }
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
@@ -836,7 +844,6 @@ var queueListRecurse = function(filter) 
  */
 
 var handleMethodResponse = function(response, dontStop) {
-console.log("Method result");
     if (response._arguments) {
         //console.log(response._arguments);
     } if (response._values) {
@@ -845,7 +852,7 @@ console.log("Method result");
     // Mostly we want to stop the Messenger Event loop and exit when a QMF method
     // returns, but sometimes we don't, the dontStop flag prevents this behaviour.
     if (!dontStop) {
-        messenger.stop();
+        brokerAgent.destroy();
     }
 }
 
@@ -885,13 +892,13 @@ var addExchange = function(args) {
         declArgs[REPLICATE] = config._replicate;
     }
 
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'create', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'create', {
                 "type":      "exchange",
                 "name":       ename,
                 "properties": declArgs,
@@ -907,13 +914,13 @@ var delExchange = function(args) {
 
     var ename = args[0];
 
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'delete', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'delete', {
                 "type":   "exchange",
                 "name":    ename})
         ).then(handleMethodResponse);
@@ -1009,13 +1016,13 @@ var addQueue = function(args) {
     // correlator object isn't as good as a real Promise and doesn't support
     // chaining of "then" calls, so where we have complex dependencies we still
     // get somewhat into "callback hell". TODO improve the correlator.
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'create', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'create', {
                 "type":      "queue",
                 "name":       qname,
                 "properties": declArgs,
@@ -1024,18 +1031,18 @@ var addQueue = function(args) {
             if (config._start_replica) {
                 handleMethodResponse(response, true); // The second parameter prevents exiting.
                 // TODO test this stuff!
-                correlator.request(
-                    getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct
+                brokerAgent.request(
+                    brokerAgent.getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct
                 ).then(function(objects) {
                     if (objects.habroker.length > 0) {
                         var habroker = objects.habroker[0];
-                        correlator.request(
-                            invokeMethod(habroker, 'replicate', {
+                        brokerAgent.request(
+                            brokerAgent.invokeMethod(habroker, 'replicate', {
                                 "broker": config._start_replica,
                                 "queue":  qname})
                         ).then(handleMethodResponse);
                     } else {
-                        messenger.stop();
+                        brokerAgent.destroy();
                     }
                 });
             } else {
@@ -1052,13 +1059,13 @@ var delQueue = function(args) {
 
     var qname = args[0];
 
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'delete', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'delete', {
                 "type":   "queue",
                 "name":    qname,
                 "options": {"if_empty":  config._if_empty,
@@ -1092,9 +1099,6 @@ var snarf_header_args = function(args) {
 };
 
 var bind = function(args) {
-console.log("bind");
-console.log(args);
-
     if (args.length < 2) {
         usage();
     }
@@ -1107,10 +1111,10 @@ console.log(args);
         key = args[2];
     }
 
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker'),
-        getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type.
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker'),
+        brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type.
     ).then(function(objects) {
         var exchanges = objects.exchange;
 
@@ -1136,15 +1140,15 @@ console.log(args);
         } else if (etype === 'headers') {
             declArgs = snarf_header_args(Array.prototype.slice.apply(args, [3]));
         }
-console.log(declArgs);
+//console.log(declArgs);
 
         if (typeof declArgs !== 'object') {
             process.exit(1);
         }
 
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'create', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'create', {
                 "type":   "binding",
                 "name":    ename + '/' + qname + '/' + key,
                 "properties": declArgs,
@@ -1177,9 +1181,6 @@ console.log(declArgs);
 };
 
 var unbind = function(args) {
-console.log("unbind");
-console.log(args);
-
     if (args.length < 2) {
         usage();
     }
@@ -1192,13 +1193,13 @@ console.log(args);
         key = args[2];
     }
 
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
-            invokeMethod(broker, 'delete', {
+        brokerAgent.request(
+            brokerAgent.invokeMethod(broker, 'delete', {
                 "type":   "binding",
                 "name":    ename + '/' + qname + '/' + key})
         ).then(handleMethodResponse);
@@ -1216,14 +1217,14 @@ console.log(args);
  */
 
 var createObject = function(type, name, args) {
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
+        brokerAgent.request(
             // Create an object of the specified type.
-            invokeMethod(broker, 'create', {
+            brokerAgent.invokeMethod(broker, 'create', {
                 "type":       type,
                 "name":       name,
                 "properties": args,
@@ -1233,14 +1234,14 @@ var createObject = function(type, name, 
 };
 
 var deleteObject = function(type, name, args) {
-    correlator.request(
+    brokerAgent.request(
         // We invoke the CRUD methods on the broker object.
-        getObjects('org.apache.qpid.broker', 'broker')
+        brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
     ).then(function(objects) {
         var broker = objects.broker[0];
-        correlator.request(
+        brokerAgent.request(
             // Create an object of the specified type and name.
-            invokeMethod(broker, 'delete', {
+            brokerAgent.invokeMethod(broker, 'delete', {
                 "type":    type,
                 "name":    name,
                 "options": args})
@@ -1252,8 +1253,8 @@ var deleteObject = function(type, name, 
  * This is a "generic" mechanism for listing arbitrary Management Objects.
  */
 var listObjects = function(type) {
-    correlator.request(
-        getObjects('org.apache.qpid.broker', type)
+    brokerAgent.request(
+        brokerAgent.getObjects('org.apache.qpid.broker', type)
     ).then(function(objects) {
         // The correlator passes an object containing responses for all of the
         // supplied requests so we index it by the supplied type to get our response.
@@ -1325,23 +1326,23 @@ var listObjects = function(type) {
             console.log(string);
         }
 
-        messenger.stop();
+        brokerAgent.destroy();
     });
 };
 
 var reloadAcl = function() {
-    correlator.request(
-        getObjects('org.apache.qpid.acl', 'acl')
+    brokerAgent.request(
+        brokerAgent.getObjects('org.apache.qpid.acl', 'acl')
     ).then(function(objects) {
         if (objects.acl.length > 0) {
             var acl = objects.acl[0];
-            correlator.request(
+            brokerAgent.request(
                 // Create an object of the specified type.
-                invokeMethod(acl, 'reloadACLFile', {})
+                brokerAgent.invokeMethod(acl, 'reloadACLFile', {})
             ).then(handleMethodResponse);
         } else {
             console.log("Failed: No ACL Loaded in Broker");
-            messenger.stop();
+            brokerAgent.destroy();
         }
     });
 };
@@ -1502,9 +1503,6 @@ if (params.length > 0) {
 }
 
 //console.log(config._host);
+brokerAgent.addConnection(config._host, command);
 
 
-var onSubscription = function() {
-    command();
-};
-

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js Sat Sep 13 13:46:29 2014
@@ -19,11 +19,14 @@
  *
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("recv.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 var address = "amqp://~0.0.0.0";
 var message = new proton.Message();
 var messenger = new proton.Messenger();

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html Sat Sep 13 13:46:29 2014
@@ -61,9 +61,9 @@ console.log("body = " + body);
 messenger.on('error', function(error) {
     console.log("Received error " + error);
 
-message.free();
+// Error recovery seems to require a new Messenger instance.
+messenger.stop();
 messenger.free();
-message = new proton.Message();
 messenger = new proton.Messenger();
 messenger.start();
 console.log("Restarted");

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js Sat Sep 13 13:46:29 2014
@@ -19,11 +19,14 @@
  *
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("send.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 var address = "amqp://0.0.0.0";
 var subject = "UK.WEATHER";
 var msgtext = "Hello World!";

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js Sat Sep 13 13:46:29 2014
@@ -21,11 +21,14 @@
 
 // Simple server for use with client.js illustrating request/response
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("server.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 var address = "amqp://~0.0.0.0";
 var message = new proton.Message();
 var reply   = new proton.Message();

Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js Sat Sep 13 13:46:29 2014
@@ -19,11 +19,14 @@
  *
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("spout.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 console.log("spout not implemented yet");
 process.exit(0);
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js Sat Sep 13 13:46:29 2014
@@ -214,9 +214,17 @@ Module['Messenger'] = function(name) { /
      */
     _pn_messenger_set_blocking(this._messenger, false);
 
+    // Subscriptions that haven't yet completed, used for managing subscribe events.
+    this._pendingSubscriptions = [];
+
     // Used in the Event registration mechanism (in the 'on' and 'emit' methods).
     this._callbacks = {};
 
+    // This call ensures that the emscripten network callback functions are initialised.
+    Module.EventDispatch.registerMessenger(this);
+
+
+    // TODO improve error handling mechanism.
     /*
      * The emscripten websocket error event could get triggered by any Messenger
      * and it's hard to determine which one without knowing which file descriptors
@@ -233,10 +241,13 @@ Module['Messenger'] = function(name) { /
      */
     var that = this;
     Module['websocket']['on']('error', function(error) {
+
+console.log("Module['websocket']['on'] caller is " + arguments.callee.caller.toString());
+
 console.log("that._checkErrors = " + that._checkErrors);
 console.log("error = " + error);
         if (that._checkErrors) {
-            that.emit('error', new Module['MessengerError'](error[2]));
+            that._emit('error', new Module['MessengerError'](error[2]));
         }
     });
 };
@@ -269,7 +280,7 @@ _Messenger_._check = function(code) {
         var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code));
 
         if (this._callbacks['error']) {
-            this.emit('error', new Module['MessengerError'](message));
+            this._emit('error', new Module['MessengerError'](message));
         } else {
             throw new Module['MessengerError'](message);
         }
@@ -279,18 +290,48 @@ _Messenger_._check = function(code) {
 };
 
 /**
- * Invokes a callback registered for a specified event.
- * @method emit
+ * Invokes the callbacks registered for a specified event.
+ * @method _emit
  * @memberof! proton.Messenger#
  * @param event the event we want to emit.
  * @param param the parameter we'd like to pass to the event callback.
  */
-_Messenger_.emit = function(event, param) {
-    if ('function' === typeof this._callbacks[event]) {
-        this._callbacks[event].call(this, param);
+_Messenger_._emit = function(event, param) {
+    var callbacks = this._callbacks[event];
+    if (callbacks) {
+        callbacks = callbacks.slice(); // Snapshot: a listener may call removeListener.
+        for (var i = 0; i < callbacks.length; i++) {
+            if ('function' === typeof callbacks[i]) {
+                callbacks[i].call(this, param);
+            }
+        }
+    }
+};
+
+/**
+ * Checks the pending subscriptions and, for each one whose source address has
+ * become available, emits a 'subscription' event passing that Subscription.
+ * Subscriptions that have no address yet remain pending. Note that this doesn't
+ * seem to work for listen/bind style subscriptions (amqp://~0.0.0.0), unclear why.
+ */
+_Messenger_._checkSubscriptions = function() {
+    // Check for completed subscriptions, and emit subscribe event.
+    var subscriptions = this._pendingSubscriptions;
+    if (subscriptions.length) {
+        var pending = []; // Array of any subscriptions that remain pending.
+        for (var j = 0; j < subscriptions.length; j++) {
+            var subscription = subscriptions[j];
+            if (subscription['getAddress']()) {
+                this._emit('subscription', subscription);
+            } else {
+                pending.push(subscription);
+            }
+        }
+        this._pendingSubscriptions = pending;
     }
 };
 
+
 // *************************** Public methods *****************************
 
 /**
@@ -327,11 +368,11 @@ _Messenger_.emit = function(event, param
  */
 _Messenger_['on'] = function(event, callback) {
     if ('function' === typeof callback) {
-        if (event === 'work') {
-            Module.EventDispatch.addListener(this, callback);
-        } else {
-	        this._callbacks[event] = callback;
+        if (!this._callbacks[event]) {
+            this._callbacks[event] = [];
         }
+
+        this._callbacks[event].push(callback);
     }
 };
 
@@ -340,15 +381,25 @@ _Messenger_['on'] = function(event, call
  * @method removeListener
  * @memberof! proton.Messenger#
  * @param event the event we want to detach from.
- * @param callback the callback function to be remove for the specified event.
+ * @param callback the callback function to be removed for the specified event.
+ *        If no callback is specified, all callbacks for the event are removed.
  */
 _Messenger_['removeListener'] = function(event, callback) {
-    if ('function' === typeof callback) {
-        if (event === 'work') {
-            Module.EventDispatch.removeListener(this, callback);
-        } else {
-	        this._callbacks[event] = null;//callback;
+    if (callback) {
+        var callbacks = this._callbacks[event];
+        if ('function' === typeof callback && callbacks) {
+            // Search for the specified callback.
+            for (var i = 0; i < callbacks.length; i++) {
+                if (callback === callbacks[i]) {
+                    // If we find the specified callback delete it and return.
+                    callbacks.splice(i, 1);
+                    return;
+                }
+            }
         }
+    } else {
+        // If we call remove with no callback we remove all callbacks.
+        delete this._callbacks[event];
     }
 };
 
@@ -392,6 +443,8 @@ _Messenger_['isBlocking'] = function() {
  * @memberof! proton.Messenger#
  */
 _Messenger_['free'] = function() {
+    // This call ensures that the emscripten network callback functions are removed.
+    Module.EventDispatch.unregisterMessenger(this);
     _pn_messenger_free(this._messenger);
 };
 
@@ -477,14 +530,6 @@ _Messenger_['setIncomingWindow'] = funct
  */
 _Messenger_['start'] = function() {
     this._check(_pn_messenger_start(this._messenger));
-
-    // This call ensures that the emscripten network callback functions are set
-    // up even if a client hasn't explicity added a work function via a call to
-    // messenger.on('work', <work function>);
-    // Doing this means that pn_messenger_work() will still get called when any
-    // WebSocket events occur, which keeps things more reliable when things like
-    // reconnections occur.
-    Module.EventDispatch.addListener(this);
 };
 
 /**
@@ -534,7 +579,7 @@ _Messenger_['subscribe'] = function(sour
         this._check(Module['Error']['ARG_ERR']);
     }
     var sp = Runtime.stackSave();
-    this._checkErrors = true;
+    this._checkErrors = true; // TODO improve error handling mechanism.
     var subscription = _pn_messenger_subscribe(this._messenger,
                                                allocate(intArrayFromString(source), 'i8', ALLOC_STACK));
     Runtime.stackRestore(sp);
@@ -542,7 +587,10 @@ _Messenger_['subscribe'] = function(sour
     if (!subscription) {
         this._check(Module['Error']['ERR']);
     }
-    return new Subscription(subscription);
+
+    subscription = new Subscription(subscription)
+    this._pendingSubscriptions.push(subscription);
+    return subscription;
 };
 
 /**
@@ -570,7 +618,7 @@ _Messenger_['subscribe'] = function(sour
 _Messenger_['put'] = function(message, flush) {
     flush = flush === false ? false : true;
     message._preEncode();
-    this._checkErrors = true;
+    this._checkErrors = true; // TODO improve error handling mechanism.
     this._check(_pn_messenger_put(this._messenger, message._message));
 
     // If flush is set invoke pn_messenger_work.
@@ -660,7 +708,7 @@ _Messenger_['work'] = function() {
 console.log("work = false");
         return false;
     } else {
-        this._checkErrors = false;
+        this._checkErrors = false; // TODO improve error handling mechanism.
         this._check(err);
 console.log("work = true");
         return true;
@@ -931,7 +979,7 @@ _Messenger_['rewrite'] = function(patter
  * @memberof proton
  */
 Module.EventDispatch = new function() { // Note the use of new to create a Singleton.
-    var _firstCall = true; // Flag used to check the first time addListener is called.
+    var _firstCall = true; // Flag used to check the first time registerMessenger is called.
     var _messengers = {};
 
     /**
@@ -950,15 +998,15 @@ Module.EventDispatch = new function() { 
     var _pump = function(fd, closing) {
         for (var i in _messengers) {
             if (_messengers.hasOwnProperty(i)) {
-                var current = _messengers[i];
+                var messenger = _messengers[i];
 
                 if (closing) {
-                    current.invokeCallbacks();
+                    messenger._emit('work');
                 } else {
-                    var messenger = current.messenger;
                     while (_pn_messenger_work(messenger._messenger, 0) >= 0) {
-                        messenger._checkErrors = false;
-                        current.invokeCallbacks();
+                        messenger._checkSubscriptions();
+                        messenger._checkErrors = false; // TODO improve error handling mechanism.
+                        messenger._emit('work');
                     }
                 }
             }
@@ -974,64 +1022,33 @@ Module.EventDispatch = new function() { 
     };
 
     /**
-     * Initialises the emscripten network callback functions. This needs to be
-     * done the first time we call addListener rather that when we create the
-     * Singleton because emscripten's socket filesystem has to be mounted before
-     * we can register listeners for any of these events.
+     * Register the specified Messenger as being interested in network events.
      */
-    var _init = function() {
-        Module['websocket']['on']('open', _pump);
-        Module['websocket']['on']('connection', _pump);
-        Module['websocket']['on']('message', _pump);
-        Module['websocket']['on']('close', _close);
-    };
-
-    /**
-     * Add a listener callback for the specified Messenger. Multiple listeners
-     * are permitted for each Messenger and listeners can be registered for
-     * multiple Messenger instances. The first time this method is called we
-     * initialise the emscripten network callback functions.
-     */
-    this.addListener = function(messenger, callback) {
+    this.registerMessenger = function(messenger) {
         if (_firstCall) {
-            _init();
+            /**
+             * Initialises the emscripten network callback functions. This needs
+             * to be done the first time we call registerMessenger rather than
+             * when we create the Singleton because emscripten's socket filesystem
+             * has to be mounted before we can listen for any of these events.
+             */
+            Module['websocket']['on']('open', _pump);
+            Module['websocket']['on']('connection', _pump);
+            Module['websocket']['on']('message', _pump);
+            Module['websocket']['on']('close', _close);
             _firstCall = false;
         }
 
         var name = messenger.getName();
-        if (!_messengers[name]) {
-            _messengers[name] = {
-                messenger: messenger,
-                callbacks: [],
-                invokeCallbacks: function() {
-                    for (var j = 0; j < this.callbacks.length; j++) {
-                        this.callbacks[j]();
-                    }
-                }
-            };
-        }
-
-        if (callback) {
-            _messengers[name].callbacks.push(callback);
-        }
+        _messengers[name] = messenger; 
     };
 
     /**
-     * Remove the specified listener callback from the specified Messenger.
+     * Unregister the specified Messenger from interest in network events.
      */
-    this.removeListener = function(messenger, callback) {
+    this.unregisterMessenger = function(messenger) {
         var name = messenger.getName();
-        if (_messengers[name]) {
-            // If we find the registered Messenger search for the specified callback.
-            var callbacks = _messengers[name].callbacks;
-            for (var j = 0; j < callbacks.length; j++) {
-                if (callback === callbacks[j]) {
-                    // If we find the specified callback delete it and return.
-                    callbacks.splice(j, 1);
-                    return;
-                }
-            }
-        }
+        delete _messengers[name];
     };
 };
 

Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js Sat Sep 13 13:46:29 2014
@@ -23,13 +23,16 @@
  * proton.Data wrapper class.
  */
 
-// Check if the environment is Node.js and if so import the required libraries.
-if (typeof exports !== "undefined" && exports !== null) {
-    unittest = require("./unittest.js");
-    assert = require("assert");
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("codec.js should be run in Node.js");
+    return;
 }
 
+var unittest = require("./unittest.js");
+var assert = require("assert");
+var proton = require("qpid-proton");
+
 // Extend TestCase by creating a prototype instance and adding test methods as properties.
 var DataTest = new unittest.TestCase();
 

Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js Sat Sep 13 13:46:29 2014
@@ -23,13 +23,16 @@
  * proton.Message wrapper class.
  */
 
-// Check if the environment is Node.js and if so import the required libraries.
-if (typeof exports !== "undefined" && exports !== null) {
-    unittest = require("./unittest.js");
-    assert = require("assert");
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("message.js should be run in Node.js");
+    return;
 }
 
+var unittest = require("./unittest.js");
+var assert = require("assert");
+var proton = require("qpid-proton");
+
 /**
  * JavaScript Implementation of Python's range() function taken from:
  * http://stackoverflow.com/questions/8273047/javascript-function-similar-to-python-range

Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js?rev=1624737&r1=1624736&r2=1624737&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js Sat Sep 13 13:46:29 2014
@@ -19,19 +19,20 @@
  *
  */
 
-// Check if the environment is Node.js and if so import the required library.
-if (typeof exports !== "undefined" && exports !== null) {
-    proton = require("qpid-proton");
+// Check if the environment is Node.js and if not log an error and exit.
+if (!exports) {
+    console.error("soak.js should be run in Node.js");
+    return;
 }
 
+var proton = require("qpid-proton");
+
 var addr = 'guest:guest@localhost:5673';
 //var addr = 'localhost:5673';
 var address = 'amqp://' + addr;
 console.log(address);
 
 var subscriptionQueue = '';
-var subscription;
-var subscribed = false;
 var count = 0;
 var start = 0; // Start Time.
 
@@ -39,16 +40,6 @@ var message = new proton.Message();
 var messenger = new proton.Messenger();
 
 var pumpData = function() {
-    if (!subscribed) {
-        var subscriptionAddress = subscription.getAddress();
-        if (subscriptionAddress) {
-            subscribed = true;
-            var splitAddress = subscriptionAddress.split('/');
-            subscriptionQueue = splitAddress[splitAddress.length - 1];
-            onSubscription();
-        }
-    }
-
     while (messenger.incoming()) {
         // The second parameter forces Binary payloads to be decoded as strings
         // this is useful because the broker QMF Agent encodes strings as AMQP
@@ -84,16 +75,21 @@ var sendMessage = function() {
 
 messenger.on('error', function(error) {console.log(error);});
 messenger.on('work', pumpData);
+messenger.on('subscription', function(subscription) {
+    var subscriptionAddress = subscription.getAddress();
+    var splitAddress = subscriptionAddress.split('/');
+    subscriptionQueue = splitAddress[splitAddress.length - 1];
+
+    console.log("Subscription Queue: " + subscriptionQueue);
+    start = +new Date();
+    sendMessage();
+});
+
 //messenger.setOutgoingWindow(1024);
 messenger.setIncomingWindow(1024); // The Java Broker seems to need this.
+messenger.recv(); // Receive as many messages as messenger can buffer.
 messenger.start();
 
-subscription = messenger.subscribe('amqp://' + addr + '/#');
-messenger.recv(); // Receive as many messages as messenger can buffer.
+messenger.subscribe('amqp://' + addr + '/#');
 
-var onSubscription = function() {
-    console.log("Subscription Queue: " + subscriptionQueue);
-    start = +new Date();
-    sendMessage();
-};
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org