You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/12/13 22:28:59 UTC
svn commit: r1421547 - in /qpid/proton/trunk:
proton-c/bindings/python/proton.py proton-c/include/proton/engine.h
proton-c/src/engine/engine-internal.h proton-c/src/engine/engine.c
proton-c/src/messenger.c tests/proton_tests/engine.py
Author: rhs
Date: Thu Dec 13 21:28:56 2012
New Revision: 1421547
URL: http://svn.apache.org/viewvc?rev=1421547&view=rev
Log:
added condition API for sessions and links; added error reporting and redirect for messenger
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Dec 13 21:28:56 2012
@@ -1711,6 +1711,29 @@ class Endpoint(object):
def __init__(self):
self.condition = None
+ def _update_cond(self):
+ impl = self._get_cond_impl()
+ pn_condition_clear(impl)
+ if self.condition:
+ pn_condition_set_name(impl, self.condition.name)
+ pn_condition_set_description(impl, self.condition.description)
+ info = Data(pn_condition_info(impl))
+ if self.condition.info:
+ info.put_object(self.condition.info)
+
+ @property
+ def remote_condition(self):
+ impl = self._get_remote_cond_impl()
+ if pn_condition_is_set(impl):
+ info_impl = Data(pn_condition_info(impl))
+ info_impl.rewind()
+ info_impl.next()
+ info = info_impl.get_object()
+ info_impl.rewind()
+ return Condition(pn_condition_get_name(impl),
+ pn_condition_get_description(impl),
+ info)
+
class Condition:
def __init__(self, name, description=None, info=None):
@@ -1758,18 +1781,11 @@ class Connection(Endpoint):
else:
return err
- @property
- def remote_condition(self):
- impl = pn_connection_remote_condition(self._conn)
- if pn_condition_is_set(impl):
- info_impl = Data(pn_condition_info(impl))
- info_impl.rewind()
- info_impl.next()
- info = info_impl.get_object()
- info_impl.rewind()
- return Condition(pn_condition_get_name(impl),
- pn_condition_get_description(impl),
- info)
+ def _get_cond_impl(self):
+ return pn_connection_condition(self._conn)
+
+ def _get_remote_cond_impl(self):
+ return pn_connection_remote_condition(self._conn)
def _get_container(self):
return pn_connection_get_container(self._conn)
@@ -1813,14 +1829,7 @@ class Connection(Endpoint):
pn_connection_open(self._conn)
def close(self):
- if self.condition:
- impl = pn_connection_condition(self._conn)
- pn_condition_clear(impl)
- pn_condition_set_name(impl, self.condition.name)
- pn_condition_set_description(impl, self.condition.description)
- info = Data(pn_condition_info(impl))
- if self.condition.info:
- info.put_object(self.condition.info)
+ self._update_cond()
pn_connection_close(self._conn)
@property
@@ -1868,10 +1877,17 @@ class Session(Endpoint):
pn_session_free(self._ssn)
del self._ssn
+ def _get_cond_impl(self):
+ return pn_session_condition(self._ssn)
+
+ def _get_remote_cond_impl(self):
+ return pn_session_remote_condition(self._ssn)
+
def open(self):
pn_session_open(self._ssn)
def close(self):
+ self._update_cond()
pn_session_close(self._ssn)
@property
@@ -1922,10 +1938,17 @@ class Link(Endpoint):
else:
return err
+ def _get_cond_impl(self):
+ return pn_link_condition(self._link)
+
+ def _get_remote_cond_impl(self):
+ return pn_link_remote_condition(self._link)
+
def open(self):
pn_link_open(self._link)
def close(self):
+ self._update_cond()
pn_link_close(self._link)
@property
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Dec 13 21:28:56 2012
@@ -402,6 +402,12 @@ void pn_delivery_set_context(pn_delivery
pn_condition_t *pn_connection_condition(pn_connection_t *connection);
pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection);
+pn_condition_t *pn_session_condition(pn_session_t *session);
+pn_condition_t *pn_session_remote_condition(pn_session_t *session);
+
+pn_condition_t *pn_link_condition(pn_link_t *link);
+pn_condition_t *pn_link_remote_condition(pn_link_t *link);
+
bool pn_condition_is_set(pn_condition_t *condition);
void pn_condition_clear(pn_condition_t *condition);
@@ -414,6 +420,8 @@ int pn_condition_set_description(pn_cond
pn_data_t *pn_condition_info(pn_condition_t *condition);
bool pn_condition_is_redirect(pn_condition_t *condition);
+const char *pn_condition_redirect_host(pn_condition_t *condition);
+int pn_condition_redirect_port(pn_condition_t *condition);
#ifdef __cplusplus
}
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Dec 13 21:28:56 2012
@@ -28,7 +28,7 @@
#include "../dispatcher/dispatcher.h"
#include "../util.h"
-typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER, TRANSPORT} pn_endpoint_type_t;
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t;
typedef struct pn_endpoint_t pn_endpoint_t;
@@ -46,6 +46,7 @@ struct pn_endpoint_t {
pn_state_t state;
pn_error_t *error;
pn_condition_t condition;
+ pn_condition_t remote_condition;
pn_endpoint_t *endpoint_next;
pn_endpoint_t *endpoint_prev;
pn_endpoint_t *transport_next;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Dec 13 21:28:56 2012
@@ -136,8 +136,6 @@ pn_connection_t *pn_ep_get_connection(pn
case SENDER:
case RECEIVER:
return ((pn_link_t *) endpoint)->session->connection;
- case TRANSPORT:
- return ((pn_transport_t *) endpoint)->connection;
}
return NULL;
@@ -159,28 +157,6 @@ void pn_close(pn_endpoint_t *endpoint)
pn_modified(pn_ep_get_connection(endpoint), endpoint);
}
-void pn_free(pn_endpoint_t *endpoint)
-{
- switch (endpoint->type)
- {
- case CONNECTION:
- pn_connection_free((pn_connection_t *)endpoint);
- break;
- case TRANSPORT:
- pn_transport_free((pn_transport_t *)endpoint);
- break;
- case SESSION:
- pn_session_free((pn_session_t *)endpoint);
- break;
- case SENDER:
- pn_link_free((pn_link_t *)endpoint);
- break;
- case RECEIVER:
- pn_link_free((pn_link_t *)endpoint);
- break;
- }
-}
-
void pn_connection_reset(pn_connection_t *connection)
{
assert(connection);
@@ -425,6 +401,7 @@ void pn_endpoint_init(pn_endpoint_t *end
endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
endpoint->error = pn_error();
pn_condition_init(&endpoint->condition);
+ pn_condition_init(&endpoint->remote_condition);
endpoint->endpoint_next = NULL;
endpoint->endpoint_prev = NULL;
endpoint->transport_next = NULL;
@@ -437,6 +414,7 @@ void pn_endpoint_init(pn_endpoint_t *end
void pn_endpoint_tini(pn_endpoint_t *endpoint)
{
pn_error_free(endpoint->error);
+ pn_condition_tini(&endpoint->remote_condition);
pn_condition_tini(&endpoint->condition);
}
@@ -899,6 +877,7 @@ int pn_transport_unbind(pn_transport_t *
pn_endpoint_t *endpoint = conn->endpoint_head;
while (endpoint) {
+ pn_condition_clear(&endpoint->remote_condition);
pn_modified(conn, endpoint);
endpoint = endpoint->endpoint_next;
}
@@ -1368,7 +1347,10 @@ void pn_delivery_settle(pn_delivery_t *d
int pn_post_close(pn_transport_t *transport, const char *condition)
{
- pn_condition_t *cond = pn_connection_condition(transport->connection);
+ pn_condition_t *cond = NULL;
+ if (transport->connection) {
+ cond = pn_connection_condition(transport->connection);
+ }
const char *description = NULL;
pn_data_t *info = NULL;
if (!condition && pn_condition_is_set(cond)) {
@@ -1759,6 +1741,20 @@ int pn_do_disposition(pn_dispatcher_t *d
return 0;
}
+static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition, bool detach)
+{
+ pn_bytes_t cond;
+ pn_bytes_t desc;
+ pn_condition_clear(condition);
+ int err = pn_scan_args(disp, detach ? "D.[..D.[sSC]" : "D.[D.[sSC]",
+ &cond, &desc, condition->info);
+ if (err) return err;
+ strncat(condition->name, cond.start, cond.size);
+ strncat(condition->description, desc.start, desc.size);
+ pn_data_rewind(condition->info);
+ return 0;
+}
+
int pn_do_detach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1774,6 +1770,9 @@ int pn_do_detach(pn_dispatcher_t *disp)
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
+ err = pn_scan_error(disp, &link->endpoint.remote_condition, true);
+ if (err) return err;
+
link_state->remote_handle = -2;
if (closed)
@@ -1791,30 +1790,18 @@ int pn_do_end(pn_dispatcher_t *disp)
pn_transport_t *transport = (pn_transport_t *) disp->context;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_session_t *session = ssn_state->session;
-
+ int err = pn_scan_error(disp, &session->endpoint.remote_condition, false);
+ if (err) return err;
ssn_state->remote_channel = -2;
PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
return 0;
}
-static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition)
-{
- pn_bytes_t cond;
- pn_bytes_t desc;
- pn_condition_clear(condition);
- int err = pn_scan_args(disp, "D.[D.[sSC]", &cond, &desc, condition->info);
- if (err) return err;
- strncat(condition->name, cond.start, cond.size);
- strncat(condition->description, desc.start, desc.size);
- pn_data_rewind(condition->info);
- return 0;
-}
-
int pn_do_close(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
pn_connection_t *conn = transport->connection;
- int err = pn_scan_error(disp, &transport->remote_condition);
+ int err = pn_scan_error(disp, &transport->remote_condition, false);
if (err) return err;
transport->close_rcvd = true;
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
@@ -2362,8 +2349,19 @@ int pn_process_link_teardown(pn_transpor
(int32_t) state->remote_handle != -2 &&
(int16_t) ssn_state->remote_channel != -2 &&
!transport->close_rcvd) return 0;
- int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io]", DETACH,
- state->local_handle, true);
+
+ const char *name = NULL;
+ const char *description = NULL;
+ pn_data_t *info = NULL;
+
+ if (pn_condition_is_set(&endpoint->condition)) {
+ name = pn_condition_get_name(&endpoint->condition);
+ description = pn_condition_get_description(&endpoint->condition);
+ info = pn_condition_info(&endpoint->condition);
+ }
+
+ int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
+ state->local_handle, true, (bool) name, ERROR, name, description, info);
if (err) return err;
state->local_handle = -2;
}
@@ -2410,7 +2408,19 @@ int pn_process_ssn_teardown(pn_transport
&& !transport->close_sent)
{
if (pn_pointful_buffering(transport, session)) return 0;
- int err = pn_post_frame(transport->disp, state->local_channel, "DL[]", END);
+
+ const char *name = NULL;
+ const char *description = NULL;
+ pn_data_t *info = NULL;
+
+ if (pn_condition_is_set(&endpoint->condition)) {
+ name = pn_condition_get_name(&endpoint->condition);
+ description = pn_condition_get_description(&endpoint->condition);
+ info = pn_condition_info(&endpoint->condition);
+ }
+
+ int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
+ (bool) name, ERROR, name, description, info);
if (err) return err;
state->local_channel = -2;
}
@@ -2765,15 +2775,41 @@ bool pn_delivery_partial(pn_delivery_t *
pn_condition_t *pn_connection_condition(pn_connection_t *connection)
{
+ assert(connection);
return &connection->endpoint.condition;
}
pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection)
{
+ assert(connection);
pn_transport_t *transport = connection->transport;
return transport ? &transport->remote_condition : NULL;
}
+pn_condition_t *pn_session_condition(pn_session_t *session)
+{
+ assert(session);
+ return &session->endpoint.condition;
+}
+
+pn_condition_t *pn_session_remote_condition(pn_session_t *session)
+{
+ assert(session);
+ return &session->endpoint.remote_condition;
+}
+
+pn_condition_t *pn_link_condition(pn_link_t *link)
+{
+ assert(link);
+ return &link->endpoint.condition;
+}
+
+pn_condition_t *pn_link_remote_condition(pn_link_t *link)
+{
+ assert(link);
+ return &link->endpoint.remote_condition;
+}
+
bool pn_condition_is_set(pn_condition_t *condition)
{
return condition && condition->name[0];
@@ -2838,3 +2874,35 @@ bool pn_condition_is_redirect(pn_conditi
const char *name = pn_condition_get_name(condition);
return name && !strcmp(name, "amqp:connection:redirect");
}
+
+const char *pn_condition_redirect_host(pn_condition_t *condition)
+{
+ pn_data_t *data = pn_condition_info(condition);
+ pn_data_rewind(data);
+ pn_data_next(data);
+ pn_data_enter(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_bytes_t host = pn_data_get_bytes(data);
+ pn_data_rewind(data);
+ return host.start;
+}
+
+int pn_condition_redirect_port(pn_condition_t *condition)
+{
+ pn_data_t *data = pn_condition_info(condition);
+ pn_data_rewind(data);
+ pn_data_next(data);
+ pn_data_enter(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ pn_data_next(data);
+ int port = pn_data_get_int(data);
+ pn_data_rewind(data);
+ return port;
+}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Dec 13 21:28:56 2012
@@ -67,6 +67,16 @@ struct pn_subscription_t {
void *context;
};
+typedef struct {
+ int refcount;
+ char *address;
+ char *scheme;
+ char *user;
+ char *pass;
+ char *host;
+ char *port;
+} pn_connection_ctx_t;
+
void pn_queue_init(pn_queue_t *queue)
{
queue->capacity = 1024;
@@ -110,18 +120,24 @@ void pn_queue_gc(pn_queue_t *queue)
queue->lwm += delta;
}
-void pn_incref(pn_connection_t *conn)
+static void pn_incref(pn_connection_t *conn)
{
- intptr_t refcount = (intptr_t) pn_connection_get_context(conn);
- pn_connection_set_context(conn, (void *) (refcount + 1));
+ pn_connection_ctx_t *ctx = pn_connection_get_context(conn);
+ ctx->refcount++;
}
-void pn_decref(pn_connection_t *conn)
+static void pn_decref(pn_connection_t *conn)
{
- intptr_t refcount = (intptr_t) pn_connection_get_context(conn);
- pn_connection_set_context(conn, (void *) (refcount - 1));
- if (refcount == 1) {
+ pn_connection_ctx_t *ctx = pn_connection_get_context(conn);
+ ctx->refcount--;
+ if (ctx->refcount == 0) {
pn_connection_free(conn);
+ free(ctx->scheme);
+ free(ctx->user);
+ free(ctx->pass);
+ free(ctx->host);
+ free(ctx->port);
+ free(ctx);
}
}
@@ -391,9 +407,56 @@ void pn_messenger_flow(pn_messenger_t *m
}
}
+static void pn_transport_config(pn_messenger_t *messenger,
+ pn_connector_t *connector,
+ pn_connection_t *connection)
+{
+ pn_connection_ctx_t *ctx = pn_connection_get_context(connection);
+ pn_transport_t *transport = pn_connector_transport(connector);
+ if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
+ pn_ssl_t *ssl = pn_ssl(transport);
+ pn_ssl_init(ssl, PN_SSL_MODE_CLIENT);
+ if (messenger->certificate && messenger->private_key) {
+ pn_ssl_set_credentials(ssl, messenger->certificate,
+ messenger->private_key,
+ messenger->password);
+ }
+ if (messenger->trusted_certificates) {
+ pn_ssl_set_trusted_ca_db(ssl, messenger->trusted_certificates);
+ pn_ssl_set_peer_authentication(ssl, PN_SSL_VERIFY_PEER, NULL);
+ } else {
+ pn_ssl_set_peer_authentication(ssl, PN_SSL_ANONYMOUS_PEER, NULL);
+ }
+ }
+
+ pn_sasl_t *sasl = pn_sasl(transport);
+ if (ctx->user) {
+ pn_sasl_plain(sasl, ctx->user, ctx->pass);
+ } else {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
+}
+
+static void pn_condition_report(const char *pfx, pn_condition_t *condition)
+{
+ if (pn_condition_is_redirect(condition)) {
+ fprintf(stderr, "%s NOTICE (%s) redirecting to %s:%i\n",
+ pfx,
+ pn_condition_get_name(condition),
+ pn_condition_redirect_host(condition),
+ pn_condition_redirect_port(condition));
+ } else if (pn_condition_is_set(condition)) {
+ fprintf(stderr, "%s ERROR (%s) %s\n",
+ pfx,
+ pn_condition_get_name(condition),
+ pn_condition_get_description(condition));
+ }
+}
+
void pn_messenger_endpoints(pn_messenger_t *messenger, pn_connection_t *conn, pn_connector_t *ctor)
{
- if (pn_connection_state(conn) | PN_LOCAL_UNINIT) {
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
pn_connection_open(conn);
}
@@ -435,18 +498,36 @@ void pn_messenger_endpoints(pn_messenger
ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (ssn) {
+ pn_condition_report("SESSION", pn_session_remote_condition(ssn));
pn_session_close(ssn);
ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (link) {
+ pn_condition_report("LINK", pn_link_remote_condition(link));
pn_link_close(link);
link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_condition_t *condition = pn_connection_remote_condition(conn);
+ pn_condition_report("CONNECTION", condition);
pn_connection_close(conn);
+ if (pn_condition_is_redirect(condition)) {
+ const char *host = pn_condition_redirect_host(condition);
+ char buf[1024];
+ sprintf(buf, "%i", pn_condition_redirect_port(condition));
+
+ pn_connector_process(ctor);
+ pn_connector_set_connection(ctor, NULL);
+ pn_driver_t *driver = messenger->driver;
+ pn_connector_t *connector = pn_connector(driver, host, buf, NULL);
+ pn_transport_unbind(pn_connector_transport(ctor));
+ pn_connection_reset(conn);
+ pn_transport_config(messenger, connector, conn);
+ pn_connector_set_connection(connector, conn);
+ }
}
}
@@ -463,6 +544,31 @@ void pn_messenger_reclaim(pn_messenger_t
}
}
+
+pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
+ char *scheme,
+ char *user,
+ char *pass,
+ char *host,
+ char *port)
+{
+ pn_connection_t *connection = pn_connection();
+ if (!connection) return NULL;
+ pn_connection_ctx_t *ctx = malloc(sizeof(pn_connection_ctx_t));
+ ctx->refcount = 0;
+ ctx->scheme = pn_strdup(scheme);
+ ctx->user = pn_strdup(user);
+ ctx->pass = pn_strdup(pass);
+ ctx->host = pn_strdup(host);
+ ctx->port = pn_strdup(port);
+ pn_connection_set_context(connection, ctx);
+ pn_incref(connection);
+
+ pn_connection_set_container(connection, messenger->name);
+ pn_connection_set_hostname(connection, ctx->host);
+ return connection;
+}
+
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
pn_connector_t *ctor = pn_connector_head(messenger->driver);
@@ -504,9 +610,8 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_sasl_mechanisms(sasl, "ANONYMOUS");
pn_sasl_server(sasl);
pn_sasl_done(sasl, PN_SASL_OK);
- pn_connection_t *conn = pn_connection();
- pn_incref(conn);
- pn_connection_set_container(conn, messenger->name);
+ pn_connection_t *conn =
+ pn_messenger_connection(messenger, scheme, NULL, NULL, NULL, NULL);
pn_connector_set_connection(c, conn);
}
@@ -517,9 +622,11 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_messenger_endpoints(messenger, conn, c);
if (pn_connector_closed(c)) {
pn_connector_free(c);
- pn_messenger_reclaim(messenger, conn);
- pn_decref(conn);
- pn_messenger_flow(messenger);
+ if (conn) {
+ pn_messenger_reclaim(messenger, conn);
+ pn_decref(conn);
+ pn_messenger_flow(messenger);
+ }
} else {
pn_connector_process(c);
}
@@ -589,7 +696,6 @@ static const char *default_port(const ch
else
return "5672";
}
-
pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, char *address, char **name)
{
char domain[strlen(address) + 1];
@@ -615,10 +721,16 @@ pn_connection_t *pn_messenger_resolve(pn
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *connection = pn_connector_connection(ctor);
+ pn_connection_ctx_t *ctx = pn_connection_get_context(connection);
+ if (pn_streq(scheme, ctx->scheme) && pn_streq(user, ctx->user) &&
+ pn_streq(pass, ctx->pass) && pn_streq(host, ctx->host) &&
+ pn_streq(port, ctx->port)) {
+ return connection;
+ }
const char *container = pn_connection_remote_container(connection);
- const char *hostname = pn_connection_get_hostname(connection);
- if (pn_streq(container, domain) || pn_streq(hostname, domain))
+ if (pn_streq(container, domain)) {
return connection;
+ }
ctor = pn_connector_next(ctor);
}
@@ -626,34 +738,9 @@ pn_connection_t *pn_messenger_resolve(pn
port ? port : default_port(scheme),
NULL);
if (!connector) return NULL;
- pn_transport_t *transport = pn_connector_transport(connector);
- if (scheme && !strcmp(scheme, "amqps")) {
- pn_ssl_t *ssl = pn_ssl(transport);
- pn_ssl_init(ssl, PN_SSL_MODE_CLIENT);
- if (messenger->certificate && messenger->private_key) {
- pn_ssl_set_credentials(ssl, messenger->certificate,
- messenger->private_key,
- messenger->password);
- }
- if (messenger->trusted_certificates) {
- pn_ssl_set_trusted_ca_db(ssl, messenger->trusted_certificates);
- pn_ssl_set_peer_authentication(ssl, PN_SSL_VERIFY_PEER, NULL);
- } else {
- pn_ssl_set_peer_authentication(ssl, PN_SSL_ANONYMOUS_PEER, NULL);
- }
- }
-
- pn_sasl_t *sasl = pn_sasl(transport);
- if (user) {
- pn_sasl_plain(sasl, user, pass);
- } else {
- pn_sasl_mechanisms(sasl, "ANONYMOUS");
- pn_sasl_client(sasl);
- }
- pn_connection_t *connection = pn_connection();
- pn_incref(connection);
- pn_connection_set_container(connection, messenger->name);
- pn_connection_set_hostname(connection, domain);
+ pn_connection_t *connection =
+ pn_messenger_connection(messenger, scheme, user, pass, host, port);
+ pn_transport_config(messenger, connector, connection);
pn_connection_open(connection);
pn_connector_set_connection(connector, connection);
@@ -1132,4 +1219,3 @@ int pn_messenger_incoming(pn_messenger_t
{
return pn_messenger_queued(messenger, false);
}
-
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Thu Dec 13 21:28:56 2012
@@ -200,7 +200,7 @@ class ConnectionTest(Test):
assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
rcond = self.c2.remote_condition
- assert rcond == cond
+ assert rcond == cond, (rcond, cond)
class SessionTest(Test):
@@ -290,6 +290,28 @@ class SessionTest(Test):
self.ssn.close()
self.pump()
+ def test_condition(self):
+ self.ssn.open()
+ self.pump()
+ ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
+ assert ssn != None
+ ssn.open()
+ self.pump()
+
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+ cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
+ self.ssn.condition = cond
+ self.ssn.close()
+
+ self.pump()
+
+ assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
+ rcond = ssn.remote_condition
+ assert rcond == cond, (rcond, cond)
class LinkTest(Test):
@@ -463,6 +485,26 @@ class LinkTest(Test):
timeout=7,
capabilities=[]))
+ def test_condition(self):
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+ cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
+ self.snd.condition = cond
+ self.snd.close()
+
+ self.pump()
+
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
+ rcond = self.rcv.remote_condition
+ assert rcond == cond, (rcond, cond)
+
class TerminusConfig:
def __init__(self, address=None, timeout=None, durability=None, filter=None,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org