You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/11/28 14:49:54 UTC
[16/51] [abbrv] qpid-proton git commit: The qpid-config port is
largely complete except for xml binding support,
it also needs a bit of tidying up. Added a simple soak test to send and
receive messages from a broker ad-infinitum and a simple html send me
The qpid-config port is largely complete except for xml binding support, it also needs a bit of tidying up. Added a simple soak test to send and receive messages from a broker ad-infinitum and a simple html send message example
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1621865 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a78327f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a78327f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a78327f
Branch: refs/heads/master
Commit: 4a78327f5a6f2369ef15ede360f4bc86d288f7c4
Parents: c3efc08
Author: fadams <fa...@unknown>
Authored: Mon Sep 1 18:32:04 2014 +0000
Committer: fadams <fa...@unknown>
Committed: Mon Sep 1 18:32:04 2014 +0000
----------------------------------------------------------------------
examples/messenger/javascript/qpid-config.js | 227 +++++--
examples/messenger/javascript/send.html | 110 ++++
examples/messenger/javascript/send.js | 6 +-
proton-c/bindings/javascript/CMakeLists.txt | 43 +-
proton-c/bindings/javascript/binding.js | 39 +-
proton-c/bindings/javascript/my-library.js | 755 ----------------------
tests/javascript/soak.js | 99 +++
7 files changed, 448 insertions(+), 831 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/qpid-config.js
----------------------------------------------------------------------
diff --git a/examples/messenger/javascript/qpid-config.js b/examples/messenger/javascript/qpid-config.js
index 466f8b6..eb0951f 100755
--- a/examples/messenger/javascript/qpid-config.js
+++ b/examples/messenger/javascript/qpid-config.js
@@ -41,7 +41,11 @@ if (typeof exports !== "undefined" && exports !== null) {
proton = require("qpid-proton");
}
-var address = 'amqp://0.0.0.0:5673/qmf.default.direct';
+var addr = 'guest:guest@localhost:5673';
+//var addr = 'localhost:5673';
+var address = 'amqp://' + addr + '/qmf.default.direct';
+console.log(address);
+
var replyTo = '';
var subscription;
var subscribed = false;
@@ -52,13 +56,14 @@ var messenger = new proton.Messenger();
/**
* The correlator object is a mechanism used to correlate requests with their
* asynchronous responses. It might possibly be better to make use of Promises
- * to implement part of this behaviour but a mechanism would still be meeded to
+ * to implement part of this behaviour but a mechanism would still be needed to
* correlate a request with its response callback in order to wrap things up in
* a Promise, so much of the behaviour of this object would still be required.
* In addition it seemed to make sense to make this QMF2 implementation fairly
* free of dependencies and using Promises would require external libraries.
- * Instead the correlator implements Promise-like semantics, you might call it
+ * Instead the correlator implements "Promise-like" semantics, you might call it
* a broken Promise :-)
+ * <p>
* in particular the request method behaves a *bit* like Promise.all() though it
* is mostly fake and takes an array of functions that call the add() method
* which is really the method used to associate response objects by correlationID.
@@ -155,6 +160,7 @@ var getObjects = function(packageName, className) {
message.setReplyTo(replyTo);
message.setCorrelationID(className);
message.properties = {
+ "routing-key": "broker", // Added for Java Broker
"x-amqp-0-10.app-id": "qmf2",
"method": "request",
"qmf.opcode": "_query_request",
@@ -178,6 +184,7 @@ var invokeMethod = function(object, method, arguments) {
message.setReplyTo(replyTo);
message.setCorrelationID(correlationID);
message.properties = {
+ "routing-key": "broker", // Added for Java Broker
"x-amqp-0-10.app-id": "qmf2",
"method": "request",
"qmf.opcode": "_method_request",
@@ -197,7 +204,7 @@ messenger.on('work', pumpData);
messenger.setOutgoingWindow(1024);
messenger.start();
-subscription = messenger.subscribe('amqp://0.0.0.0:5673/#');
+subscription = messenger.subscribe('amqp://' + addr + '/#');
messenger.recv(); // Receive as many messages as messenger can buffer.
@@ -358,11 +365,24 @@ var _options =
' output\n';
var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true};
-var DEFAULT_PROPERTIES = {"exchange":["name", "type", "durable"], "queue":["name", "durable", "autoDelete"]};
+var DEFAULT_PROPERTIES = {"exchange": {"name": true, "type": true, "durable": true},
+ "queue": {"name": true, "durable": true, "autoDelete": true}};
+
+var getValue = function(r) {
+ var value = null;
+ if (r.length === 2) {
+ value = r[1];
+ if (!isNaN(value)) {
+ value = parseInt(value);
+ }
+ }
+
+ return value;
+};
var config = {
_recursive : false,
- _host : 'localhost',
+ _host : 'localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
_connTimeout : 10,
_ignoreDefault : false,
_altern_ex : null,
@@ -389,17 +409,13 @@ var config = {
_extra_arguments: [],
_start_replica : null,
_returnCode : 0,
- _list_properties: [],
+ _list_properties: null,
getOptions: function() {
var options = {};
for (var a = 0; a < this._extra_arguments.length; a++) {
var r = this._extra_arguments[a].split('=');
- var value = null;
- if (r.length === 2) {
- value = r[1];
- }
- options[r[0]] = value;
+ options[r[0]] = getValue(r);
}
return options;
}
@@ -471,7 +487,7 @@ var idMap = function(list) {
return map;
};
-var renderArguments = function(obj, list) {
+var renderObject = function(obj, list) {
if (!obj) {
return '';
}
@@ -493,7 +509,7 @@ var renderArguments = function(obj, list) {
}
if (addComma) {
- return ' {' + string + '}';
+ return '{' + string + '}';
} else {
if (list) {
return string;
@@ -645,7 +661,7 @@ var exchangeListRecurse = function(filter) {
var queue = queues[oid(bind.queueRef)];
var queueName = queue ? queue._values.name : "<unknown>";
console.log(" bind [" + bind.bindingKey + "] => " + queueName +
- renderArguments(bind.arguments));
+ " " + renderObject(bind.arguments));
}
}
}
@@ -743,7 +759,7 @@ var queueList = function(filter) {
if (args[SHARED_MSG_GROUP] === 1) {
string += ' --shared-groups';
}
- string += renderArguments(args, true);
+ string += ' ' + renderObject(args, true);
console.log(string);
}
}
@@ -785,7 +801,7 @@ var queueListRecurse = function(filter) {
}
console.log(" bind [" + bind.bindingKey + "] => " + exchangeName +
- renderArguments(bind.arguments));
+ " " + renderObject(bind.arguments));
}
}
}
@@ -824,10 +840,10 @@ console.log("Method result");
if (response._arguments) {
//console.log(response._arguments);
} if (response._values) {
- console.error("Exception from Agent: " + renderArguments(response._values));
+ console.error("Exception from Agent: " + renderObject(response._values));
}
// Mostly we want to stop the Messenger Event loop and exit when a QMF method
- // call returns, but sometimes we don't.
+ // returns, but sometimes we don't, the dontStop flag prevents this behaviour.
if (!dontStop) {
messenger.stop();
}
@@ -846,11 +862,7 @@ var addExchange = function(args) {
for (var a = 0; a < config._extra_arguments.length; a++) {
var r = config._extra_arguments[a].split('=');
- var value = null;
- if (r.length === 2) {
- value = r[1];
- }
- declArgs[r[0]] = value;
+ declArgs[r[0]] = getValue(r);
}
if (config._msgSequence) {
@@ -918,11 +930,7 @@ var addQueue = function(args) {
for (var a = 0; a < config._extra_arguments.length; a++) {
var r = config._extra_arguments[a].split('=');
- var value = null;
- if (r.length === 2) {
- value = r[1];
- }
- declArgs[r[0]] = value;
+ declArgs[r[0]] = getValue(r);
}
if (config._durable) {
@@ -1201,21 +1209,141 @@ console.log(args);
* The following methods are "generic" create and delete methods for arbitrary
* Management Objects e.g. Incoming, Outgoing, Domain, Topic, QueuePolicy,
* TopicPolicy etc. use --argument k1=v1 --argument k2=v2 --argument k3=v3 to
- * pass arbitrary arguments as key/value pairs to the Object being created/deleted.
+ * pass arbitrary arguments as key/value pairs to the Object being created/deleted,
+ * for example to add a topic object that uses the fanout exchange:
+ * ./qpid-config.js add topic fanout --argument exchange=amq.fanout \
+ * --argument qpid.max_size=1000000 --argument qpid.policy_type=ring
*/
var createObject = function(type, name, args) {
-console.log("createObject");
-console.log(type);
-console.log(name);
-console.log(args);
+ correlator.request(
+ // We invoke the CRUD methods on the broker object.
+ getObjects('org.apache.qpid.broker', 'broker')
+ ).then(function(objects) {
+ var broker = objects.broker[0];
+ correlator.request(
+ // Create an object of the specified type.
+ invokeMethod(broker, 'create', {
+ "type": type,
+ "name": name,
+ "properties": args,
+ "strict": true})
+ ).then(handleMethodResponse);
+ });
+};
+var deleteObject = function(type, name, args) {
+ correlator.request(
+ // We invoke the CRUD methods on the broker object.
+ getObjects('org.apache.qpid.broker', 'broker')
+ ).then(function(objects) {
+ var broker = objects.broker[0];
+ correlator.request(
+ // Delete the object of the specified type and name.
+ invokeMethod(broker, 'delete', {
+ "type": type,
+ "name": name,
+ "options": args})
+ ).then(handleMethodResponse);
+ });
};
-var deleteObject = function(args) {
-console.log("deleteObject");
-console.log(args);
+/**
+ * This is a "generic" mechanism for listing arbitrary Management Objects.
+ */
+var listObjects = function(type) {
+ correlator.request(
+ getObjects('org.apache.qpid.broker', type)
+ ).then(function(objects) {
+ // The correlator passes an object containing responses for all of the
+ // supplied requests so we index it by the supplied type to get our response.
+ objects = objects[type];
+
+ // Collect available attributes, stringify the values and compute the max
+ // length of the value of each attribute so that we can later create a table.
+ var attributes = {};
+ var lengths = {};
+ for (var i = 0; i < objects.length; i++) {
+ var object = objects[i];
+ object = object._values;
+ for (var prop in object) {
+ if (typeof object[prop] === 'object') { // Stringify Object properties.
+ // Check if property is an ObjectID (reference property),
+ // if so replace with the "name" part of the OID.
+ if (object[prop]['_object_name']) {
+ var parts = object[prop]['_object_name'].split(':');
+ object[prop] = parts[parts.length - 1];
+ } else {
+ // Stringify general Object properties.
+ object[prop] = renderObject(object[prop]);
+ }
+ } else {
+ object[prop] = object[prop].toString(); // Stringify other property types.
+ }
+
+ if (!lengths[prop] || object[prop].length > lengths[prop]) { // Compute lengths.
+ lengths[prop] = object[prop].length > prop.length ? object[prop].length : prop.length;
+ }
+
+ if (!config._list_properties || config._list_properties[prop]) { // Do we want this property?
+ attributes[prop] = true;
+ }
+ }
+ }
+ if (!config._list_properties && DEFAULT_PROPERTIES[type]) {
+ attributes = DEFAULT_PROPERTIES[type];
+ }
+
+ // Using the information we've previously prepared now render a table
+ // showing the required property values.
+ var desired = [];
+ var header = ''; // Table header showing the property names.
+ if (attributes['name']) {
+ desired.push('name');
+ delete attributes['name'];
+ header += 'name' + Array(lengths['name'] + 2 - 4).join(' ');
+ }
+
+ for (var prop in attributes) {
+ desired.push(prop);
+ header += prop + Array(lengths[prop] + 2 - prop.length).join(' ');
+ }
+
+ console.log("Objects of type '" + type + "'");
+ console.log(header);
+ console.log(Array(header.length).join('='));
+ for (var i = 0; i < objects.length; i++) {
+ var object = objects[i];
+ object = object._values;
+ var string = '';
+ for (var j = 0; j < desired.length; j++) {
+ var key = desired[j];
+ string += object[key] + Array(lengths[key] + 2 - object[key].length).join(' ');
+ }
+
+ console.log(string);
+ }
+
+ messenger.stop();
+ });
+};
+
+var reloadAcl = function() {
+ correlator.request(
+ getObjects('org.apache.qpid.acl', 'acl')
+ ).then(function(objects) {
+ if (objects.acl.length > 0) {
+ var acl = objects.acl[0];
+ correlator.request(
+ // Invoke the reloadACLFile method on the ACL object.
+ invokeMethod(acl, 'reloadACLFile', {})
+ ).then(handleMethodResponse);
+ } else {
+ console.log("Failed: No ACL Loaded in Broker");
+ messenger.stop();
+ }
+ });
};
@@ -1263,10 +1391,9 @@ if (args.length > 0) {
if (config._connTimeout === 0) {
config._connTimeout = null;
}
- } else if (arg === '-b' || arg === '--broker' || arg === '-b' || arg === '--broker-addr') {
- config._host = val;
- if (config._host == null) {
- config._host = 'localhost:5672';
+ } else if (arg === '-b' || arg === '--broker' || arg === '-a' || arg === '--broker-addr') {
+ if (val != null) {
+ config._host = val;
}
} else if (arg === '--alternate-exchange') {
config._altern_ex = val;
@@ -1306,7 +1433,10 @@ if (args.length > 0) {
} else if (arg === '--f' || arg === '--file') { // TODO Won't work in node.js
config._file = val;
} else if (arg === '--show-property') {
- config._list_properties = val;
+ if (config._list_properties === null) {
+ config._list_properties = {};
+ }
+ config._list_properties[val] = true;
}
} else {
params.push(arg);
@@ -1316,9 +1446,6 @@ if (args.length > 0) {
config._extra_arguments = extra_arguments;
-console.log("params");
-console.log(params);
-
// The command only *actually* gets called when the QMF connection has actually
// been established so we wrap up the function we want to get called in a lambda.
var command = function() {overview();};
@@ -1365,11 +1492,19 @@ if (params.length > 0) {
command = function() {bind(Array.prototype.slice.apply(params, [1]));};
} else if (cmd === 'unbind') {
command = function() {unbind(Array.prototype.slice.apply(params, [1]));};
+ } else if (cmd === 'reload-acl') {
+ command = function() {reloadAcl();};
+ } else if (cmd === 'list' && params.length > 1) {
+ command = function() {listObjects(modifier);};
+ } else {
+ usage();
}
}
+//console.log(config._host);
+
+
var onSubscription = function() {
command();
};
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/send.html
----------------------------------------------------------------------
diff --git a/examples/messenger/javascript/send.html b/examples/messenger/javascript/send.html
new file mode 100644
index 0000000..b6aaef2
--- /dev/null
+++ b/examples/messenger/javascript/send.html
@@ -0,0 +1,110 @@
+<!DOCTYPE html> <!-- HTML5 doctype -->
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<html>
+
+<head>
+ <title>Simple Proton Messenger Send Example</title>
+ <meta http-equiv="content-type" content="text/html;charset=utf-8" />
+
+<!--
+ Import JavaScript Messenger Binding proton.js. Note that this simple example pulls
+ it from the node_modules/qpid-proton/lib which is created by the build process
+ so that the node.js based examples "just work", in a real Web App you would need
+ to copy the proton.js to your own server.
+ In actual fact the CMake build actually builds proton.js into the directory:
+ <build>/proton-c/bindings/javascript
+ where <build> is the build directory created to run cmake from.
+-->
+<script type="text/javascript" src="../../../node_modules/qpid-proton/lib/proton.js"></script>
+
+<script type="text/javascript">
+
+var message = new proton.Message();
+var messenger = new proton.Messenger();
+
+var sendMessage = function() {
+ var address = document.getElementById("address").value;
+ var subject = document.getElementById("subject").value;
+ var body = document.getElementById("body").value;
+
+console.log("sendMessage");
+console.log("address = " + address);
+console.log("subject = " + subject);
+console.log("body = " + body);
+
+ message.setAddress(address);
+ message.setSubject(subject);
+ message.body = body;
+
+ messenger.put(message);
+};
+
+messenger.on('error', function(error) {console.log("Received error " + error);});
+messenger.start();
+
+</script>
+
+<style>
+body
+{
+ font: 13px/1.5 Helvetica, Arial, 'Liberation Sans', FreeSans, sans-serif;
+ overflow-x: hidden; /* Hide horizontal scrollbar */
+ background: #dddddd;
+}
+
+label
+{
+ display: block;
+ font-size: 17px;
+}
+
+input, textarea
+{
+ font-size: 13px;
+ margin-bottom: 10px;
+}
+</style>
+
+</head>
+
+<body>
+<div>
+ <label for="address">Address:</label>
+ <input type="text" id="address" size="40"
+ placeholder="amqp://user:password@host:port"
+ name="address" value="amqp://guest:guest@0.0.0.0" />
+</div>
+<div>
+ <label for="subject">Subject:</label>
+ <input type="text" id="subject" size="40"
+ name="subject" value="Browser Message" />
+</div>
+<div>
+ <label for="body">Message:</label>
+ <textarea id="body" name="body" rows="4" cols="40">Hello From Browser!</textarea>
+</div>
+<div>
+ <input type="button" value="send" onclick="sendMessage()"/>
+</div>
+</body>
+
+</html>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/send.js
----------------------------------------------------------------------
diff --git a/examples/messenger/javascript/send.js b/examples/messenger/javascript/send.js
index 77a605c..5a93333 100644
--- a/examples/messenger/javascript/send.js
+++ b/examples/messenger/javascript/send.js
@@ -33,6 +33,10 @@ var running = true;
var message = new proton.Message();
var messenger = new proton.Messenger();
+// Because this is an asynchronous send we can't simply call messenger.put(message)
+// then exit. The following callback function (and messenger.setOutgoingWindow())
+// gives us a means to wait until the consumer has received the message before
+// exiting. The recv.js example explicitly accepts messages it receives.
var pumpData = function() {
var status = messenger.status(tracker);
if (status != proton.Status.PENDING) {
@@ -81,7 +85,7 @@ console.log("Content: " + msgtext);
messenger.on('error', function(error) {console.log(error);});
messenger.on('work', pumpData);
-messenger.setOutgoingWindow(1024);
+messenger.setOutgoingWindow(1024); // So we can track status of send message.
messenger.start();
message.setAddress(address);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt
index 4e4dc0f..ae24407 100644
--- a/proton-c/bindings/javascript/CMakeLists.txt
+++ b/proton-c/bindings/javascript/CMakeLists.txt
@@ -19,7 +19,7 @@
# This file allows cross-compiling of proton to JavaScript using emscripten
# (https://github.com/kripken/emscripten). As it is really a cross-compilation
-# (as opposed to a binding a la swig) the approach is rather different and
+# (as opposed to a binding like with swig) the approach is rather different and
# somewhat replicates the main build in the proton-c/CMakeLists.txt using quite
# a bit of "copy and paste reuse".
# TODO refactor this file (and proton-c/CMakeLists.txt) to keep the main
@@ -27,8 +27,8 @@
message(STATUS "Found emscripten, using that to build JavaScript binding")
-# Find and install node.js packages that we might need. We can assume that
-# node.js is installed because Emscripten has a dependency on it.
+# Find and install the node.js packages that we might need. We can assume that
+# node.js itself is installed because Emscripten has a dependency on it.
find_package(NodePackages)
# Describe the target OS we are building to - Emscripten mimics the Linux platform.
@@ -43,6 +43,14 @@ set(CMAKE_C_COMPILER "${EMCC}")
include(CMakeForceCompiler)
CMAKE_FORCE_C_COMPILER("${CMAKE_C_COMPILER}" Clang)
+if (CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo")
+ message(STATUS "DEBUG JavaScript build")
+else()
+ message(STATUS "RELEASE JavaScript build")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3")
+ set(EMSCRIPTEN_LINK_OPTIMISATIONS "-O2 --closure 1")
+endif()
+
# From this point we should be using emscripten compilation tools.
message(STATUS "emscripten compilation environment:")
message(STATUS "CMAKE_C_COMPILER: ${CMAKE_C_COMPILER}")
@@ -193,21 +201,9 @@ set_target_properties(
RUNTIME_OUTPUT_DIRECTORY examples
DEPENDS ws
- # This build shows socket messages - useful for debugging.
- #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s SOCKET_DEBUG=1"
-
- # Optimised build - takes somewhat longer to build.
- #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1"
-
- # This build shows up emscripten warnings when building - should be able to remove it.
- #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s VERBOSE=1 -O2"
-
- # This build is optimised but not minified
- LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2"
+ LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -${EMSCRIPTEN_LINK_OPTIMISATIONS}"
)
-
-
# Build the main JavaScript library called proton.js
add_executable(proton.js binding.c)
target_link_libraries(proton.js qpid-proton-bitcode)
@@ -217,16 +213,13 @@ set_target_properties(
PROPERTIES
COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS}"
- # This build is optimised and minified. The --memory-init-file 0 stops emscripten
- # emitting a separate memory initialization file, if this was enabled it makes
- # packaging harder as applications would expect proton.js.mem to be served too.
- # It's even more fiddly with node.js packages. This behaviour might be reinstated
- # if the packaging mechanism improves.
-
- # --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js
- #LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js
+ # The --memory-init-file 0 stops emscripten emitting a separate memory
+ # initialization file, if this was enabled it makes packaging harder as
+ # applications would expect proton.js.mem to be served too. It's even more
+ # fiddly with node.js packages. This behaviour might be reinstated if the
+ # packaging mechanism improves.
- LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_test', '_uuid_generate', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '
_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_messa
ge_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array',
'_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_deci
mal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\""
+ LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messen
ger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_rep
ly_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data
_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_
pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\""
)
# This command packages up the compiled proton.js into a node.js package called
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/binding.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/binding.js b/proton-c/bindings/javascript/binding.js
index eceda54..4caafa6 100644
--- a/proton-c/bindings/javascript/binding.js
+++ b/proton-c/bindings/javascript/binding.js
@@ -477,6 +477,14 @@ _Messenger_['setIncomingWindow'] = function(window) {
*/
_Messenger_['start'] = function() {
this._check(_pn_messenger_start(this._messenger));
+
+ // This call ensures that the emscripten network callback functions are set
+ // up even if a client hasn't explicitly added a work function via a call to
+ // messenger.on('work', <work function>);
+ // Doing this means that pn_messenger_work() will still get called when any
+ // WebSocket events occur, which keeps things more reliable when things like
+ // reconnections occur.
+ Module.EventDispatch.addListener(this);
};
/**
@@ -541,8 +549,7 @@ _Messenger_['subscribe'] = function(source) {
* Places the content contained in the message onto the outgoing queue
* of the Messenger. This method will never block, however it will send any
* unblocked Messages in the outgoing queue immediately and leave any blocked
- * Messages remaining in the outgoing queue. The send call may be used to
- * block until the outgoing queue is empty. The outgoing property may be
+ * Messages remaining in the outgoing queue. The outgoing property may be
* used to check the depth of the outgoing queue.
* <p>
* When the content in a given Message object is copied to the outgoing
@@ -554,13 +561,23 @@ _Messenger_['subscribe'] = function(source) {
* @method put
* @memberof! proton.Messenger#
* @param {proton.Message} message a Message to send.
+ * @param {boolean} flush if this is set true or is undefined then messages are
+ * flushed (this is the default). If explicitly set to false then messages
+ * may not be sent immediately and might require an explicit call to work().
+ * This may be used to "batch up" messages and *may* be more efficient.
* @returns {proton.Data.Long} a tracker.
*/
-_Messenger_['put'] = function(message) {
+_Messenger_['put'] = function(message, flush) {
+ flush = flush === false ? false : true;
message._preEncode();
this._checkErrors = true;
this._check(_pn_messenger_put(this._messenger, message._message));
+ // If flush is set invoke pn_messenger_work.
+ if (flush) {
+ _pn_messenger_work(this._messenger, 0);
+ }
+
// Getting the tracker is a little tricky as it is a 64 bit number. The way
// emscripten handles this is to return the low 32 bits directly and pass
// the high 32 bits via the tempRet0 variable. We use Data.Long to pass the
@@ -578,6 +595,12 @@ _Messenger_['put'] = function(message) {
* @returns {proton.Status} one of None, PENDING, REJECTED, or ACCEPTED.
*/
_Messenger_['status'] = function(tracker) {
+ if (tracker == null) {
+ var low = _pn_messenger_outgoing_tracker(this._messenger);
+ var high = Runtime.getTempRet0();
+ tracker = new Data.Long(low, high);
+ }
+
return _pn_messenger_status(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits());
};
@@ -589,6 +612,12 @@ _Messenger_['status'] = function(tracker) {
* @returns {boolean} true if delivery is still buffered.
*/
_Messenger_['isBuffered'] = function(tracker) {
+ if (tracker == null) {
+ var low = _pn_messenger_outgoing_tracker(this._messenger);
+ var high = Runtime.getTempRet0();
+ tracker = new Data.Long(low, high);
+ }
+
return (_pn_messenger_buffered(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits()) > 0);
};
@@ -982,7 +1011,9 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl
};
}
- _messengers[name].callbacks.push(callback);
+ if (callback) {
+ _messengers[name].callbacks.push(callback);
+ }
};
/**
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/my-library.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/my-library.js b/proton-c/bindings/javascript/my-library.js
deleted file mode 100644
index af89ef4..0000000
--- a/proton-c/bindings/javascript/my-library.js
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-mergeInto(LibraryManager.library, {
-// Add to main emscripten library.js
-
-
-// Hacks below
-// -----------------------------------------------------------------------------------------------------------------
-
- $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });',
- $SOCKFS__deps: ['$FS'],
- $SOCKFS: {
- mount: function(mount) {
- // If Module['websocket'] has already been defined (e.g. for configuring
- // subprotocol/url) use that, if not initialise it to a new object.
- Module['websocket'] = (Module['websocket'] &&
- ('object' === typeof Module['websocket'])) ? Module['websocket'] : {};
-
- // Add Event registration mechanism to the exported websocket configuration
- // object so we can register network callbacks from native JavaScript too.
- Module['websocket']._callbacks = {};
- Module['websocket'].on = function(event, callback) {
- if ('function' === typeof callback) {
- this._callbacks[event] = callback;
- }
- return this;
- };
-
- Module['websocket'].emit = function(event, param) {
- if ('function' === typeof this._callbacks[event]) {
- this._callbacks[event].call(this, param);
- }
- };
-
- // Register default null callbacks for each Event
- Module['websocket'].on("error", function(error) {
-console.log("Websocket error " + error);
- });
-
- Module['websocket'].on("open", function(fd) {
-console.log("Websocket open fd = " + fd);
- });
-
- Module['websocket'].on("connection", function(fd) {
-console.log("Websocket connection fd = " + fd);
- });
-
- Module['websocket'].on("message", function(fd) {
-console.log("Websocket message fd = " + fd);
- });
-
- Module['websocket'].on("close", function(fd) {
-console.log("Websocket close fd = " + fd);
- });
-
- return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0);
- },
- createSocket: function(family, type, protocol) {
- var streaming = type == {{{ cDefine('SOCK_STREAM') }}};
- if (protocol) {
- assert(streaming == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp
- }
-
- // create our internal socket structure
- var sock = {
- family: family,
- type: type,
- protocol: protocol,
- server: null,
- peers: {},
- pending: [],
- recv_queue: [],
-#if SOCKET_WEBRTC
-#else
- sock_ops: SOCKFS.websocket_sock_ops
-#endif
- };
-
- // create the filesystem node to store the socket structure
- var name = SOCKFS.nextname();
- var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0);
- node.sock = sock;
-
- // and the wrapping stream that enables library functions such
- // as read and write to indirectly interact with the socket
- var stream = FS.createStream({
- path: name,
- node: node,
- flags: FS.modeStringToFlags('r+'),
- seekable: false,
- stream_ops: SOCKFS.stream_ops
- });
-
- // map the new stream to the socket structure (sockets have a 1:1
- // relationship with a stream)
- sock.stream = stream;
-
- return sock;
- },
- getSocket: function(fd) {
- var stream = FS.getStream(fd);
- if (!stream || !FS.isSocket(stream.node.mode)) {
- return null;
- }
- return stream.node.sock;
- },
- // node and stream ops are backend agnostic
- stream_ops: {
- poll: function(stream) {
- var sock = stream.node.sock;
- return sock.sock_ops.poll(sock);
- },
- ioctl: function(stream, request, varargs) {
-console.log('stream_ops.ioctl');
- var sock = stream.node.sock;
- return sock.sock_ops.ioctl(sock, request, varargs);
- },
- read: function(stream, buffer, offset, length, position /* ignored */) {
- var sock = stream.node.sock;
- var msg = sock.sock_ops.recvmsg(sock, length);
- if (!msg) {
- // socket is closed
- return 0;
- }
-#if USE_TYPED_ARRAYS == 2
- buffer.set(msg.buffer, offset);
-#else
- for (var i = 0; i < size; i++) {
- buffer[offset + i] = msg.buffer[i];
- }
-#endif
- return msg.buffer.length;
- },
- write: function(stream, buffer, offset, length, position /* ignored */) {
- var sock = stream.node.sock;
- return sock.sock_ops.sendmsg(sock, buffer, offset, length);
- },
- close: function(stream) {
-console.log('stream_ops.close');
- var sock = stream.node.sock;
- sock.sock_ops.close(sock);
- }
- },
- nextname: function() {
- if (!SOCKFS.nextname.current) {
- SOCKFS.nextname.current = 0;
- }
- return 'socket[' + (SOCKFS.nextname.current++) + ']';
- },
- // backend-specific stream ops
- websocket_sock_ops: {
- //
- // peers are a small wrapper around a WebSocket to help in
- // emulating dgram sockets
- //
- // these functions aren't actually sock_ops members, but we're
- // abusing the namespace to organize them
- //
- createPeer: function(sock, addr, port) {
- var ws;
-
- if (typeof addr === 'object') {
- ws = addr;
- addr = null;
- port = null;
- }
-
- if (ws) {
- // for sockets that've already connected (e.g. we're the server)
- // we can inspect the _socket property for the address
- if (ws._socket) {
- addr = ws._socket.remoteAddress;
- port = ws._socket.remotePort;
- }
- // if we're just now initializing a connection to the remote,
- // inspect the url property
- else {
- var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url);
- if (!result) {
- throw new Error('WebSocket URL must be in the format ws(s)://address:port');
- }
- addr = result[1];
- port = parseInt(result[2], 10);
- }
- } else {
- // Create the actual websocket object and connect.
- try {
- // runtimeConfig gets set to true if WebSocket runtime configuration is available.
- var runtimeConfig = (Module['websocket'] && ('object' === typeof Module['websocket']));
-
- // The default value is 'ws://' the replace is needed because the compiler replaces "//" comments with '#'
- // comments without checking context, so we'd end up with ws:#, the replace swaps the "#" for "//" again.
- var url = '{{{ WEBSOCKET_URL }}}'.replace('#', '//');
-
- if (runtimeConfig) {
- if ('string' === typeof Module['websocket']['url']) {
- url = Module['websocket']['url']; // Fetch runtime WebSocket URL config.
- }
- }
-
- if (url === 'ws://' || url === 'wss://') { // Is the supplied URL config just a prefix, if so complete it.
- url = url + addr + ':' + port;
- }
-
- // Make the WebSocket subprotocol (Sec-WebSocket-Protocol) default to binary if no configuration is set.
- var subProtocols = '{{{ WEBSOCKET_SUBPROTOCOL }}}'; // The default value is 'binary'
-
- if (runtimeConfig) {
- if ('string' === typeof Module['websocket']['subprotocol']) {
- subProtocols = Module['websocket']['subprotocol']; // Fetch runtime WebSocket subprotocol config.
- }
- }
-
- // The regex trims the string (removes spaces at the beginning and end, then splits the string by
- // <any space>,<any space> into an Array. Whitespace removal is important for Websockify and ws.
- subProtocols = subProtocols.replace(/^ +| +$/g,"").split(/ *, */);
-
- // The node ws library API for specifying optional subprotocol is slightly different than the browser's.
- var opts = ENVIRONMENT_IS_NODE ? {'protocol': subProtocols.toString()} : subProtocols;
-
-#if SOCKET_DEBUG
- Module.print('connect: ' + url + ', ' + subProtocols.toString());
-#endif
- // If node we use the ws library.
- var WebSocket = ENVIRONMENT_IS_NODE ? require('ws') : window['WebSocket'];
- ws = new WebSocket(url, opts);
- ws.binaryType = 'arraybuffer';
- } catch (e) {
-console.log('e: ' + e);
- throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH);
- }
- }
-
-#if SOCKET_DEBUG
- Module.print('websocket adding peer: ' + addr + ':' + port);
-#endif
-
- var peer = {
- addr: addr,
- port: port,
- socket: ws,
- dgram_send_queue: []
- };
-
- SOCKFS.websocket_sock_ops.addPeer(sock, peer);
- SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer);
-
- // if this is a bound dgram socket, send the port number first to allow
- // us to override the ephemeral port reported to us by remotePort on the
- // remote end.
- if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') {
-#if SOCKET_DEBUG
- Module.print('websocket queuing port message (port ' + sock.sport + ')');
-#endif
- peer.dgram_send_queue.push(new Uint8Array([
- 255, 255, 255, 255,
- 'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0),
- ((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff)
- ]));
- }
-
- return peer;
- },
- getPeer: function(sock, addr, port) {
- return sock.peers[addr + ':' + port];
- },
- addPeer: function(sock, peer) {
- sock.peers[peer.addr + ':' + peer.port] = peer;
- },
- removePeer: function(sock, peer) {
- delete sock.peers[peer.addr + ':' + peer.port];
- },
- handlePeerEvents: function(sock, peer) {
- var first = true;
-
- var handleOpen = function () {
-#if SOCKET_DEBUG
- Module.print('websocket handle open');
-#endif
- try {
- var queued = peer.dgram_send_queue.shift();
- while (queued) {
-#if SOCKET_DEBUG
- Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]);
-#endif
- peer.socket.send(queued);
- queued = peer.dgram_send_queue.shift();
- }
- } catch (e) {
- // not much we can do here in the way of proper error handling as we've already
- // lied and said this data was sent. shut it down.
- peer.socket.close();
- }
-
-
-
- if (Module['networkCallback']) {
-console.log("handleOpen triggering networkCallback");
-
- Module['networkCallback']();
- }
-
- Module['websocket'].emit('open', 10);
-
- };
-
- function handleMessage(data) {
- assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer
- data = new Uint8Array(data); // make a typed array view on the array buffer
-
-#if SOCKET_DEBUG
- //Module.print('websocket handle message (' + data.byteLength + ' bytes): ' + [Array.prototype.slice.call(data)]);
- //Module.print('websocket handle message (' + data.byteLength + ' bytes)');
-#endif
-
- // if this is the port message, override the peer's port with it
- var wasfirst = first;
- first = false;
- if (wasfirst &&
- data.length === 10 &&
- data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 &&
- data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) {
- // update the peer's port and it's key in the peer map
- var newport = ((data[8] << 8) | data[9]);
- SOCKFS.websocket_sock_ops.removePeer(sock, peer);
- peer.port = newport;
- SOCKFS.websocket_sock_ops.addPeer(sock, peer);
- return;
- }
-
- sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data });
-
-
-
- if (Module['networkCallback']) {
-console.log("handleMessage triggering networkCallback");
-
- Module['networkCallback']();
- }
-
- Module['websocket'].emit('message', 10);
-
-
- };
-
- if (ENVIRONMENT_IS_NODE) {
- peer.socket.on('open', handleOpen);
- peer.socket.on('message', function(data, flags) {
- if (!flags.binary) {
- return;
- }
- handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer
- });
- peer.socket.on('close', function() {
- Module['websocket'].emit('close', 10);
- });
- peer.socket.on('error', function(error) {
- Module['websocket'].emit('error', error);
- // don't throw
- });
- } else {
- peer.socket.onopen = handleOpen;
- peer.socket.onclose = function() {
- Module['websocket'].emit('close', 10);
- };
- peer.socket.onmessage = function peer_socket_onmessage(event) {
- handleMessage(event.data);
- };
- peer.socket.onerror = function(error) {
- Module['websocket'].emit('error', error);
- };
- }
- },
-
- //
- // actual sock ops
- //
- poll: function(sock) {
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
- // listen sockets should only say they're available for reading
- // if there are pending clients.
- return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0;
- }
-
- var mask = 0;
- var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets
- SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) :
- null;
-
- if (sock.recv_queue.length ||
- !dest || // connection-less sockets are always ready to read
- (dest && dest.socket.readyState === dest.socket.CLOSING) ||
- (dest && dest.socket.readyState === dest.socket.CLOSED)) { // let recv return 0 once closed
- mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}});
- }
-
- if (!dest || // connection-less sockets are always ready to write
- (dest && dest.socket.readyState === dest.socket.OPEN)) {
- mask |= {{{ cDefine('POLLOUT') }}};
- }
-
- if ((dest && dest.socket.readyState === dest.socket.CLOSING) ||
- (dest && dest.socket.readyState === dest.socket.CLOSED)) {
- mask |= {{{ cDefine('POLLHUP') }}};
- }
- return mask;
- },
- ioctl: function(sock, request, arg) {
-console.log('ioctl');
- switch (request) {
- case {{{ cDefine('FIONREAD') }}}:
- var bytes = 0;
- if (sock.recv_queue.length) {
- bytes = sock.recv_queue[0].data.length;
- }
- {{{ makeSetValue('arg', '0', 'bytes', 'i32') }}};
- return 0;
- default:
- return ERRNO_CODES.EINVAL;
- }
- },
- close: function(sock) {
-console.log('close');
- // if we've spawned a listen server, close it
- if (sock.server) {
- try {
- sock.server.close();
- } catch (e) {
- }
- sock.server = null;
- }
- // close any peer connections
- var peers = Object.keys(sock.peers);
- for (var i = 0; i < peers.length; i++) {
- var peer = sock.peers[peers[i]];
- try {
- peer.socket.close();
- } catch (e) {
- }
- SOCKFS.websocket_sock_ops.removePeer(sock, peer);
- }
- return 0;
- },
- bind: function(sock, addr, port) {
- if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
- throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound
- }
- sock.saddr = addr;
- sock.sport = port || _mkport();
- // in order to emulate dgram sockets, we need to launch a listen server when
- // binding on a connection-less socket
- // note: this is only required on the server side
- if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
- // close the existing server if it exists
- if (sock.server) {
- sock.server.close();
- sock.server = null;
- }
- // swallow error operation not supported error that occurs when binding in the
- // browser where this isn't supported
- try {
- sock.sock_ops.listen(sock, 0);
- } catch (e) {
- if (!(e instanceof FS.ErrnoError)) throw e;
- if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e;
- }
- }
- },
- connect: function(sock, addr, port) {
- if (sock.server) {
- throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP);
- }
-
- // TODO autobind
- // if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) {
- // }
-
- // early out if we're already connected / in the middle of connecting
- if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') {
- var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
- if (dest) {
- if (dest.socket.readyState === dest.socket.CONNECTING) {
- throw new FS.ErrnoError(ERRNO_CODES.EALREADY);
- } else {
- throw new FS.ErrnoError(ERRNO_CODES.EISCONN);
- }
- }
- }
-
- // add the socket to our peer list and set our
- // destination address / port to match
- var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
- sock.daddr = peer.addr;
- sock.dport = peer.port;
-
- // always "fail" in non-blocking mode
- throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS);
- },
- listen: function(sock, backlog) {
- if (!ENVIRONMENT_IS_NODE) {
- throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP);
- }
- if (sock.server) {
- throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening
- }
-
- var WebSocketServer = require('ws').Server;
- var host = sock.saddr;
-#if SOCKET_DEBUG
- console.log('listen: ' + host + ':' + sock.sport);
-#endif
- sock.server = new WebSocketServer({
- host: host,
- port: sock.sport
- // TODO support backlog
- });
-
- sock.server.on('connection', function(ws) {
-#if SOCKET_DEBUG
- console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort);
-#endif
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
- var newsock = SOCKFS.createSocket(sock.family, sock.type, sock.protocol);
-
- // create a peer on the new socket
- var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, ws);
- newsock.daddr = peer.addr;
- newsock.dport = peer.port;
-
- // push to queue for accept to pick up
- sock.pending.push(newsock);
- } else {
- // create a peer on the listen socket so calling sendto
- // with the listen socket and an address will resolve
- // to the correct client
- SOCKFS.websocket_sock_ops.createPeer(sock, ws);
- }
-
- if (Module['networkCallback']) {
-console.log("On connection triggering networkCallback");
-
- Module['networkCallback']();
- }
-
- Module['websocket'].emit('connection', 10);
-
-
- });
- sock.server.on('closed', function() {
-console.log('sock.server closed');
- Module['websocket'].emit('close', 10);
- sock.server = null;
- });
- sock.server.on('error', function(error) {
-console.log('sock.server error');
- Module['websocket'].emit('error', error);
- // don't throw
- });
- },
- accept: function(listensock) {
- if (!listensock.server) {
- throw new FS.ErrnoError(ERRNO_CODES.EINVAL);
- }
-
- var newsock = listensock.pending.shift();
- newsock.stream.flags = listensock.stream.flags;
- return newsock;
- },
- getname: function(sock, peer) {
-console.log('getname');
- var addr, port;
- if (peer) {
- if (sock.daddr === undefined || sock.dport === undefined) {
- throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
- }
- addr = sock.daddr;
- port = sock.dport;
- } else {
- // TODO saddr and sport will be set for bind()'d UDP sockets, but what
- // should we be returning for TCP sockets that've been connect()'d?
- addr = sock.saddr || 0;
- port = sock.sport || 0;
- }
- return { addr: addr, port: port };
- },
- sendmsg: function(sock, buffer, offset, length, addr, port) {
- if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
- // connection-less sockets will honor the message address,
- // and otherwise fall back to the bound destination address
- if (addr === undefined || port === undefined) {
- addr = sock.daddr;
- port = sock.dport;
- }
- // if there was no address to fall back to, error out
- if (addr === undefined || port === undefined) {
- throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ);
- }
- } else {
- // connection-based sockets will only use the bound
- addr = sock.daddr;
- port = sock.dport;
- }
-
- // find the peer for the destination address
- var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port);
-
- // early out if not connected with a connection-based socket
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
- if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
- throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
- } else if (dest.socket.readyState === dest.socket.CONNECTING) {
- throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
- }
- }
-
- // create a copy of the incoming data to send, as the WebSocket API
- // doesn't work entirely with an ArrayBufferView, it'll just send
- // the entire underlying buffer
- var data;
- if (buffer instanceof Array || buffer instanceof ArrayBuffer) {
- data = buffer.slice(offset, offset + length);
- } else { // ArrayBufferView
- data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length);
- }
-
- // if we're emulating a connection-less dgram socket and don't have
- // a cached connection, queue the buffer to send upon connect and
- // lie, saying the data was sent now.
- if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
- if (!dest || dest.socket.readyState !== dest.socket.OPEN) {
- // if we're not connected, open a new connection
- if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
- dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
- }
-#if SOCKET_DEBUG
- Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
-#endif
- dest.dgram_send_queue.push(data);
- return length;
- }
- }
-
- try {
-#if SOCKET_DEBUG
- Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
-#endif
-
- // send the actual data
- dest.socket.send(data);
-
- return length;
- } catch (e) {
- throw new FS.ErrnoError(ERRNO_CODES.EINVAL);
- }
- },
- recvmsg: function(sock, length) {
- // http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
- // tcp servers should not be recv()'ing on the listen socket
- throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
- }
-
- var queued = sock.recv_queue.shift();
- if (!queued) {
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
- var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
-
- if (!dest) {
- // if we have a destination address but are not connected, error out
- throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
- }
- else if (dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
- // return null if the socket has closed
- return null;
- }
- else {
- // else, our socket is in a valid state but truly has nothing available
- throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
- }
- } else {
- throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
- }
- }
-
- // queued.data will be an ArrayBuffer if it's unadulterated, but if it's
- // requeued TCP data it'll be an ArrayBufferView
- var queuedLength = queued.data.byteLength || queued.data.length;
- var queuedOffset = queued.data.byteOffset || 0;
- var queuedBuffer = queued.data.buffer || queued.data;
- var bytesRead = Math.min(length, queuedLength);
- var res = {
- buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead),
- addr: queued.addr,
- port: queued.port
- };
-
-#if SOCKET_DEBUG
- Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]);
-#endif
-
- // push back any unread data for TCP connections
- if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) {
- var bytesRemaining = queuedLength - bytesRead;
-#if SOCKET_DEBUG
- Module.print('websocket read: put back ' + bytesRemaining + ' bytes');
-#endif
- queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining);
- sock.recv_queue.unshift(queued);
- }
-
- return res;
- }
- }
- },
-
- emscripten_set_network_callback: function(func) {
-
- function _func() {
- try {
- Runtime.dynCall('v', func);
- } catch (e) {
- if (e instanceof ExitStatus) {
- return;
- } else {
- if (e && typeof e === 'object' && e.stack) Module.printErr('exception thrown: ' + [e, e.stack]);
- throw e;
- }
- }
- };
-
- Module['noExitRuntime'] = true;
- Module['networkCallback'] = _func;
- }
-
-});
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/tests/javascript/soak.js
----------------------------------------------------------------------
diff --git a/tests/javascript/soak.js b/tests/javascript/soak.js
new file mode 100755
index 0000000..c561989
--- /dev/null
+++ b/tests/javascript/soak.js
@@ -0,0 +1,99 @@
+#!/usr/bin/env node
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Check if the environment is Node.js and if so import the required library.
+if (typeof exports !== "undefined" && exports !== null) {
+ proton = require("qpid-proton");
+}
+
+var addr = 'guest:guest@localhost:5673';
+//var addr = 'localhost:5673';
+var address = 'amqp://' + addr;
+console.log(address);
+
+var subscriptionQueue = '';
+var subscription;
+var subscribed = false;
+var count = 0;
+var start = 0; // Start Time.
+
+var message = new proton.Message();
+var messenger = new proton.Messenger();
+
+var pumpData = function() {
+ if (!subscribed) {
+ var subscriptionAddress = subscription.getAddress();
+ if (subscriptionAddress) {
+ subscribed = true;
+ var splitAddress = subscriptionAddress.split('/');
+ subscriptionQueue = splitAddress[splitAddress.length - 1];
+ onSubscription();
+ }
+ }
+
+ while (messenger.incoming()) {
+ // The second parameter forces Binary payloads to be decoded as strings;
+ // this is useful because the broker QMF Agent encodes strings as AMQP
+ // binary, which is a right pain from an interoperability perspective.
+ var t = messenger.get(message, true);
+ //console.log("Address: " + message.getAddress());
+ //console.log("Content: " + message.body);
+ messenger.accept(t);
+
+ if (count % 1000 === 0) {
+ var time = +new Date();
+ console.log("count = " + count + ", duration = " + (time - start) + ", rate = " + ((count*1000)/(time - start)));
+ }
+
+ sendMessage();
+ }
+
+ if (messenger.isStopped()) {
+ message.free();
+ messenger.free();
+ }
+};
+
+var sendMessage = function() {
+ var msgtext = "Message Number " + count;
+ count++;
+
+ message.setAddress(address + '/' + subscriptionQueue);
+ message.body = msgtext;
+ messenger.put(message);
+//messenger.settle();
+};
+
+messenger.on('error', function(error) {console.log(error);});
+messenger.on('work', pumpData);
+//messenger.setOutgoingWindow(1024);
+messenger.setIncomingWindow(1024); // The Java Broker seems to need this.
+messenger.start();
+
+subscription = messenger.subscribe('amqp://' + addr + '/#');
+messenger.recv(); // Receive as many messages as messenger can buffer.
+
+var onSubscription = function() {
+ console.log("Subscription Queue: " + subscriptionQueue);
+ start = +new Date();
+ sendMessage();
+};
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org