You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/12/13 22:28:59 UTC

svn commit: r1421547 - in /qpid/proton/trunk: proton-c/bindings/python/proton.py proton-c/include/proton/engine.h proton-c/src/engine/engine-internal.h proton-c/src/engine/engine.c proton-c/src/messenger.c tests/proton_tests/engine.py

Author: rhs
Date: Thu Dec 13 21:28:56 2012
New Revision: 1421547

URL: http://svn.apache.org/viewvc?rev=1421547&view=rev
Log:
added condition API for sessions and links; added error reporting and redirect for messenger

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Dec 13 21:28:56 2012
@@ -1711,6 +1711,40 @@ class Endpoint(object):
   def __init__(self):
     self.condition = None
 
+  def _update_cond(self):
+    """
+    Clear the endpoint's local condition and, when self.condition is
+    set, copy its name, description and info into it.
+    """
+    impl = self._get_cond_impl()
+    pn_condition_clear(impl)
+    cond = self.condition
+    if not cond:
+      return
+    pn_condition_set_name(impl, cond.name)
+    pn_condition_set_description(impl, cond.description)
+    info = Data(pn_condition_info(impl))
+    if cond.info:
+      info.put_object(cond.info)
+
+  @property
+  def remote_condition(self):
+    """
+    The remote condition as a Condition, or None when
+    pn_condition_is_set reports that it is not set.
+    """
+    impl = self._get_remote_cond_impl()
+    if not pn_condition_is_set(impl):
+      return None
+    data = Data(pn_condition_info(impl))
+    data.rewind()
+    data.next()
+    info = data.get_object()
+    data.rewind()
+    name = pn_condition_get_name(impl)
+    description = pn_condition_get_description(impl)
+    return Condition(name, description, info)
+
 class Condition:
 
   def __init__(self, name, description=None, info=None):
@@ -1758,18 +1781,11 @@ class Connection(Endpoint):
     else:
       return err
 
-  @property
-  def remote_condition(self):
-    impl = pn_connection_remote_condition(self._conn)
-    if pn_condition_is_set(impl):
-      info_impl = Data(pn_condition_info(impl))
-      info_impl.rewind()
-      info_impl.next()
-      info = info_impl.get_object()
-      info_impl.rewind()
-      return Condition(pn_condition_get_name(impl),
-                       pn_condition_get_description(impl),
-                       info)
+  def _get_cond_impl(self):
+    return pn_connection_condition(self._conn)
+
+  def _get_remote_cond_impl(self):
+    return pn_connection_remote_condition(self._conn)
 
   def _get_container(self):
     return pn_connection_get_container(self._conn)
@@ -1813,14 +1829,7 @@ class Connection(Endpoint):
     pn_connection_open(self._conn)
 
   def close(self):
-    if self.condition:
-      impl = pn_connection_condition(self._conn)
-      pn_condition_clear(impl)
-      pn_condition_set_name(impl, self.condition.name)
-      pn_condition_set_description(impl, self.condition.description)
-      info = Data(pn_condition_info(impl))
-      if self.condition.info:
-        info.put_object(self.condition.info)
+    self._update_cond()
     pn_connection_close(self._conn)
 
   @property
@@ -1868,10 +1877,17 @@ class Session(Endpoint):
       pn_session_free(self._ssn)
       del self._ssn
 
+  def _get_cond_impl(self):
+    return pn_session_condition(self._ssn)
+
+  def _get_remote_cond_impl(self):
+    return pn_session_remote_condition(self._ssn)
+
   def open(self):
     pn_session_open(self._ssn)
 
   def close(self):
+    self._update_cond()
     pn_session_close(self._ssn)
 
   @property
@@ -1922,10 +1938,17 @@ class Link(Endpoint):
     else:
       return err
 
+  def _get_cond_impl(self):
+    return pn_link_condition(self._link)
+
+  def _get_remote_cond_impl(self):
+    return pn_link_remote_condition(self._link)
+
   def open(self):
     pn_link_open(self._link)
 
   def close(self):
+    self._update_cond()
     pn_link_close(self._link)
 
   @property

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Dec 13 21:28:56 2012
@@ -402,6 +402,12 @@ void pn_delivery_set_context(pn_delivery
 pn_condition_t *pn_connection_condition(pn_connection_t *connection);
 pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection);
 
+pn_condition_t *pn_session_condition(pn_session_t *session);
+pn_condition_t *pn_session_remote_condition(pn_session_t *session);
+
+pn_condition_t *pn_link_condition(pn_link_t *link);
+pn_condition_t *pn_link_remote_condition(pn_link_t *link);
+
 bool pn_condition_is_set(pn_condition_t *condition);
 void pn_condition_clear(pn_condition_t *condition);
 
@@ -414,6 +420,8 @@ int pn_condition_set_description(pn_cond
 pn_data_t *pn_condition_info(pn_condition_t *condition);
 
 bool pn_condition_is_redirect(pn_condition_t *condition);
+const char *pn_condition_redirect_host(pn_condition_t *condition);
+int pn_condition_redirect_port(pn_condition_t *condition);
 
 #ifdef __cplusplus
 }

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Dec 13 21:28:56 2012
@@ -28,7 +28,7 @@
 #include "../dispatcher/dispatcher.h"
 #include "../util.h"
 
-typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER, TRANSPORT} pn_endpoint_type_t;
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t;
 
 typedef struct pn_endpoint_t pn_endpoint_t;
 
@@ -46,6 +46,7 @@ struct pn_endpoint_t {
   pn_state_t state;
   pn_error_t *error;
   pn_condition_t condition;
+  pn_condition_t remote_condition;
   pn_endpoint_t *endpoint_next;
   pn_endpoint_t *endpoint_prev;
   pn_endpoint_t *transport_next;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Dec 13 21:28:56 2012
@@ -136,8 +136,6 @@ pn_connection_t *pn_ep_get_connection(pn
   case SENDER:
   case RECEIVER:
     return ((pn_link_t *) endpoint)->session->connection;
-  case TRANSPORT:
-    return ((pn_transport_t *) endpoint)->connection;
   }
 
   return NULL;
@@ -159,28 +157,6 @@ void pn_close(pn_endpoint_t *endpoint)
   pn_modified(pn_ep_get_connection(endpoint), endpoint);
 }
 
-void pn_free(pn_endpoint_t *endpoint)
-{
-  switch (endpoint->type)
-  {
-  case CONNECTION:
-    pn_connection_free((pn_connection_t *)endpoint);
-    break;
-  case TRANSPORT:
-    pn_transport_free((pn_transport_t *)endpoint);
-    break;
-  case SESSION:
-    pn_session_free((pn_session_t *)endpoint);
-    break;
-  case SENDER:
-    pn_link_free((pn_link_t *)endpoint);
-    break;
-  case RECEIVER:
-    pn_link_free((pn_link_t *)endpoint);
-    break;
-  }
-}
-
 void pn_connection_reset(pn_connection_t *connection)
 {
   assert(connection);
@@ -425,6 +401,7 @@ void pn_endpoint_init(pn_endpoint_t *end
   endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
   endpoint->error = pn_error();
   pn_condition_init(&endpoint->condition);
+  pn_condition_init(&endpoint->remote_condition);
   endpoint->endpoint_next = NULL;
   endpoint->endpoint_prev = NULL;
   endpoint->transport_next = NULL;
@@ -437,6 +414,7 @@ void pn_endpoint_init(pn_endpoint_t *end
 void pn_endpoint_tini(pn_endpoint_t *endpoint)
 {
   pn_error_free(endpoint->error);
+  pn_condition_tini(&endpoint->remote_condition);
   pn_condition_tini(&endpoint->condition);
 }
 
@@ -899,6 +877,7 @@ int pn_transport_unbind(pn_transport_t *
 
   pn_endpoint_t *endpoint = conn->endpoint_head;
   while (endpoint) {
+    pn_condition_clear(&endpoint->remote_condition);
     pn_modified(conn, endpoint);
     endpoint = endpoint->endpoint_next;
   }
@@ -1368,7 +1347,10 @@ void pn_delivery_settle(pn_delivery_t *d
 
 int pn_post_close(pn_transport_t *transport, const char *condition)
 {
-  pn_condition_t *cond = pn_connection_condition(transport->connection);
+  pn_condition_t *cond = NULL;
+  if (transport->connection) {
+    cond = pn_connection_condition(transport->connection);
+  }
   const char *description = NULL;
   pn_data_t *info = NULL;
   if (!condition && pn_condition_is_set(cond)) {
@@ -1759,6 +1741,20 @@ int pn_do_disposition(pn_dispatcher_t *d
   return 0;
 }
 
+static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition, bool detach)
+{
+  pn_bytes_t cond;
+  pn_bytes_t desc;
+  pn_condition_clear(condition);
+  int err = pn_scan_args(disp, detach ? "D.[..D.[sSC]" : "D.[D.[sSC]",
+                         &cond, &desc, condition->info);
+  if (err) return err;
+  strncat(condition->name, cond.start, cond.size);
+  strncat(condition->description, desc.start, desc.size);
+  pn_data_rewind(condition->info);
+  return 0;
+}
+
 int pn_do_detach(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1774,6 +1770,9 @@ int pn_do_detach(pn_dispatcher_t *disp)
   pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
   pn_link_t *link = link_state->link;
 
+  err = pn_scan_error(disp, &link->endpoint.remote_condition, true);
+  if (err) return err;
+
   link_state->remote_handle = -2;
 
   if (closed)
@@ -1791,30 +1790,18 @@ int pn_do_end(pn_dispatcher_t *disp)
   pn_transport_t *transport = (pn_transport_t *) disp->context;
   pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
   pn_session_t *session = ssn_state->session;
-
+  int err = pn_scan_error(disp, &session->endpoint.remote_condition, false);
+  if (err) return err;
   ssn_state->remote_channel = -2;
   PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
   return 0;
 }
 
-static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition)
-{
-  pn_bytes_t cond;
-  pn_bytes_t desc;
-  pn_condition_clear(condition);
-  int err = pn_scan_args(disp, "D.[D.[sSC]", &cond, &desc, condition->info);
-  if (err) return err;
-  strncat(condition->name, cond.start, cond.size);
-  strncat(condition->description, desc.start, desc.size);
-  pn_data_rewind(condition->info);
-  return 0;
-}
-
 int pn_do_close(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
   pn_connection_t *conn = transport->connection;
-  int err = pn_scan_error(disp, &transport->remote_condition);
+  int err = pn_scan_error(disp, &transport->remote_condition, false);
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
@@ -2362,8 +2349,19 @@ int pn_process_link_teardown(pn_transpor
           (int32_t) state->remote_handle != -2 &&
           (int16_t) ssn_state->remote_channel != -2 &&
           !transport->close_rcvd) return 0;
-      int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io]", DETACH,
-                              state->local_handle, true);
+
+      const char *name = NULL;
+      const char *description = NULL;
+      pn_data_t *info = NULL;
+
+      if (pn_condition_is_set(&endpoint->condition)) {
+        name = pn_condition_get_name(&endpoint->condition);
+        description = pn_condition_get_description(&endpoint->condition);
+        info = pn_condition_info(&endpoint->condition);
+      }
+
+      int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
+                              state->local_handle, true, (bool) name, ERROR, name, description, info);
       if (err) return err;
       state->local_handle = -2;
     }
@@ -2410,7 +2408,19 @@ int pn_process_ssn_teardown(pn_transport
         && !transport->close_sent)
     {
       if (pn_pointful_buffering(transport, session)) return 0;
-      int err = pn_post_frame(transport->disp, state->local_channel, "DL[]", END);
+
+      const char *name = NULL;
+      const char *description = NULL;
+      pn_data_t *info = NULL;
+
+      if (pn_condition_is_set(&endpoint->condition)) {
+        name = pn_condition_get_name(&endpoint->condition);
+        description = pn_condition_get_description(&endpoint->condition);
+        info = pn_condition_info(&endpoint->condition);
+      }
+
+      int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
+                              (bool) name, ERROR, name, description, info);
       if (err) return err;
       state->local_channel = -2;
     }
@@ -2765,15 +2775,41 @@ bool pn_delivery_partial(pn_delivery_t *
 
 pn_condition_t *pn_connection_condition(pn_connection_t *connection)
 {
+  assert(connection);
   return &connection->endpoint.condition;
 }
 
 pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection)
 {
+  assert(connection);
   pn_transport_t *transport = connection->transport;
   return transport ? &transport->remote_condition : NULL;
 }
 
+pn_condition_t *pn_session_condition(pn_session_t *session)
+{
+  assert(session);
+  return &session->endpoint.condition;
+}
+
+pn_condition_t *pn_session_remote_condition(pn_session_t *session)
+{
+  assert(session);
+  return &session->endpoint.remote_condition;
+}
+
+pn_condition_t *pn_link_condition(pn_link_t *link)
+{
+  assert(link);
+  return &link->endpoint.condition;
+}
+
+pn_condition_t *pn_link_remote_condition(pn_link_t *link)
+{
+  assert(link);
+  return &link->endpoint.remote_condition;
+}
+
 bool pn_condition_is_set(pn_condition_t *condition)
 {
   return condition && condition->name[0];
@@ -2838,3 +2874,37 @@ bool pn_condition_is_redirect(pn_conditi
   const char *name = pn_condition_get_name(condition);
   return name && !strcmp(name, "amqp:connection:redirect");
 }
+
+/* Rewind the condition's info data, step to its first node, enter it and
+ * advance `steps` nodes inside it.  Returns the data object positioned on
+ * that node; the caller reads the value and rewinds. */
+static pn_data_t *pn_condition_info_seek(pn_condition_t *condition, int steps)
+{
+  pn_data_t *info = pn_condition_info(condition);
+  pn_data_rewind(info);
+  pn_data_next(info);
+  pn_data_enter(info);
+  while (steps-- > 0) {
+    pn_data_next(info);
+  }
+  return info;
+}
+
+/* Return the start of the bytes value found four nodes into the
+ * condition's info; the info data is rewound before returning. */
+const char *pn_condition_redirect_host(pn_condition_t *condition)
+{
+  pn_data_t *info = pn_condition_info_seek(condition, 4);
+  pn_bytes_t host = pn_data_get_bytes(info);
+  pn_data_rewind(info);
+  return host.start;
+}
+
+/* Return the int value found six nodes into the condition's info. */
+int pn_condition_redirect_port(pn_condition_t *condition)
+{
+  pn_data_t *info = pn_condition_info_seek(condition, 6);
+  int port = pn_data_get_int(info);
+  pn_data_rewind(info);
+  return port;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Dec 13 21:28:56 2012
@@ -67,6 +67,16 @@ struct pn_subscription_t {
   void *context;
 };
 
+typedef struct {
+  int refcount;
+  char *address;
+  char *scheme;
+  char *user;
+  char *pass;
+  char *host;
+  char *port;
+} pn_connection_ctx_t;
+
 void pn_queue_init(pn_queue_t *queue)
 {
   queue->capacity = 1024;
@@ -110,18 +120,24 @@ void pn_queue_gc(pn_queue_t *queue)
   queue->lwm += delta;
 }
 
-void pn_incref(pn_connection_t *conn)
+static void pn_incref(pn_connection_t *conn)
 {
-  intptr_t refcount = (intptr_t) pn_connection_get_context(conn);
-  pn_connection_set_context(conn, (void *) (refcount + 1));
+  pn_connection_ctx_t *ctx = pn_connection_get_context(conn);
+  ctx->refcount++;
 }
 
-void pn_decref(pn_connection_t *conn)
+static void pn_decref(pn_connection_t *conn)
 {
-  intptr_t refcount = (intptr_t) pn_connection_get_context(conn);
-  pn_connection_set_context(conn, (void *) (refcount - 1));
-  if (refcount == 1) {
+  pn_connection_ctx_t *ctx = pn_connection_get_context(conn);
+  ctx->refcount--;
+  if (ctx->refcount == 0) {
     pn_connection_free(conn);
+    free(ctx->scheme);
+    free(ctx->user);
+    free(ctx->pass);
+    free(ctx->host);
+    free(ctx->port);
+    free(ctx);
   }
 }
 
@@ -391,9 +407,56 @@ void pn_messenger_flow(pn_messenger_t *m
   }
 }
 
+static void pn_transport_config(pn_messenger_t *messenger,
+                                pn_connector_t *connector,
+                                pn_connection_t *connection)
+{
+  pn_connection_ctx_t *ctx = pn_connection_get_context(connection);
+  pn_transport_t *transport = pn_connector_transport(connector);
+  if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
+    pn_ssl_t *ssl = pn_ssl(transport);
+    pn_ssl_init(ssl, PN_SSL_MODE_CLIENT);
+    if (messenger->certificate && messenger->private_key) {
+      pn_ssl_set_credentials(ssl, messenger->certificate,
+                             messenger->private_key,
+                             messenger->password);
+    }
+    if (messenger->trusted_certificates) {
+      pn_ssl_set_trusted_ca_db(ssl, messenger->trusted_certificates);
+      pn_ssl_set_peer_authentication(ssl, PN_SSL_VERIFY_PEER, NULL);
+    } else {
+      pn_ssl_set_peer_authentication(ssl, PN_SSL_ANONYMOUS_PEER, NULL);
+    }
+  }
+
+  pn_sasl_t *sasl = pn_sasl(transport);
+  if (ctx->user) {
+    pn_sasl_plain(sasl, ctx->user, ctx->pass);
+  } else {
+    pn_sasl_mechanisms(sasl, "ANONYMOUS");
+    pn_sasl_client(sasl);
+  }
+}
+
+static void pn_condition_report(const char *pfx, pn_condition_t *condition)
+{
+  if (pn_condition_is_redirect(condition)) {
+    fprintf(stderr, "%s NOTICE (%s) redirecting to %s:%i\n",
+            pfx,
+            pn_condition_get_name(condition),
+            pn_condition_redirect_host(condition),
+            pn_condition_redirect_port(condition));
+  } else if (pn_condition_is_set(condition)) {
+    fprintf(stderr, "%s ERROR (%s) %s\n",
+            pfx,
+            pn_condition_get_name(condition),
+            pn_condition_get_description(condition));
+  }
+}
+
 void pn_messenger_endpoints(pn_messenger_t *messenger, pn_connection_t *conn, pn_connector_t *ctor)
 {
-  if (pn_connection_state(conn) | PN_LOCAL_UNINIT) {
+  if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
     pn_connection_open(conn);
   }
 
@@ -435,18 +498,36 @@ void pn_messenger_endpoints(pn_messenger
 
   ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   while (ssn) {
+    pn_condition_report("SESSION", pn_session_remote_condition(ssn));
     pn_session_close(ssn);
     ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   }
 
   link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   while (link) {
+    pn_condition_report("LINK", pn_link_remote_condition(link));
     pn_link_close(link);
     link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   }
 
   if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+    pn_condition_t *condition = pn_connection_remote_condition(conn);
+    pn_condition_report("CONNECTION", condition);
     pn_connection_close(conn);
+    if (pn_condition_is_redirect(condition)) {
+      const char *host = pn_condition_redirect_host(condition);
+      char buf[1024];
+      sprintf(buf, "%i", pn_condition_redirect_port(condition));
+
+      pn_connector_process(ctor);
+      pn_connector_set_connection(ctor, NULL);
+      pn_driver_t *driver = messenger->driver;
+      pn_connector_t *connector = pn_connector(driver, host, buf, NULL);
+      pn_transport_unbind(pn_connector_transport(ctor));
+      pn_connection_reset(conn);
+      pn_transport_config(messenger, connector, conn);
+      pn_connector_set_connection(connector, conn);
+    }
   }
 }
 
@@ -463,6 +544,44 @@ void pn_messenger_reclaim(pn_messenger_t
   }
 }
 
+
+/* Create a connection for this messenger and attach a newly allocated
+ * pn_connection_ctx_t holding private copies of the address components.
+ * The context starts with one reference (taken via pn_incref) and the
+ * connection's container and hostname are set from messenger->name and
+ * the copied host.  Returns NULL if the connection or its context cannot
+ * be allocated. */
+pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
+                                         char *scheme,
+                                         char *user,
+                                         char *pass,
+                                         char *host,
+                                         char *port)
+{
+  pn_connection_t *connection = pn_connection();
+  if (!connection) return NULL;
+  pn_connection_ctx_t *ctx = malloc(sizeof(pn_connection_ctx_t));
+  if (!ctx) {
+    /* nothing else owns the connection yet, so release it here */
+    pn_connection_free(connection);
+    return NULL;
+  }
+  ctx->refcount = 0;
+  /* address is not populated here; NULL keeps the field well-defined */
+  ctx->address = NULL;
+  ctx->scheme = pn_strdup(scheme);
+  ctx->user = pn_strdup(user);
+  ctx->pass = pn_strdup(pass);
+  ctx->host = pn_strdup(host);
+  ctx->port = pn_strdup(port);
+  pn_connection_set_context(connection, ctx);
+  pn_incref(connection);
+
+  pn_connection_set_container(connection, messenger->name);
+  pn_connection_set_hostname(connection, ctx->host);
+  return connection;
+}
+
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
@@ -504,9 +610,8 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_sasl_mechanisms(sasl, "ANONYMOUS");
       pn_sasl_server(sasl);
       pn_sasl_done(sasl, PN_SASL_OK);
-      pn_connection_t *conn = pn_connection();
-      pn_incref(conn);
-      pn_connection_set_container(conn, messenger->name);
+      pn_connection_t *conn =
+        pn_messenger_connection(messenger, scheme, NULL, NULL, NULL, NULL);
       pn_connector_set_connection(c, conn);
     }
 
@@ -517,9 +622,11 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_messenger_endpoints(messenger, conn, c);
       if (pn_connector_closed(c)) {
         pn_connector_free(c);
-        pn_messenger_reclaim(messenger, conn);
-        pn_decref(conn);
-        pn_messenger_flow(messenger);
+        if (conn) {
+          pn_messenger_reclaim(messenger, conn);
+          pn_decref(conn);
+          pn_messenger_flow(messenger);
+        }
       } else {
         pn_connector_process(c);
       }
@@ -589,7 +696,6 @@ static const char *default_port(const ch
   else
     return "5672";
 }
-
 pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, char *address, char **name)
 {
   char domain[strlen(address) + 1];
@@ -615,10 +721,23 @@ pn_connection_t *pn_messenger_resolve(pn
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
     pn_connection_t *connection = pn_connector_connection(ctor);
+    if (!connection) {
+      /* a connector may have no connection attached (pn_messenger_endpoints
+       * detaches it with pn_connector_set_connection(ctor, NULL) when
+       * following a redirect); there is nothing to match against */
+      ctor = pn_connector_next(ctor);
+      continue;
+    }
+    pn_connection_ctx_t *ctx = pn_connection_get_context(connection);
+    if (pn_streq(scheme, ctx->scheme) && pn_streq(user, ctx->user) &&
+        pn_streq(pass, ctx->pass) && pn_streq(host, ctx->host) &&
+        pn_streq(port, ctx->port)) {
+      return connection;
+    }
     const char *container = pn_connection_remote_container(connection);
-    const char *hostname = pn_connection_get_hostname(connection);
-    if (pn_streq(container, domain) || pn_streq(hostname, domain))
+    if (pn_streq(container, domain)) {
       return connection;
+    }
     ctor = pn_connector_next(ctor);
   }
 
@@ -626,34 +738,9 @@ pn_connection_t *pn_messenger_resolve(pn
                                            port ? port : default_port(scheme),
                                            NULL);
   if (!connector) return NULL;
-  pn_transport_t *transport = pn_connector_transport(connector);
-  if (scheme && !strcmp(scheme, "amqps")) {
-    pn_ssl_t *ssl = pn_ssl(transport);
-    pn_ssl_init(ssl, PN_SSL_MODE_CLIENT);
-    if (messenger->certificate && messenger->private_key) {
-      pn_ssl_set_credentials(ssl, messenger->certificate,
-                             messenger->private_key,
-                             messenger->password);
-    }
-    if (messenger->trusted_certificates) {
-      pn_ssl_set_trusted_ca_db(ssl, messenger->trusted_certificates);
-      pn_ssl_set_peer_authentication(ssl, PN_SSL_VERIFY_PEER, NULL);
-    } else {
-      pn_ssl_set_peer_authentication(ssl, PN_SSL_ANONYMOUS_PEER, NULL);
-    }
-  }
-
-  pn_sasl_t *sasl = pn_sasl(transport);
-  if (user) {
-    pn_sasl_plain(sasl, user, pass);
-  } else {
-    pn_sasl_mechanisms(sasl, "ANONYMOUS");
-    pn_sasl_client(sasl);
-  }
-  pn_connection_t *connection = pn_connection();
-  pn_incref(connection);
-  pn_connection_set_container(connection, messenger->name);
-  pn_connection_set_hostname(connection, domain);
+  pn_connection_t *connection =
+    pn_messenger_connection(messenger, scheme, user, pass, host, port);
+  pn_transport_config(messenger, connector, connection);
   pn_connection_open(connection);
   pn_connector_set_connection(connector, connection);
 
@@ -1132,4 +1219,3 @@ int pn_messenger_incoming(pn_messenger_t
 {
   return pn_messenger_queued(messenger, false);
 }
-

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1421547&r1=1421546&r2=1421547&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Thu Dec 13 21:28:56 2012
@@ -200,7 +200,7 @@ class ConnectionTest(Test):
     assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
 
     rcond = self.c2.remote_condition
-    assert rcond == cond
+    assert rcond == cond, (rcond, cond)
 
 class SessionTest(Test):
 
@@ -290,6 +290,28 @@ class SessionTest(Test):
     self.ssn.close()
     self.pump()
 
+  def test_condition(self):
+    self.ssn.open()
+    self.pump()
+    ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
+    assert ssn != None
+    ssn.open()
+    self.pump()
+
+    assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+    assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+    cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
+    self.ssn.condition = cond
+    self.ssn.close()
+
+    self.pump()
+
+    assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+    assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
+    rcond = ssn.remote_condition
+    assert rcond == cond, (rcond, cond)
 
 class LinkTest(Test):
 
@@ -463,6 +485,26 @@ class LinkTest(Test):
                                             timeout=7,
                                             capabilities=[]))
 
+  def test_condition(self):
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+
+    assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+    assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+    cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
+    self.snd.condition = cond
+    self.snd.close()
+
+    self.pump()
+
+    assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+    assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
+    rcond = self.rcv.remote_condition
+    assert rcond == cond, (rcond, cond)
+
 class TerminusConfig:
 
   def __init__(self, address=None, timeout=None, durability=None, filter=None,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org