You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/11/28 14:50:26 UTC

[48/51] [abbrv] qpid-proton git commit: Replace pn_messenger_work based event handling with selectable based implementation in order to have more control. This greatly improves error handling and allows much cleaner tidying up of resources when errors oc

Replace pn_messenger_work-based event handling with a selectable-based implementation in order to have more control. This greatly improves error handling and allows much cleaner tidying up of resources when errors occur.

git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1631208 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e8faa57e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e8faa57e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e8faa57e

Branch: refs/heads/master
Commit: e8faa57eafcb567d3447d817bd500d1a4e9cc41a
Parents: 0d44f57
Author: fadams <fa...@unknown>
Authored: Sun Oct 12 16:45:29 2014 +0000
Committer: fadams <fa...@unknown>
Committed: Sun Oct 12 16:45:29 2014 +0000

----------------------------------------------------------------------
 examples/messenger/javascript/send.html      |   9 -
 proton-c/bindings/javascript/CMakeLists.txt  |   2 +-
 proton-c/bindings/javascript/error.js        |  33 +-
 proton-c/bindings/javascript/messenger.js    |  57 +---
 proton-c/bindings/javascript/module.js       | 370 ++++++++++++++++------
 proton-c/bindings/javascript/subscription.js |  18 +-
 proton-c/src/codec/data.h                    |   2 +-
 7 files changed, 339 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/examples/messenger/javascript/send.html
----------------------------------------------------------------------
diff --git a/examples/messenger/javascript/send.html b/examples/messenger/javascript/send.html
index f38a78e..c61e3f2 100644
--- a/examples/messenger/javascript/send.html
+++ b/examples/messenger/javascript/send.html
@@ -67,15 +67,6 @@ console.log("body = " + body);
 
 var errorHandler = function(error) {
     console.log("Received error " + error);
-
-    // Error recovery seems to require a new Messenger instance.
-    messenger.stop();
-    messenger.free();
-    messenger = new proton.Messenger();
-
-    messenger.on('error', errorHandler);
-    messenger.start();
-    console.log("Restarted");
 };
 
 messenger.on('error', errorHandler);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/bindings/javascript/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt
index 316dbe4..ee1241f 100644
--- a/proton-c/bindings/javascript/CMakeLists.txt
+++ b/proton-c/bindings/javascript/CMakeLists.txt
@@ -216,7 +216,7 @@ set_target_properties(
   # fiddly with node.js packages. This behaviour might be reinstated if the
   # packaging mechanism improves.
 
-  LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
 pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming',  '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message',
  '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_messa
 ge_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_d
 ecimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\""
+  LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
 pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming',  '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription
 _set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_s
 et_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_p
 n_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_capacity', '_pn_selectable_writable', '_pn_selectable_pending', '_pn_s
 electable_is_terminal', '_pn_selectable_fd', '_pn_selectable_free']\""
   )
 
 # This command packages up the compiled proton.js into a node.js package called

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/bindings/javascript/error.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/error.js b/proton-c/bindings/javascript/error.js
index a1553b0..4069bef 100644
--- a/proton-c/bindings/javascript/error.js
+++ b/proton-c/bindings/javascript/error.js
@@ -90,7 +90,34 @@ Module['MessengerError'].prototype = new Error();
 Module['MessengerError'].prototype.constructor = Module['MessengerError'];
 
 Module['MessengerError'].prototype.toString = function() {
-    return this.name + ': ' + this.message
+    return this.name + ': ' + this.message;
+};
+
+
+/*****************************************************************************/
+/*                                                                           */
+/*                              SubscriptionError                            */
+/*                                                                           */
+/*****************************************************************************/
+
+/**
+ * Constructs a proton.SubscriptionError instance.
+ * @classdesc This class is a subclass of MessengerError.
+ * @constructor proton.SubscriptionError
+ * @param {string} source the address that we want to subscribe to.
+ * @param {string} message the error message.
+ */
+Module['SubscriptionError'] = function(source, message) { // SubscriptionError constructor.
+    this.name = "SubscriptionError";
+    this.source = source;
+    this.message = (message || "");
+};
+
+Module['SubscriptionError'].prototype = new Module['MessengerError']();
+Module['SubscriptionError'].prototype.constructor = Module['SubscriptionError'];
+
+Module['SubscriptionError'].prototype.toString = function() {
+    return this.name + ': ' + this.source + ': ' + this.message;
 };
 
 
@@ -115,7 +142,7 @@ Module['MessageError'].prototype = new Error();
 Module['MessageError'].prototype.constructor = Module['MessageError'];
 
 Module['MessageError'].prototype.toString = function() {
-    return this.name + ': ' + this.message
+    return this.name + ': ' + this.message;
 };
 
 
@@ -140,6 +167,6 @@ Module['DataError'].prototype = new Error();
 Module['DataError'].prototype.constructor = Module['DataError'];
 
 Module['DataError'].prototype.toString = function() {
-    return this.name + ': ' + this.message
+    return this.name + ': ' + this.message;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/bindings/javascript/messenger.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/messenger.js b/proton-c/bindings/javascript/messenger.js
index 80eec5b..e2f4418 100644
--- a/proton-c/bindings/javascript/messenger.js
+++ b/proton-c/bindings/javascript/messenger.js
@@ -269,14 +269,14 @@ _Messenger_['removeListener'] = function(event, callback) {
             // Search for the specified callback.
             for (var i = 0; i < callbacks.length; i++) {
                 if (callback === callbacks[i]) {
-                    // If we find the specified callback delete it and return.
+                    // If we find the specified callback, delete it and return.
                     callbacks.splice(i, 1);
                     return;
                 }
             }
         }
     } else {
-        // If we call remove with no callback we remove all callbacks.
+        // If we call remove with no callback specified we remove all callbacks.
         delete this._callbacks[event];
     }
 };
@@ -368,7 +368,7 @@ _Messenger_['getOutgoingWindow'] = function() {
  * @param {number} window the size of the tracking window in messages.
  */
 _Messenger_['setOutgoingWindow'] = function(window) {
-    this._check(_pn_messenger_set_outgoing_window(this._messenger, window));
+    _pn_messenger_set_outgoing_window(this._messenger, window);
 };
 
 /**
@@ -397,7 +397,7 @@ _Messenger_['getIncomingWindow'] = function() {
  * @param {number} window the size of the tracking window in messages.
  */
 _Messenger_['setIncomingWindow'] = function(window) {
-    this._check(_pn_messenger_set_incoming_window(this._messenger, window));
+    _pn_messenger_set_incoming_window(this._messenger, window);
 };
 
 /**
@@ -421,10 +421,9 @@ _Messenger_['start'] = function() {
  * to see if it has fully stopped.
  * @method stop
  * @memberof! proton.Messenger#
- * @returns {@link proton.Error.INPROGRESS} if still busy.
  */
 _Messenger_['stop'] = function() {
-    return this._check(_pn_messenger_stop(this._messenger));
+    _pn_messenger_stop(this._messenger);
 };
 
 /**
@@ -454,29 +453,10 @@ _Messenger_['isStopped'] = function() {
  */
 _Messenger_['subscribe'] = function(source) {
     if (!source) {
-        this._check(Module['Error']['ARG_ERR']);
-    }
-    var sp = Runtime.stackSave();
-    Module.EventDispatch.setCurrentMessenger(this);
-    var subscription = _pn_messenger_subscribe(this._messenger,
-                                               allocate(intArrayFromString(source), 'i8', ALLOC_STACK));
-    Module.EventDispatch.setCurrentMessenger(null);
-    Runtime.stackRestore(sp);
-
-    if (!subscription) {
-        this._check(Module['Error']['ERR']);
-    }
-
-    // For peer subscriptions to this Messenger emit a subscription event
-    // immediately otherwise defer until the address is resolved remotely.
-    if (source.indexOf('~') !== -1) {
-        subscription = new Subscription(subscription, source);
-        this._emit('subscription', subscription);
+        this._emit('error', new Module['SubscriptionError'](source, 'CONNECTION ERROR: Address not specified'));
     } else {
-        subscription = new Subscription(subscription)
-        this._pendingSubscriptions.push(subscription);
+        return Module.EventDispatch.subscribe(this, source);
     }
-    return subscription;
 };
 
 /**
@@ -504,13 +484,10 @@ _Messenger_['subscribe'] = function(source) {
 _Messenger_['put'] = function(message, flush) {
     flush = flush === false ? false : true; // Defaults to true if not explicitly specified.
     message._preEncode();
-    Module.EventDispatch.setCurrentMessenger(this);
     this._check(_pn_messenger_put(this._messenger, message._message));
-    Module.EventDispatch.setCurrentMessenger(null);
 
-    // If flush is set invoke pn_messenger_work.
     if (flush) {
-        this._check(_pn_messenger_work(this._messenger, 0));
+        Module.EventDispatch.flush();
     }
 
     // Getting the tracker is a little tricky as it is a 64 bit number. The way
@@ -577,7 +554,7 @@ _Messenger_['settle'] = function(tracker) {
         flags = Module['Messenger'].PN_CUMULATIVE;
     }
 
-    this._check(_pn_messenger_settle(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits(), flags));
+    _pn_messenger_settle(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits(), flags);
 };
 
 /**
@@ -602,7 +579,7 @@ _Messenger_['work'] = function() {
  *        as many messages as it can buffer internally.
  */
 _Messenger_['recv'] = function(limit) {
-    this._check(_pn_messenger_recv(this._messenger, (limit ? limit : -1)));
+    _pn_messenger_recv(this._messenger, (limit ? limit : -1));
 };
 
 /**
@@ -641,7 +618,7 @@ _Messenger_['get'] = function(message, decodeBinaryAsString) {
         impl = message._message;
     }
 
-    this._check(_pn_messenger_get(this._messenger, impl));
+    _pn_messenger_get(this._messenger, impl);
 
     if (message) {
         message._postDecode(decodeBinaryAsString);
@@ -798,9 +775,9 @@ _Messenger_['incoming'] = function() {
  */
 _Messenger_['route'] = function(pattern, address) {
     var sp = Runtime.stackSave();
-    this._check(_pn_messenger_route(this._messenger,
-                                    allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK),
-                                    allocate(intArrayFromString(address), 'i8', ALLOC_STACK)));
+    _pn_messenger_route(this._messenger,
+                        allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK),
+                        allocate(intArrayFromString(address), 'i8', ALLOC_STACK));
     Runtime.stackRestore(sp);
 };
 
@@ -825,9 +802,9 @@ _Messenger_['route'] = function(pattern, address) {
  */
 _Messenger_['rewrite'] = function(pattern, address) {
     var sp = Runtime.stackSave();
-    this._check(_pn_messenger_rewrite(this._messenger,
-                                      allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK),
-                                      allocate(intArrayFromString(address), 'i8', ALLOC_STACK)));
+    _pn_messenger_rewrite(this._messenger,
+                          allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK),
+                          allocate(intArrayFromString(address), 'i8', ALLOC_STACK));
     Runtime.stackRestore(sp);
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/bindings/javascript/module.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/module.js b/proton-c/bindings/javascript/module.js
index aefceca..b755f59 100644
--- a/proton-c/bindings/javascript/module.js
+++ b/proton-c/bindings/javascript/module.js
@@ -68,6 +68,10 @@
  * &lt;script type="text/javascript"&gt;PROTON_TOTAL_MEMORY = 50000000;&lt;/script&gt;
  * &lt;script type="text/javascript" src="proton.js">&lt;/script&gt;
  * </pre>
+ * If the global variable PROTON_TOTAL_MEMORY has been set by the application this
+ * will result in the emscripten heap getting set to the next multiple of
+ * 16777216 above PROTON_TOTAL_MEMORY.
+ * <p>
  * The global variable PROTON_TOTAL_STACK may be used in a similar way to increase
  * the stack size from its default of 5*1024*1024 = 5242880. It is worth noting
  * that Strings are allocated on the stack, so you may need this if you end up
@@ -76,17 +80,14 @@
  */
 var Module = {};
 
-// If the global variable PROTON_TOTAL_MEMORY has been set by the application this
-// will result in the emscripten heap getting set to the next multiple of
-// 16777216 above PROTON_TOTAL_MEMORY.
-if (typeof process === 'object' && typeof require === 'function') {
+if (typeof global === 'object') { // If Node.js
     if (global['PROTON_TOTAL_MEMORY']) {
         Module['TOTAL_MEMORY'] = global['PROTON_TOTAL_MEMORY'];
     }
     if (global['PROTON_TOTAL_STACK']) {
         Module['TOTAL_STACK'] = global['PROTON_TOTAL_STACK'];
     }
-} else if (typeof window === 'object') {
+} else if (typeof window === 'object') { // If browser
     if (window['PROTON_TOTAL_MEMORY']) {
         Module['TOTAL_MEMORY'] = window['PROTON_TOTAL_MEMORY'];
     }
@@ -102,140 +103,323 @@ if (typeof process === 'object' && typeof require === 'function') {
 /*****************************************************************************/
 
 /**
- * EventDispatch is a Singleton class that allows callbacks to be registered which
- * will get triggered by the emscripten WebSocket network callbacks. Clients of
- * Messenger will register callbacks by calling:
+ * EventDispatch is a Singleton class that allows callbacks to be registered,
+ * which will get triggered by the emscripten WebSocket network callbacks.
+ * Clients of Messenger will register callbacks by calling:
  * <pre>
+ * messenger.on('error', &lt;callback function&gt;);
  * messenger.on('work', &lt;callback function&gt;);
+ * messenger.on('subscription', &lt;callback function&gt;);
  * </pre>
  * EventDispatch supports callback registration from multiple Messenger instances.
  * The client callbacks will actually be called when a given messenger has work
- * available or a WebSocket close has been occurred (in which case all registered
- * callbacks will be called).
+ * available or a WebSocket close has occurred.
  * <p>
  * The approach implemented here allows the registered callbacks to follow a
  * similar pattern to _process_incoming and _process_outgoing in async.py
- * @memberof proton
+ * @constructor proton.EventDispatch
  */
 Module.EventDispatch = new function() { // Note the use of new to create a Singleton.
-    var _firstCall = true; // Flag used to check the first time registerMessenger is called.
+    var POLLIN  = 0x001;
+    var POLLOUT = 0x004;
+    var _error = null;
+    var _messengers = {};  // Keyed by name.
+    var _selectables = {}; // Keyed by file descriptor.
+
+    var _initialise = function() {
+        /**
+         * Initialises the emscripten network callback functions. This needs
+         * to be done the first time we call registerMessenger rather than
+         * when we create the Singleton because emscripten's socket filesystem
+         * has to be mounted before we can listen for any of these events.
+         */
+        Module['websocket']['on']('open', _pump);
+        Module['websocket']['on']('connection', _pump);
+        Module['websocket']['on']('message', _pump);
+        Module['websocket']['on']('close', _closeHandler);
+        Module['websocket']['on']('error', _errorHandler);
+
+        /**
+         * For Node.js the network code uses the ws WebSocket library, see
+         * https://github.com/einaros/ws. The following is a "Monkey Patch"
+         * that fixes a problem with Receiver.js where it wasn't checking if
+         * an Object was null before accessing its properties, so it was
+         * possible to see errors like:
+         * TypeError: Cannot read property 'fragmentedOperation' of null
+         * at Receiver.endPacket (.....node_modules/ws/lib/Receiver.js:224:18)
+         * This problem is generally seen in Server code after messenger.stop()
+         * I *think* that the underlying issue is actually because ws calls
+         * cleanup directly rather than pushing it onto the event loop so the
+         * this.state stuff gets cleared before the endPacket method is called.
+         * This fix simply interposes a check to avoid calling endPacket if
+         * the state has been cleared (i.e. the WebSocket has been closed).
+         */
+        if (ENVIRONMENT_IS_NODE) {
+            try {
+                var ws = require('ws');
+                // Array notation to stop Closure compiler minifying properties we need.
+                ws['Receiver'].prototype['originalEndPacket'] = ws['Receiver'].prototype['endPacket'];
+                ws['Receiver'].prototype['endPacket'] = function() {
+                    if (this['state']) {
+                        this['originalEndPacket']();
+                    }
+                };
+            } catch (e) {
+                console.error("Failed to apply Monkey Patch to ws WebSocket library");
+            }
+        }
+
+        _initialise = function() {}; // After first call replace with null function.
+    };
+
     /**
-     * We employ a cheat/hack to map file descriptors to the Messenger instance
-     * that owns them. In put/subscribe we set the current Messenger and then we
-     * intercept the library socket call with our own, which makes a call to
-     * the real library socket but also maps the file descriptor to _currentMessenger.
+     * Messenger error handling can be a bit inconsistent and in several places
+     * rather than returning an error code or setting an error it simply writes
+     * to fprintf. This is something of a Monkey Patch that replaces the emscripten
+     * library fprintf call with one that checks the message and sets a variable
+     * if the message is an ERROR. TODO At some point hopefully Dominic Evans'
+     * patch on Jira PROTON-571 will render this code redundant.
      */
-    var _currentMessenger = null;
-    var _messengers = {};
-
-    var _fd2Messenger = {};
+    _fprintf = function(stream, format, varargs) {
+        var array = __formatString(format, varargs);
+        array.pop(); // Remove the trailing \n
+        var string = intArrayToString(array); // Convert to native JavaScript string.
+        if (string.indexOf('ERROR') === -1) { // If not an ERROR just log the message.
+            console.log(string);
+        } else {
+            _error = string;
+        }
+    };
 
     /**
-     * Provides functionality roughly equivalent to the following C code:
-     * while (1) {
-     *     pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-     *     process();
-     * }
-     * The blocking call isn't viable in JavaScript as it is entirely asynchronous
-     * and we wouldn't want to replace the while(1) with a timed loop either!!
-     * This method gets triggered asynchronously by the emscripten socket events and
-     * we then perform an equivalent loop for each messenger, triggering every
-     * registered callback whilst there is work remaining. If triggered by close
-     * we bypass the _pn_messenger_work test as it will never succeed after closing.
+     * This method iterates through all registered Messengers and retrieves any
+     * pending selectables, which are stored in a _selectables map keyed by fd.
      */
-    var _pump = function(fd, closing) {
-//console.log("\t_pump entry " + fd + ", " + closing);
-        for (var i in _messengers) {
-            if (_messengers.hasOwnProperty(i)) {
-                var messenger = _messengers[i];
-                //var messenger = _fd2Messenger[fd];
-
-                if (closing) {
-//console.log("_pump closing");
-                    messenger._emit('work');
+    var _updateSelectables = function() {
+        var sel = 0;
+        var fd = -1;
+        for (var name in _messengers) {
+            var messenger = _messengers[name];
+            while ((sel = _pn_messenger_selectable(messenger._messenger))) {
+                fd = _pn_selectable_fd(sel);
+                // Only register valid selectables, otherwise free them.
+                if (fd === -1) {
+                    _pn_selectable_free(sel);
                 } else {
-//console.log("_pump while start");
-                    while (_pn_messenger_work(messenger._messenger, 0) > 0) {
-                    //while (messenger['work']()) {
-                    //while (messenger._check(_pn_messenger_work(messenger._messenger, 0)) > 0) {
-//console.log("A");
-                        messenger._checkSubscriptions();
-                        messenger._emit('work');
-//console.log("B");
+                    _selectables[fd] = {messenger: messenger, selectable: sel};
+                }
+            }
+        }
+        return fd; // Return the most recently added selector's file descriptor.
+    };
+
+    /**
+     * Continually pump data: call _pumpOnce() until it returns a falsy count.
+     */
+    var _pump = function() {
+        while (_pumpOnce()); // Empty loop body: all the work happens inside _pumpOnce().
+    };
+
+    /**
+     * This method more or less follows the pattern of the pump_once method from
+     * class Pump in tests/python/proton_tests/messenger.py. It looks a little
+     * different because the select/poll implemented here uses some low-level
+     * emscripten internals (stream = FS.getStream(fd), sock = stream.node.sock,
+     * mask = sock.sock_ops.poll(sock)). We use the internals so we don't have
+     * to massage from file descriptors into the C style poll interface.
+     */
+    var _pumpOnce = function() {
+        _updateSelectables(); // Register any newly pending selectables before polling.
+
+        var count = 0;
+        for (var fd in _selectables) {
+            var selectable = _selectables[fd];
+            var sel = selectable.selectable;
+            var terminal = _pn_selectable_is_terminal(sel);
+            if (terminal) { // Terminal selectables are freed and dropped from the map.
+                _pn_selectable_free(sel);
+                delete _selectables[fd];
+            } else {
+                var stream = FS.getStream(fd);
+                if (stream) {
+                    var sock = stream.node.sock;
+                    if (sock.sock_ops.poll) {
+                        var mask = sock.sock_ops.poll(sock); // Low-level poll call; mask is tested against POLLIN/POLLOUT.
+                        if (mask) {
+                            var messenger = selectable.messenger;
+                            var capacity = _pn_selectable_capacity(sel) > 0;
+                            var pending = _pn_selectable_pending(sel) > 0;
+
+                            if ((mask & POLLIN) && capacity) {
+//console.log("- readable fd = " + fd + ", capacity = " + _pn_selectable_capacity(sel));
+                                _error = null; // May get set by _pn_selectable_readable.
+                                _pn_selectable_readable(sel);
+                                count++; // Should this be inside the test for _error? Don't know.
+                                var errno = messenger['getErrno']();
+                                _error = errno ? messenger['getError']() : _error;
+                                if (_error) {
+                                    _errorHandler([fd, 0, _error]);
+                                } else {
+                                    // Don't send work event if it's a listen socket.
+                                    if (!sock.server) {
+                                        messenger._checkSubscriptions();
+                                        messenger._emit('work');
+                                    }
+                                }
+                            }
+                            if ((mask & POLLOUT) && pending) {
+//console.log("- writeable fd = " + fd + ", pending = " + _pn_selectable_pending(sel));
+                                _pn_selectable_writable(sel);
+                                //TODO looks like this block isn't needed. Need to
+                                //check with a test-case that writes data as fast as
+                                //it can. If not needed then delete.
+                                /*
+                                count++;
+                                // Check _selectables again in case the call to
+                                // _pn_selectable_writable caused a socket close.
+                                if (_selectables[fd]) {
+                                    messenger._checkSubscriptions();
+                                    messenger._emit('work');
+                                }
+                                */
+                            }
+                        }
                     }
-//console.log("_pump while finish");
                 }
             }
         }
-//console.log("\t_pump exit");
+
+        return count; // Number of reads performed in this pass; writes are not counted.
     };
 
     /**
-     * Listener for the emscripten socket close event. Delegates to _pump()
-     * passing a flag to indicate that the socket is closing.
+     * Handler for the emscripten socket close event: frees fd's selectable, then emits 'work'.
      */
-    var _close = function(fd) {
-//console.log("calling close fd = " + fd);
-        _pump(fd, true);
-        delete _fd2Messenger[fd];
+    var _closeHandler = function(fd) {
+        var selectable = _selectables[fd];
+        if (selectable) { // Nothing to do for an fd that has no registered selectable.
+            // Close and remove the selectable.
+            var sel = selectable.selectable;
+            _pn_selectable_free(sel); // This closes the underlying socket too.
+            delete _selectables[fd];
+
+            var messenger = selectable.messenger;
+            messenger._emit('work');
+        }
     };
 
-    var _error = function(error) {
+    /**
+     * Handler for the emscripten socket error event; error[0] is the fd, error[2] the message.
+     */
+    var _errorHandler = function(error) {
         var fd = error[0];
-        var messenger = _fd2Messenger[fd];
-        messenger._emit('error', new Module['MessengerError'](error[2]));
-        delete _fd2Messenger[fd];
+        var message = error[2];
+
+        _updateSelectables(); // Register any pending selectables so the lookup below can find fd.
+
+        var selectable = _selectables[fd];
+        if (selectable) {
+            // Close and remove the selectable.
+            var sel = selectable.selectable;
+            _pn_selectable_free(sel); // This closes the underlying socket too.
+            delete _selectables[fd];
+
+            var messenger = selectable.messenger;
+
+            // Remove any pending Subscriptions whose fd matches the error fd.
+            var subscriptions = messenger._pendingSubscriptions;
+            for (var i = 0; i < subscriptions.length; i++) {
+                var subscription = subscriptions[i]; // Declared with var so it doesn't leak as a global.
+                // Use == not === as we don't care if fd is a number or a string.
+                if (subscription.fd == fd) {
+                    messenger._pendingSubscriptions.splice(i, 1);
+                    if (message.indexOf('EHOSTUNREACH:') === 0) {
+                        message = 'CONNECTION ERROR (' + subscription.source + '): bind: Address already in use';
+                    }
+                    messenger._emit('error', new Module['SubscriptionError'](subscription.source, message));
+                    return; // Reported as a SubscriptionError, so skip the MessengerError below.
+                }
+            }
+
+            messenger._emit('error', new Module['MessengerError'](message));
+        }
     };
 
     /**
-     * This code cheekily replaces the library socket call with our own one.
-     * The real socket call returns a file descriptor so we harvest that and use
-     * that as a key to map file descriptors to their owning Messenger.
+     * Flush any data that has been written by the Messenger put() method.
+     * @method flush
      */
-    var realsocket = _socket;
-    _socket = function(domain, type, protocol) {
-        var fd = realsocket(domain, type, protocol);
-//console.log("calling socket fd = " + fd);
-        if (_currentMessenger) {
-            _fd2Messenger[fd] = _currentMessenger;
-        } else {
-            console.error("Error: file descriptor " + fd + " cannot be mapped to a Messenger.");
+    this.flush = function() {
+        _pump();
+    };
+
+    /**
+     * Subscribe to a specified source address.
+     * <p>
+     * This method is delegated to by the subscribe method of {@link proton.Messenger}.
+     * We delegate to EventDispatch because we create Subscription objects that
+     * contain some additional information (such as file descriptors) which are
+     * only available to EventDispatch and we don't really want to expose to the
+     * wider API. This low-level information is mainly used for error handling
+     * which is itself encapsulated in EventDispatch.
+     * @method subscribe
+     * @memberof! proton.EventDispatch#
+     * @param {proton.Messenger} messenger the Messenger instance that this
+     *        subscription relates to.
+     * @param {string} source the address that we'd like to subscribe to.
+     */
+    this.subscribe = function(messenger, source) {
+        // First update selectables before subscribing so we can work out the
+        // Subscription fd (which will be the listen file descriptor).
+        _updateSelectables();
+        var sp = Runtime.stackSave(); // The source string below is allocated with ALLOC_STACK.
+        var subscription = _pn_messenger_subscribe(messenger._messenger,
+                                                   allocate(intArrayFromString(source), 'i8', ALLOC_STACK));
+        Runtime.stackRestore(sp); // Release the stack space used for the source string.
+        var fd = _updateSelectables(); // fd of the last selectable retrieved after subscribing (-1 if none).
+
+        subscription = new Subscription(subscription, source, fd); // Wrap the raw pointer with source and fd.
+        messenger._pendingSubscriptions.push(subscription);
+
+        // For passive subscriptions emit a subscription event (almost) immediately,
+        // otherwise defer until the address has been resolved remotely.
+        if (subscription.passive) {
+            // We briefly delay the call to checkSubscriptions because it is possible
+            // for passive subscriptions to fail if another process is bound to the
+            // port specified in the subscription.
+            var check = function() {messenger._checkSubscriptions();};
+            setTimeout(check, 10);
         }
-        return fd;
-    }
 
-    this.setCurrentMessenger = function(messenger) {
-        _currentMessenger = messenger;
-    }
+        return subscription; // The Subscription wrapper created above, not the raw pointer.
+    };
 
     /**
      * Register the specified Messenger as being interested in network events.
+     * @method registerMessenger
+     * @memberof! proton.EventDispatch#
+     * @param {proton.Messenger} messenger the Messenger instance we want to
+     *        register to receive network events.
      */
     this.registerMessenger = function(messenger) {
-        if (_firstCall) {
-            /**
-             * Initialises the emscripten network callback functions. This needs
-             * to be done the first time we call registerMessenger rather than
-             * when we create the Singleton because emscripten's socket filesystem
-             * has to be mounted before can listen for any of these events.
-             */
-            Module['websocket']['on']('open', _pump);
-            Module['websocket']['on']('connection', _pump);
-            Module['websocket']['on']('message', _pump);
-            Module['websocket']['on']('close', _close);
-            Module['websocket']['on']('error', _error);
-            _firstCall = false;
-        }
+        _initialise();
 
-        var name = messenger.getName();
+        var name = messenger['getName']();
         _messengers[name] = messenger;
+
+        // Set the Messenger "passive" as we are supplying our own event loop here.
+        _pn_messenger_set_passive(messenger._messenger, true);
     };
 
     /**
      * Unregister the specified Messenger from interest in network events.
+     * @method unregisterMessenger
+     * @memberof! proton.EventDispatch#
+     * @param {proton.Messenger} messenger the Messenger instance we want to
+     *        unregister from receiving network events.
      */
     this.unregisterMessenger = function(messenger) {
-        var name = messenger.getName();
+        var name = messenger['getName']();
         delete _messengers[name];
     };
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/bindings/javascript/subscription.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/subscription.js b/proton-c/bindings/javascript/subscription.js
index 602a741..89fd1a3 100644
--- a/proton-c/bindings/javascript/subscription.js
+++ b/proton-c/bindings/javascript/subscription.js
@@ -32,11 +32,19 @@
  * constructor in the scope of the package and don't export it via Module.
  * @constructor Subscription
  * @param {number} subscription a pointer to the underlying subscription object.
- * @param {string} address if the address is already known it can be (optionally) specified.
+ * @param {string} source the address that we want to subscribe to.
+ * @param {number} fd the file descriptor associated with the subscription. This
+ *                 is used internally to tidy up during error handling.
  */
-var Subscription = function(subscription, address) { // Subscription Constructor.
+var Subscription = function(subscription, source, fd) { // Subscription Constructor.
     this._subscription = subscription;
-    this._address = address;
+    this.source = source;
+    this.fd = fd;
+    if (source.indexOf('~') !== -1) {
+        this.passive = true;
+    } else {
+        this.passive = false;
+    }
 };
 
 /**
@@ -65,8 +73,8 @@ Subscription.prototype['setContext'] = function(context) {
  * @returns the Subscription's Address.
  */
 Subscription.prototype['getAddress'] = function() {
-    if (this._address) {
-        return this._address;
+    if (this.passive) {
+        return this.source;
     }
     return Pointer_stringify(_pn_subscription_address(this._subscription));
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8faa57e/proton-c/src/codec/data.h
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/data.h b/proton-c/src/codec/data.h
index a528d26..99e8228 100644
--- a/proton-c/src/codec/data.h
+++ b/proton-c/src/codec/data.h
@@ -61,7 +61,7 @@ struct pn_data_t {
   pni_nid_t base_current;
 };
 
-inline pni_node_t * pn_data_node(pn_data_t *data, pni_nid_t nd) 
+static inline pni_node_t * pn_data_node(pn_data_t *data, pni_nid_t nd) 
 {
   return nd ? (data->nodes + nd - 1) : NULL;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org