You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2016/04/12 16:59:21 UTC
qpid-proton git commit: PROTON-1133: Reactor should not use the
connection hostname as a transport address.
Repository: qpid-proton
Updated Branches:
refs/heads/master 6ea002d27 -> 619c5c7ff
PROTON-1133: Reactor should not use the connection hostname as a
transport address.
This patch introduces a new reactor API for setting the host address
for a connection created via the reactor. This API is intended to
replace the existing semantics where the connection's host address is
derived via the connection's hostname setting.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/619c5c7f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/619c5c7f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/619c5c7f
Branch: refs/heads/master
Commit: 619c5c7ff6f155468f472b66a60021db72af8ecf
Parents: 6ea002d
Author: Ken Giusti <kg...@apache.org>
Authored: Sat Mar 26 20:13:59 2016 -0400
Committer: Ken Giusti <kg...@apache.org>
Committed: Tue Apr 12 10:54:28 2016 -0400
----------------------------------------------------------------------
examples/c/reactor/receiver.c | 32 +++++-
examples/c/reactor/sender.c | 33 +++++-
.../qpid/proton/example/reactor/Send.java | 27 +++--
examples/python/reactor/send.py | 20 ++--
examples/python/reactor/tornado-send.py | 20 ++--
proton-c/bindings/python/proton/__init__.py | 9 +-
proton-c/bindings/python/proton/reactor.py | 47 +++++++--
proton-c/include/proton/connection.h | 11 +-
proton-c/include/proton/reactor.h | 72 ++++++++++++-
proton-c/src/reactor/connection.c | 100 ++++++++++++++++---
proton-c/src/tests/reactor.c | 33 +++++-
.../apache/qpid/proton/engine/Connection.java | 11 ++
.../org/apache/qpid/proton/reactor/Reactor.java | 57 +++++++++--
.../qpid/proton/reactor/impl/IOHandler.java | 33 ++++--
.../qpid/proton/reactor/impl/ReactorImpl.java | 36 +++++++
.../apache/qpid/proton/reactor/ReactorTest.java | 21 ++--
.../org/apache/qpid/proton/ProtonJInterop.java | 20 ++--
tests/java/shim/creactor.py | 7 ++
18 files changed, 493 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
index dd6a9e3..0948569 100644
--- a/examples/c/reactor/receiver.c
+++ b/examples/c/reactor/receiver.c
@@ -32,6 +32,8 @@
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"
+#include "proton/transport.h"
+#include "proton/url.h"
static int quiet = 0;
@@ -148,6 +150,23 @@ static void event_handler(pn_handler_t *handler,
}
} break;
+ case PN_TRANSPORT_ERROR: {
+ // The connection to the peer failed.
+ //
+ pn_transport_t *tport = pn_event_transport(event);
+ pn_condition_t *cond = pn_transport_condition(tport);
+ fprintf(stderr, "Network transport failed!\n");
+ if (pn_condition_is_set(cond)) {
+ const char *name = pn_condition_get_name(cond);
+ const char *desc = pn_condition_get_description(cond);
+ fprintf(stderr, " Error: %s Description: %s\n",
+ (name) ? name : "<error name not provided>",
+ (desc) ? desc : "<no description provided>");
+ }
+ // pn_reactor_process() will exit with a false return value, stopping
+ // the main loop.
+ } break;
+
default:
break;
}
@@ -211,11 +230,20 @@ int main(int argc, char** argv)
}
pn_reactor_t *reactor = pn_reactor();
- pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+
+ pn_url_t *url = pn_url_parse(address);
+ if (url == NULL) {
+ fprintf(stderr, "Invalid host address %s\n", address);
+ exit(1);
+ }
+ pn_connection_t *conn = pn_reactor_connection_to_host(reactor,
+ pn_url_get_host(url),
+ pn_url_get_port(url),
+ handler);
+ pn_decref(url);
// the container name should be unique for each client
pn_connection_set_container(conn, container);
- pn_connection_set_hostname(conn, address); // FIXME
// wait up to 5 seconds for activity before returning from
// pn_reactor_process()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/c/reactor/sender.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c
index 9a07c9e..e1f73dc 100644
--- a/examples/c/reactor/sender.c
+++ b/examples/c/reactor/sender.c
@@ -32,6 +32,9 @@
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"
+#include "proton/transport.h"
+#include "proton/url.h"
+
static int quiet = 0;
@@ -157,6 +160,23 @@ static void event_handler(pn_handler_t *handler,
}
} break;
+ case PN_TRANSPORT_ERROR: {
+ // The connection to the peer failed.
+ //
+ pn_transport_t *tport = pn_event_transport(event);
+ pn_condition_t *cond = pn_transport_condition(tport);
+ fprintf(stderr, "Network transport failed!\n");
+ if (pn_condition_is_set(cond)) {
+ const char *name = pn_condition_get_name(cond);
+ const char *desc = pn_condition_get_description(cond);
+ fprintf(stderr, " Error: %s Description: %s\n",
+ (name) ? name : "<error name not provided>",
+ (desc) ? desc : "<no description provided>");
+ }
+ // pn_reactor_process() will exit with a false return value, stopping
+ // the main loop.
+ } break;
+
default:
break;
}
@@ -260,11 +280,20 @@ int main(int argc, char** argv)
pn_decref(message); // message no longer needed
pn_reactor_t *reactor = pn_reactor();
- pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+
+ pn_url_t *url = pn_url_parse(address);
+ if (url == NULL) {
+ fprintf(stderr, "Invalid host address %s\n", address);
+ exit(1);
+ }
+ pn_connection_t *conn = pn_reactor_connection_to_host(reactor,
+ pn_url_get_host(url),
+ pn_url_get_port(url),
+ handler);
+ pn_decref(url);
// the container name should be unique for each client
pn_connection_set_container(conn, container);
- pn_connection_set_hostname(conn, address); // FIXME
// wait up to 5 seconds for activity before returning from
// pn_reactor_process()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
index 22da720..5978c45 100644
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -42,12 +42,10 @@ public class Send extends BaseHandler {
private class SendHandler extends BaseHandler {
- private final String hostname;
private final Message message;
private int nextTag = 0;
- private SendHandler(String hostname, Message message) {
- this.hostname = hostname;
+ private SendHandler(Message message) {
this.message = message;
// Add a child handler that performs some default handshaking
@@ -58,7 +56,6 @@ public class Send extends BaseHandler {
@Override
public void onConnectionInit(Event event) {
Connection conn = event.getConnection();
- conn.setHostname(hostname);
// Every session or link could have their own handler(s) if we
// wanted simply by adding the handler to the given session
@@ -111,11 +108,13 @@ public class Send extends BaseHandler {
}
}
- private final String hostname;
+ private final String host;
+ private final int port;
private final Message message;
- private Send(String hostname, String content) {
- this.hostname = hostname;
+ private Send(String host, int port, String content) {
+ this.host = host;
+ this.port = port;
message = Proton.message();
message.setBody(new AmqpValue(content));
}
@@ -128,14 +127,22 @@ public class Send extends BaseHandler {
// for this connection will go to the SendHandler object instead of
// going to the reactor. If you were to omit the SendHandler object,
// all the events would go to the reactor.
- event.getReactor().connection(new SendHandler(hostname, message));
+ event.getReactor().connectionToHost(host, port, new SendHandler(message));
}
public static void main(String[] args) throws IOException {
- String hostname = args.length > 0 ? args[0] : "localhost";
+ int port = 5672;
+ String host = "localhost";
+ if (args.length > 0) {
+ String[] parts = args[0].split(":", 2);
+ host = parts[0];
+ if (parts.length > 1) {
+ port = Integer.parseInt(parts[1]);
+ }
+ }
String content = args.length > 1 ? args[1] : "Hello World!";
- Reactor r = Proton.reactor(new Send(hostname, content));
+ Reactor r = Proton.reactor(new Send(host, port, content));
r.run();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/python/reactor/send.py
----------------------------------------------------------------------
diff --git a/examples/python/reactor/send.py b/examples/python/reactor/send.py
index c718780..4356da1 100755
--- a/examples/python/reactor/send.py
+++ b/examples/python/reactor/send.py
@@ -19,7 +19,7 @@
#
import sys
-from proton import Message
+from proton import Message, Url
from proton.reactor import Reactor
from proton.handlers import CHandshaker
@@ -30,16 +30,15 @@ from proton.handlers import CHandshaker
class Send:
- def __init__(self, host, message):
- self.host = host
+ def __init__(self, message, target):
self.message = message
+ self.target = target if target is not None else "examples"
# Use the handlers property to add some default handshaking
# behaviour.
self.handlers = [CHandshaker()]
def on_connection_init(self, event):
conn = event.connection
- conn.hostname = self.host
# Every session or link could have their own handler(s) if we
# wanted simply by setting the "handler" slot on the
@@ -51,6 +50,7 @@ class Send:
# the events go to its parent connection. If the connection
# doesn't have a handler, the events go to the reactor.
snd = ssn.sender("sender")
+ snd.target.address = self.target
conn.open()
ssn.open()
snd.open()
@@ -69,8 +69,8 @@ class Send:
class Program:
- def __init__(self, hostname, content):
- self.hostname = hostname
+ def __init__(self, url, content):
+ self.url = url
self.content = content
def on_reactor_init(self, event):
@@ -80,11 +80,13 @@ class Program:
# for this connection will go to the Send object instead of
# going to the reactor. If you were to omit the Send object,
# all the events would go to the reactor.
- event.reactor.connection(Send(self.hostname, Message(self.content)))
+ event.reactor.connection_to_host(self.url.host, self.url.port,
+ Send(Message(self.content),
+ self.url.path))
args = sys.argv[1:]
-hostname = args.pop() if args else "localhost"
+url = Url(args.pop() if args else "localhost:5672/examples")
content = args.pop() if args else "Hello World!"
-r = Reactor(Program(hostname, content))
+r = Reactor(Program(url, content))
r.run()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/python/reactor/tornado-send.py
----------------------------------------------------------------------
diff --git a/examples/python/reactor/tornado-send.py b/examples/python/reactor/tornado-send.py
index 54b8618..c69876a 100755
--- a/examples/python/reactor/tornado-send.py
+++ b/examples/python/reactor/tornado-send.py
@@ -20,21 +20,20 @@
import sys, tornado.ioloop
from tornado_app import TornadoApp
-from proton import Message
+from proton import Message, Url
from proton.handlers import CHandshaker
class Send:
- def __init__(self, host, message):
- self.host = host
+ def __init__(self, message, target):
self.message = message
+ self.target = target if target is not None else "examples"
# Use the handlers property to add some default handshaking
# behaviour.
self.handlers = [CHandshaker()]
def on_connection_init(self, event):
conn = event.connection
- conn.hostname = self.host
# Every session or link could have their own handler(s) if we
# wanted simply by setting the "handler" slot on the
@@ -46,6 +45,7 @@ class Send:
# the events go to its parent connection. If the connection
# doesn't have a handler, the events go to the reactor.
snd = ssn.sender("sender")
+ snd.target.address = self.target
conn.open()
ssn.open()
snd.open()
@@ -61,8 +61,8 @@ class Send:
class Program:
- def __init__(self, hostname, content):
- self.hostname = hostname
+ def __init__(self, url, content):
+ self.url = url
self.content = content
def on_reactor_init(self, event):
@@ -72,11 +72,13 @@ class Program:
# for this connection will go to the Send object instead of
# going to the reactor. If you were to omit the Send object,
# all the events would go to the reactor.
- event.reactor.connection(Send(self.hostname, Message(self.content)))
+ event.reactor.connection_to_host(self.url.host, self.url.port,
+ Send(Message(self.content),
+ self.url.path))
args = sys.argv[1:]
-hostname = args.pop() if args else "localhost"
+url = Url(args.pop() if args else "localhost:5672/examples")
content = args.pop() if args else "Hello World!"
-TornadoApp(Program(hostname, content))
+TornadoApp(Program(url, content))
tornado.ioloop.IOLoop.instance().start()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 5ffede8..d7db20b 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2496,7 +2496,14 @@ class Connection(Wrapper, Endpoint):
def _set_hostname(self, name):
return pn_connection_set_hostname(self._impl, unicode2utf8(name))
- hostname = property(_get_hostname, _set_hostname)
+ hostname = property(_get_hostname, _set_hostname,
+ doc="""
+Set the name of the host (either fully qualified or relative) to which this
+connection is connecting. This information may be used by the remote
+peer to determine the correct back-end service to connect the client to.
+This value will be sent in the Open performative, and will be used by SSL
+and SASL layers to identify the peer.
+""")
def _get_user(self):
return utf82unicode(pn_connection_get_user(self._impl))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index cda6248..df1e039 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -178,11 +178,42 @@ class Reactor(Wrapper):
raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
def connection(self, handler=None):
+ """Deprecated: use connection_to_host() instead
+ """
impl = _chandler(handler, self.on_error)
result = Connection.wrap(pn_reactor_connection(self._impl, impl))
- pn_decref(impl)
+ if impl: pn_decref(impl)
return result
+ def connection_to_host(self, host, port, handler=None):
+ """Create an outgoing Connection that will be managed by the reactor.
+ The reactor's pn_iohandler will create a socket connection to the host
+ once the connection is opened.
+ """
+ conn = self.connection(handler)
+ self.set_connection_host(conn, host, port)
+ return conn
+
+ def set_connection_host(self, connection, host, port):
+ """Change the address used by the connection. The address is
+ used by the reactor's iohandler to create an outgoing socket
+ connection. This must be set prior to opening the connection.
+ """
+ pn_reactor_set_connection_host(self._impl,
+ connection._impl,
+ unicode2utf8(str(host)),
+ unicode2utf8(str(port)))
+
+ def get_connection_address(self, connection):
+ """This may be used to retrieve the host address used by the reactor to
+ establish the outgoing socket connection.
+ @return: string containing the address in URL format or None if no
+ address assigned. Use the proton.Url class to create a Url object from
+ the returned value.
+ """
+ _url = pn_reactor_get_connection_address(self._impl, connection._impl)
+ return utf82unicode(_url)
+
def selectable(self, handler=None):
impl = _chandler(handler, self.on_error)
result = Selectable.wrap(pn_reactor_selectable(self._impl))
@@ -503,11 +534,11 @@ class Connector(Handler):
self.user = None
self.password = None
- def _connect(self, connection):
+ def _connect(self, connection, reactor):
+ assert(reactor is not None)
url = self.address.next()
- # IoHandler uses the hostname to determine where to try to connect to
- connection.hostname = "%s:%s" % (url.host, url.port)
- logging.info("connecting to %s..." % connection.hostname)
+ reactor.set_connection_host(connection, url.host, str(url.port))
+ logging.info("connecting to %s..." % url)
transport = Transport()
if self.sasl_enabled:
@@ -533,7 +564,7 @@ class Connector(Handler):
self.ssl.peer_hostname = url.host
def on_connection_local_open(self, event):
- self._connect(event.connection)
+ self._connect(event.connection, event.reactor)
def on_connection_remote_open(self, event):
logging.info("connected to %s" % event.connection.hostname)
@@ -551,7 +582,7 @@ class Connector(Handler):
delay = self.reconnect.next()
if delay == 0:
logging.info("Disconnected, reconnecting...")
- self._connect(self.connection)
+ self._connect(self.connection, event.reactor)
else:
logging.info("Disconnected will try to reconnect after %s seconds" % delay)
event.reactor.schedule(delay, self)
@@ -560,7 +591,7 @@ class Connector(Handler):
self.connection = None
def on_timer_task(self, event):
- self._connect(self.connection)
+ self._connect(self.connection, event.reactor)
def on_connection_remote_close(self, event):
self.connection = None
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index f8a688c..da20f94 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -326,10 +326,17 @@ PN_EXTERN const char *pn_connection_get_user(pn_connection_t *connection);
PN_EXTERN const char *pn_connection_get_hostname(pn_connection_t *connection);
/**
- * Set the value of the AMQP Hostname used by a connection object.
+ * Set the name of the host (either fully qualified or relative) to which this
+ * connection is connecting. This information may be used by the remote
+ * peer to determine the correct back-end service to connect the client to.
+ * This value will be sent in the Open performative, and will be used by SSL
+ * and SASL layers to identify the peer.
+ *
+ * @note It is illegal to set the hostname to a numeric IP address or
+ * include a port number.
*
* @param[in] connection the connection object
- * @param[in] hostname the hostname
+ * @param[in] hostname the RFC1035 compliant host name
*/
PN_EXTERN void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index e91b169..be642a9 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -74,7 +74,77 @@ PN_EXTERN pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor);
PN_EXTERN void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable);
PN_EXTERN pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port,
pn_handler_t *handler);
-PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler);
+
+
+/**
+ * Create an outgoing connection that will be managed by the reactor.
+ *
+ * The reactor's pn_iohandler will create a socket connection to the host
+ * once the connection is opened.
+ *
+ * @param[in] reactor the reactor that will own the connection.
+ * @param[in] host the address of the remote host. e.g. "localhost"
+ * @param[in] port the port to connect to. e.g. "5672"
+ * @param[in] handler the handler that will process all events generated by
+ * this connection.
+ * @return a connection object
+ */
+PN_EXTERN pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor,
+ const char *host,
+ const char *port,
+ pn_handler_t *handler);
+
+/**
+ * Create an outgoing connection that will be managed by the reactor.
+ *
+ * The host address for the connection must be set via
+ * ::pn_reactor_set_connection_host() prior to opening the connection.
+ * Typically this can be done by the handler when processing the
+ * ::PN_CONNECTION_INIT event.
+ *
+ * @param[in] reactor the reactor that will own the connection.
+ * @param[in] handler the handler that will process all events generated by
+ * this connection.
+ * @return a connection object
+ * @deprecated Use ::pn_reactor_connection_to_host() instead.
+ */
+PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor,
+ pn_handler_t *handler);
+
+/**
+ * Change the host address used by the connection.
+ *
+ * The address is used by the reactor's iohandler to create an outgoing socket
+ * connection. This must be set prior to opening the connection.
+ *
+ * @param[in] reactor the reactor that owns the connection.
+ * @param[in] connection the connection created by the reactor.
+ * @param[in] host the network address or DNS name of the host to connect to.
+ * @param[in] port the network port to use. Optional - default is "5672"
+ */
+PN_EXTERN void pn_reactor_set_connection_host(pn_reactor_t *reactor,
+ pn_connection_t *connection,
+ const char *host,
+ const char *port);
+/**
+ * Retrieve the host address assigned to a reactor connection.
+ *
+ * This may be used to retrieve the host address used by the reactor to
+ * establish the outgoing socket connection.
+ *
+ * The pointer returned by this operation is valid until either the address is
+ * changed via ::pn_reactor_set_connection_host() or the connection object
+ * is freed.
+ *
+ * @param[in] reactor the reactor that owns the connection.
+ * @param[in] connection the connection created by ::pn_reactor_connection()
+ * @return a C string containing the address in URL format or NULL if no
+ * address assigned. ::pn_url_parse() may be used to create a Proton pn_url_t
+ * instance from the returned value.
+ */
+PN_EXTERN const char *pn_reactor_get_connection_address(pn_reactor_t *reactor,
+ pn_connection_t *connection);
+
PN_EXTERN int pn_reactor_wakeup(pn_reactor_t *reactor);
PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor);
PN_EXTERN bool pn_reactor_quiesced(pn_reactor_t *reactor);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index 4a57bfd..5cc7099 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -24,6 +24,7 @@
#include <proton/sasl.h>
#include <proton/ssl.h>
#include <proton/transport.h>
+#include <proton/url.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
@@ -32,6 +33,7 @@
// XXX: overloaded for both directions
PN_HANDLE(PN_TRANCTX)
+PN_HANDLE(PNI_CONN_URL)
static pn_transport_t *pni_transport(pn_selectable_t *sel) {
pn_record_t *record = pn_selectable_attachments(sel);
@@ -110,20 +112,55 @@ void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) {
assert(event);
pn_connection_t *conn = pn_event_connection(event);
- const char *hostname = pn_connection_get_hostname(conn);
- if (!hostname) {
- return;
- }
- pn_string_t *str = pn_string(hostname);
- char *host = pn_string_buffer(str);
+ pn_record_t *record = pn_connection_attachments(conn);
+ pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_URL);
+ const char *host = NULL;
const char *port = "5672";
- char *colon = strrchr(host, ':');
- if (colon) {
- port = colon + 1;
- colon[0] = '\0';
+ pn_string_t *str = NULL;
+
+ if (url) {
+ host = pn_url_get_host(url);
+ const char *uport = pn_url_get_port(url);
+ if (uport) {
+ port = uport;
+ } else {
+ const char *scheme = pn_url_get_scheme(url);
+ if (scheme && strcmp(scheme, "amqps") == 0) {
+ port = "5671";
+ }
+ }
+ if (!pn_connection_get_user(conn)) {
+ // user did not manually set auth info
+ const char *user = pn_url_get_username(url);
+ if (user) pn_connection_set_user(conn, user);
+ const char *passwd = pn_url_get_password(url);
+ if (passwd) pn_connection_set_password(conn, passwd);
+ }
+ } else {
+ // for backward compatibility, see if the connection's hostname can be
+ // used for the remote address. See JIRA PROTON-1133
+ const char *hostname = pn_connection_get_hostname(conn);
+ if (hostname) {
+ str = pn_string(hostname);
+ host = pn_string_buffer(str);
+ // see if a port has been included in the hostname. This is not
+ // allowed by the spec, but the old reactor interface allowed it.
+ char *colon = strrchr(host, ':');
+ if (colon) {
+ port = colon + 1;
+ colon[0] = '\0';
+ }
+ }
}
- pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port);
+
+ // host will be NULL if this connection was created via the acceptor, which
+ // creates its own transport/socket when created.
+ if (!host) {
+ return;
+ }
+
pn_transport_t *transport = pn_event_transport(event);
+ pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port);
// invalid sockets are ignored by poll, so we need to do this manualy
if (sock == PN_INVALID_SOCKET) {
pn_condition_t *cond = pn_transport_condition(transport);
@@ -132,7 +169,7 @@ void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) {
pn_transport_close_tail(transport);
pn_transport_close_head(transport);
}
- pn_free(str);
+ if (str) pn_free(str);
pn_reactor_selectable_transport(reactor, sock, transport);
}
@@ -264,3 +301,42 @@ pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *hand
pn_decref(connection);
return connection;
}
+
+pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor,
+ const char *host,
+ const char *port,
+ pn_handler_t *handler) {
+ pn_connection_t *connection = pn_reactor_connection(reactor, handler);
+ pn_reactor_set_connection_host(reactor, connection, host, port);
+ return connection;
+}
+
+
+void pn_reactor_set_connection_host(pn_reactor_t *reactor,
+ pn_connection_t *connection,
+ const char *host,
+ const char *port)
+{
+ (void)reactor; // ignored
+ pn_url_t *url = pn_url();
+ pn_url_set_host(url, host);
+ pn_url_set_port(url, port);
+ pn_record_t *record = pn_connection_attachments(connection);
+ if (!pn_record_has(record, PNI_CONN_URL)) {
+ pn_record_def(record, PNI_CONN_URL, PN_OBJECT);
+ }
+ pn_record_set(record, PNI_CONN_URL, url);
+ pn_decref(url);
+}
+
+
+const char *pn_reactor_get_connection_address(pn_reactor_t *reactor,
+ pn_connection_t *connection)
+{
+ (void)reactor; // ignored
+ if (!connection) return NULL;
+ pn_record_t *record = pn_connection_attachments(connection);
+ pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_URL);
+ if (!url) return NULL;
+ return pn_url_str(url);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index 1e706e2..9564569 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -26,7 +26,9 @@
#include <proton/session.h>
#include <proton/link.h>
#include <proton/delivery.h>
+#include <proton/url.h>
#include <stdlib.h>
+#include <string.h>
#define assert(E) ((E) ? 0 : (abort(), 0))
@@ -193,6 +195,11 @@ static void test_reactor_connection(void) {
pn_handler_t *tch = test_handler(reactor, cevents);
pn_connection_t *connection = pn_reactor_connection(reactor, tch);
assert(connection);
+ pn_reactor_set_connection_host(reactor, connection, "127.0.0.1", "5672");
+ pn_url_t *url = pn_url_parse(pn_reactor_get_connection_address(reactor, connection));
+ assert(strcmp(pn_url_get_host(url), "127.0.0.1") == 0);
+ assert(strcmp(pn_url_get_port(url), "5672") == 0);
+ pn_decref(url);
pn_handler_t *root = pn_reactor_get_handler(reactor);
pn_list_t *revents = pn_list(PN_VOID, 0);
pn_handler_add(root, test_handler(reactor, revents));
@@ -290,7 +297,7 @@ static void client_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_t
pn_connection_t *conn = pn_event_connection(event);
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT:
- pn_connection_set_hostname(conn, "127.0.0.1:5678");
+ pn_connection_set_hostname(conn, "some.org");
pn_connection_open(conn);
break;
case PN_CONNECTION_REMOTE_OPEN:
@@ -315,7 +322,15 @@ static void test_reactor_connect(void) {
pn_handler_t *ch = pn_handler_new(client_dispatch, sizeof(client_t), NULL);
client_t *cli = cmem(ch);
cli->events = pn_list(PN_VOID, 0);
- pn_reactor_connection(reactor, ch);
+ pn_connection_t *conn = pn_reactor_connection_to_host(reactor,
+ "127.0.0.1",
+ "5678",
+ ch);
+ assert(conn);
+ pn_url_t *url = pn_url_parse(pn_reactor_get_connection_address(reactor, conn));
+ assert(strcmp(pn_url_get_host(url), "127.0.0.1") == 0);
+ assert(strcmp(pn_url_get_port(url), "5678") == 0);
+ pn_decref(url);
pn_reactor_run(reactor);
expect(srv->events, PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
PN_CONNECTION_REMOTE_OPEN,
@@ -376,7 +391,6 @@ void source_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t t
switch (type) {
case PN_CONNECTION_INIT:
{
- pn_connection_set_hostname(conn, "127.0.0.1:5678");
pn_session_t *ssn = pn_session(conn);
pn_link_t *snd = pn_sender(ssn, "sender");
pn_connection_open(conn);
@@ -427,7 +441,18 @@ static void test_reactor_transfer(int count, int window) {
pn_handler_t *ch = pn_handler_new(source_dispatch, sizeof(source_t), NULL);
source_t *src = source(ch);
src->remaining = count;
- pn_reactor_connection(reactor, ch);
+ pn_connection_t *conn = NULL;
+ // Using the connection's hostname to set the connection address is
+ // deprecated. Once support is dropped the conditional code can be removed:
+ #if 0
+ conn = pn_reactor_connection(reactor, ch);
+ assert(conn);
+ pn_reactor_connection_set_address(reactor, conn, "127.0.0.1", "5678");
+ #else
+ // This is deprecated:
+ conn = pn_reactor_connection(reactor, ch);
+ pn_connection_set_hostname(conn, "127.0.0.1:5678");
+ #endif
pn_reactor_run(reactor);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
index feff80b..fb4de11 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
@@ -92,6 +92,17 @@ public interface Connection extends Endpoint, ReactorChild
public String getContainer();
+ /**
+ * Set the name of the host (either fully qualified or relative) to which
+ * this connection is connecting. This information may be used by the
+ * remote peer to determine the correct back-end service to connect the
+ * client to. This value will be sent in the Open performative.
+ *
+ * <b>Note that it is illegal to set the hostname to a numeric IP
+ * address or include a port number.</b>
+ *
+ * @param hostname the RFC1035 compliant host name.
+ */
public void setHostname(String hostname);
public String getHostname();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 9d67d49..a3307d2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -130,15 +130,16 @@ public interface Reactor {
void setHandler(Handler handler);
/**
- * @return a set containing the child objects associated wit this reactor.
- * This will contain any active instances of: {@link Task} - created
- * using the {@link #schedule(int, Handler)} method,
+ * @return a set containing the child objects associated with this reactor.
+ * This will contain any active instances of: {@link Task} -
+ * created using the {@link #schedule(int, Handler)} method,
* {@link Connection} - created using the
- * {@link #connection(Handler)} method, {@link Acceptor} -
- * created using the {@link #acceptor(String, int)} method.
+ * {@link #connectionToHost(String, int, Handler)} method,
+ * {@link Acceptor} - created using the
+ * {@link #acceptor(String, int)} or
* {@link #acceptor(String, int, Handler)} method, or
- * {@link Selectable} - created using the {@link #selectable()}
- * method.
+ * {@link Selectable} - created using the
+ * {@link #selectable()} method.
*/
Set<ReactorChild> children();
@@ -245,12 +246,52 @@ public interface Reactor {
* connection. Typically the host and port to connect to
* would be supplied to the connection object inside the
* logic which handles the {@link Type#CONNECTION_INIT}
- * event.
+ * event via
+ * {@link #setConnectionHost(Connection, String, int)}.
* @return the newly created connection object.
+ * @deprecated Use {@link #connectionToHost(String, int, Handler)} instead.
*/
+ @Deprecated
Connection connection(Handler handler);
/**
+ * Creates a new out-bound connection to the given host and port.
+ * <p>
+ * This method will cause Reactor to set up a network connection to the
+ * host and create a Connection for it.
+ * @param host the host to connect to (e.g. "localhost")
+ * @param port the port used for the connection.
+ * @param handler a handler that is notified when events occur for the
+ * connection.
+ * @return the newly created connection object.
+ */
+ Connection connectionToHost(String host, int port, Handler handler);
+
+ /**
+ * Set the host address used by the connection.
+ * <p>
+ * This method will set/change the host address used by the Reactor to
+ * create an outbound network connection for the given Connection.
+ * @param c the Connection to assign the address to
+ * @param host the address of the host to connect to (e.g. "localhost")
+ * @param port the port to use for the connection.
+ */
+ void setConnectionHost(Connection c, String host, int port);
+
+ /**
+ * Get the address used by the connection.
+ * <p>
+ * This method will retrieve the Connection's address as set by
+ * {@link #setConnectionHost(Connection, String, int)}.
+ * @param c the Connection
+ * @return a string containing the address in the following format:
+ * <pre>
+ * host[:port]
+ * </pre>
+ */
+ String getConnectionAddress(Connection c);
+
+ /**
* Creates a new acceptor. This is equivalent to calling:
* <pre>
* acceptor(host, port, null);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index 40eddac..5a32824 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -42,6 +42,7 @@ import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selector;
+import org.apache.qpid.proton.messenger.impl.Address;
public class IOHandler extends BaseHandler {
@@ -87,20 +88,34 @@ public class IOHandler extends BaseHandler {
// pni_handle_bound(...) from connection.c
private void handleBound(Reactor reactor, Event event) {
Connection connection = event.getConnection();
+ String url = reactor.getConnectionAddress(connection);
String hostname = connection.getHostname();
- if (hostname == null || hostname.equals("")) {
- return;
- }
-
- int colonIndex = hostname.indexOf(':');
int port = 5672;
- if (colonIndex >= 0) {
+
+ if (url != null) {
+ Address address = new Address(url);
+ hostname = address.getHost();
try {
- port = Integer.parseInt(hostname.substring(colonIndex+1));
+ port = Integer.parseInt(address.getImpliedPort());
} catch(NumberFormatException nfe) {
- throw new IllegalArgumentException("Not a valid host: " + hostname, nfe);
+ throw new IllegalArgumentException("Not a valid host: " + url, nfe);
}
- hostname = hostname.substring(0, colonIndex);
+ } else if (hostname != null && !hostname.equals("")) {
+ // Backward compatibility with old code that illegally overloaded
+ // the connection's hostname
+ int colonIndex = hostname.indexOf(':');
+ if (colonIndex >= 0) {
+ try {
+ port = Integer.parseInt(hostname.substring(colonIndex+1));
+ } catch(NumberFormatException nfe) {
+ throw new IllegalArgumentException("Not a valid host: " + hostname, nfe);
+ }
+ hostname = hostname.substring(0, colonIndex);
+ }
+ } else {
+ // The transport connection already exists (like Acceptor), so no
+ // socket needed.
+ return;
}
Transport transport = event.getConnection().getTransport();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 0eb126a..d13cfbe 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -50,6 +50,7 @@ import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selector;
import org.apache.qpid.proton.reactor.Task;
+import org.apache.qpid.proton.messenger.impl.Address;
public class ReactorImpl implements Reactor, Extendable {
public static final ExtendableAccessor<Event, Handler> ROOT = new ExtendableAccessor<>(Handler.class);
@@ -427,6 +428,41 @@ public class ReactorImpl implements Reactor, Extendable {
}
@Override
+ public Connection connectionToHost(String host, int port, Handler handler) {
+ Connection connection = connection(handler);
+ setConnectionHost(connection, host, port);
+ return connection;
+ }
+
+ static final String CONNECTION_ADDRESS_KEY = "pn_reactor_address";
+
+ @Override
+ public String getConnectionAddress(Connection connection) {
+ Record r = connection.attachments();
+ Address addr = r.get(CONNECTION_ADDRESS_KEY, Address.class);
+ if (addr != null) {
+ StringBuilder sb = new StringBuilder(addr.getHost());
+ if (addr.getPort() != null)
+ sb.append(":" + addr.getPort());
+ return sb.toString();
+ }
+ return null;
+ }
+
+ @Override
+ public void setConnectionHost(Connection connection,
+ String host, int port) {
+ Record r = connection.attachments();
+ Address addr = new Address();
+ addr.setHost(host);
+ if (port == 0) {
+ port = 5672;
+ }
+ addr.setPort(Integer.toString(port));
+ r.set(CONNECTION_ADDRESS_KEY, Address.class, addr);
+ }
+
+ @Override
public Acceptor acceptor(String host, int port) throws IOException {
return this.acceptor(host, port, null);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 10c591a..4e92cd9 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -165,6 +165,9 @@ public class ReactorTest {
Connection connection = reactor.connection(connectionHandler);
assertNotNull(connection);
assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection));
+ reactor.setConnectionHost(connection, "127.0.0.1", 5672);
+ assertEquals("connection address configuration failed",
+ reactor.getConnectionAddress(connection), "127.0.0.1:5672");
TestHandler reactorHandler = new TestHandler();
reactor.getHandler().add(reactorHandler);
reactor.run();
@@ -240,7 +243,9 @@ public class ReactorTest {
@Override
public void onConnectionInit(Event event) {
super.onConnectionInit(event);
- event.getConnection().setHostname("127.0.0.1:" + listeningPort);
+ event.getReactor().setConnectionHost(event.getConnection(),
+ "127.0.0.1",
+ listeningPort);
event.getConnection().open();
}
@Override
@@ -300,17 +305,14 @@ public class ReactorTest {
private static class SourceHandler extends BaseHandler {
private int remaining;
- private final int port;
- protected SourceHandler(int count, int port) {
+ protected SourceHandler(int count) {
remaining = count;
- this.port = port;
}
@Override
public void onConnectionInit(Event event) {
Connection conn = event.getConnection();
- conn.setHostname("127.0.0.1:" + port);
Session ssn = conn.session();
Sender snd = ssn.sender("sender");
conn.open();
@@ -352,9 +354,9 @@ public class ReactorTest {
SinkHandler snk = new SinkHandler();
sh.add(snk);
- SourceHandler src = new SourceHandler(count, ((AcceptorImpl)acceptor).getPortNumber());
- reactor.connection(src);
-
+ SourceHandler src = new SourceHandler(count);
+ reactor.connectionToHost("127.0.0.1", ((AcceptorImpl)acceptor).getPortNumber(),
+ src);
reactor.run();
reactor.free();
assertEquals("Did not receive the expected number of messages", count, snk.received);
@@ -575,7 +577,6 @@ public class ReactorTest {
public void onConnectionInit(Event event) {
super.onConnectionInit(event);
Connection connection = event.getConnection();
- connection.setHostname("127.0.0.1:" + serverSocket.getLocalPort());
connection.open();
try {
serverSocket.close();
@@ -587,7 +588,7 @@ public class ReactorTest {
}
}
TestHandler connectionHandler = new ConnectionHandler();
- reactor.connection(connectionHandler);
+ reactor.connectionToHost("127.0.0.1", serverSocket.getLocalPort(), connectionHandler);
reactor.run();
reactor.free();
serverSocket.close();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/tests/java/org/apache/qpid/proton/ProtonJInterop.java
----------------------------------------------------------------------
diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
index 31306ef..58a9c31 100644
--- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java
+++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
@@ -44,13 +44,11 @@ public class ProtonJInterop {
private static class SendHandler extends BaseHandler {
- private final String hostname;
private int numMsgs;
private int count = 0;
private boolean result = false;
- private SendHandler(String hostname, int numMsgs) {
- this.hostname = hostname;
+ private SendHandler(int numMsgs) {
this.numMsgs = numMsgs;
add(new Handshaker());
}
@@ -58,7 +56,6 @@ public class ProtonJInterop {
@Override
public void onConnectionInit(Event event) {
Connection conn = event.getConnection();
- conn.setHostname(hostname);
Session ssn = conn.session();
Sender snd = ssn.sender("sender");
conn.open();
@@ -111,14 +108,19 @@ public class ProtonJInterop {
private static class Send extends BaseHandler {
private final SendHandler sendHandler;
+ private final String host;
+ private final int port;
- private Send(String hostname, int numMsgs) {
- sendHandler = new SendHandler(hostname, numMsgs);
+ private Send(String host, int port, int numMsgs) {
+ this.host = host;
+ this.port = port;
+ sendHandler = new SendHandler(numMsgs);
}
@Override
public void onReactorInit(Event event) {
- event.getReactor().connection(sendHandler);
+ Reactor r = event.getReactor();
+ r.connectionToHost(host, port, sendHandler);
}
public boolean getResult() {
@@ -185,7 +187,7 @@ public class ProtonJInterop {
boolean result = false;
if ("send".equalsIgnoreCase(args[0])) {
- Send send = new Send("localhost:" + port, numMsgs);
+ Send send = new Send("localhost", port, numMsgs);
Reactor r = Proton.reactor(send);
r.run();
result = send.getResult();
@@ -200,4 +202,4 @@ public class ProtonJInterop {
System.exit(1);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/tests/java/shim/creactor.py
----------------------------------------------------------------------
diff --git a/tests/java/shim/creactor.py b/tests/java/shim/creactor.py
index 95fd020..b61c1df 100644
--- a/tests/java/shim/creactor.py
+++ b/tests/java/shim/creactor.py
@@ -56,6 +56,13 @@ def pn_reactor_selectable(r):
return r.selectable()
def pn_reactor_connection(r, h):
return wrap(r.connection(h), pn_connection_wrapper)
+def pn_reactor_connection_to_host(r, host, port, h):
+ return wrap(r.connectionToHost(host, int(port), h),
+ pn_connection_wrapper)
+def pn_reactor_get_connection_address(r, c):
+ return r.getConnectionAddress(c.impl)
+def pn_reactor_set_connection_host(r, c, h, p):
+ r.setConnectionHost(c.impl, h, int(p))
def pn_reactor_acceptor(r, host, port, handler):
return r.acceptor(host, int(port), handler)
def pn_reactor_mark(r):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org