You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2014/02/21 19:50:37 UTC

svn commit: r1570659 - in /qpid/proton/trunk/proton-c/src: engine/engine-internal.h engine/engine.c engine/event.c tests/CMakeLists.txt tests/engine.c transport/transport.c transport/transport.h

Author: kgiusti
Date: Fri Feb 21 18:50:36 2014
New Revision: 1570659

URL: http://svn.apache.org/r1570659
Log:
PROTON-489: Use reference counting for object management.

Each child object will keep a reference to its parent: Deliveries
reference Links, which reference their Sessions, which reference the
containing Connection.  This keeps the parent objects present until
all child objects have been released.

The transport work lists also reference count their members (Endpoints
and Deliveries).  This keeps freed objects around long enough to flush
their pending state prior to deleting them.

Added:
    qpid/proton/trunk/proton-c/src/tests/engine.c   (with props)
Modified:
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/engine/event.c
    qpid/proton/trunk/proton-c/src/tests/CMakeLists.txt
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-c/src/transport/transport.h

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Feb 21 18:50:36 2014
@@ -50,6 +50,7 @@ struct pn_endpoint_t {
   pn_endpoint_t *transport_next;
   pn_endpoint_t *transport_prev;
   bool modified;
+  bool freed;
 };
 
 typedef struct {
@@ -110,11 +111,12 @@ typedef struct pn_io_layer_t {
 } pn_io_layer_t;
 
 struct pn_transport_t {
+  bool freed;
   pn_tracer_t *tracer;
   size_t header_count;
   pn_sasl_t *sasl;
   pn_ssl_t *ssl;
-  pn_connection_t *connection;
+  pn_connection_t *connection;  // reference counted
   pn_dispatcher_t *disp;
   bool open_sent;
   bool open_rcvd;
@@ -177,13 +179,13 @@ struct pn_connection_t {
   pn_endpoint_t endpoint;
   pn_endpoint_t *endpoint_head;
   pn_endpoint_t *endpoint_tail;
-  pn_endpoint_t *transport_head;
+  pn_endpoint_t *transport_head;  // reference counted
   pn_endpoint_t *transport_tail;
   pn_list_t *sessions;
   pn_transport_t *transport;
   pn_delivery_t *work_head;
   pn_delivery_t *work_tail;
-  pn_delivery_t *tpwork_head;
+  pn_delivery_t *tpwork_head;  // reference counted
   pn_delivery_t *tpwork_tail;
   pn_string_t *container;
   pn_string_t *hostname;
@@ -196,7 +198,7 @@ struct pn_connection_t {
 
 struct pn_session_t {
   pn_endpoint_t endpoint;
-  pn_connection_t *connection;
+  pn_connection_t *connection;  // reference counted
   pn_list_t *links;
   void *context;
   size_t incoming_capacity;
@@ -224,7 +226,7 @@ struct pn_terminus_t {
 struct pn_link_t {
   pn_endpoint_t endpoint;
   pn_string_t *name;
-  pn_session_t *session;
+  pn_session_t *session;  // reference counted
   pn_terminus_t source;
   pn_terminus_t target;
   pn_terminus_t remote_source;
@@ -262,7 +264,7 @@ struct pn_disposition_t {
 };
 
 struct pn_delivery_t {
-  pn_link_t *link;
+  pn_link_t *link;  // reference counted
   pn_buffer_t *tag;
   pn_disposition_t local;
   pn_disposition_t remote;
@@ -302,9 +304,10 @@ pn_timestamp_t pn_io_layer_tick_passthru
 void pn_condition_init(pn_condition_t *condition);
 void pn_condition_tini(pn_condition_t *condition);
 void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit);
-void pn_real_settle(pn_delivery_t *delivery);
+void pn_real_settle(pn_delivery_t *delivery);  // will free delivery if link is freed
 void pn_clear_tpwork(pn_delivery_t *delivery);
 void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
 void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_connection_unbound(pn_connection_t *conn);
 
 #endif /* engine-internal.h */

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Feb 21 18:50:36 2014
@@ -86,9 +86,53 @@ void pn_endpoint_tini(pn_endpoint_t *end
 
 void pn_connection_free(pn_connection_t *connection)
 {
+  assert(!connection->endpoint.freed);
+  if (pn_connection_state(connection) & PN_LOCAL_ACTIVE)
+    pn_connection_close(connection);
+  // free those endpoints that haven't been freed by the application
+  LL_REMOVE(connection, endpoint, &connection->endpoint);
+  while (connection->endpoint_head) {
+    pn_endpoint_t *ep = connection->endpoint_head;
+    switch (ep->type) {
+    case SESSION:
+      // note: this will free all child links:
+      pn_session_free((pn_session_t *)ep);
+      break;
+    case SENDER:
+    case RECEIVER:
+      pn_link_free((pn_link_t *)ep);
+      break;
+    default:
+      assert(false);
+    }
+  }
+  connection->endpoint.freed = true;
+  if (!connection->transport) {
+    // no transport available to consume transport work items,
+    // so manually clear them:
+    pn_connection_unbound(connection);
+  }
   pn_decref(connection);
 }
 
+// invoked when transport has been removed:
+void pn_connection_unbound(pn_connection_t *connection)
+{
+  connection->transport = NULL;
+  if (connection->endpoint.freed) {
+    // connection has been freed prior to unbinding, thus it
+    // cannot be re-assigned to a new transport.  Clear the
+    // transport work lists to allow the connection to be freed.
+    while (connection->transport_head) {
+        pn_clear_modified(connection, connection->transport_head);
+    }
+    while (connection->tpwork_head) {
+      pn_real_settle(connection->tpwork_head);
+      pn_clear_tpwork(connection->tpwork_head);
+    }
+  }
+}
+
 void *pn_connection_get_context(pn_connection_t *conn)
 {
     return conn ? conn->context : 0;
@@ -118,17 +162,19 @@ void pn_add_session(pn_connection_t *con
 {
   pn_list_add(conn->sessions, ssn);
   ssn->connection = conn;
+  pn_incref(conn);  // keep around until finalized
 }
 
 void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn)
 {
-  ssn->connection = NULL;
   pn_list_remove(conn->sessions, ssn);
 }
 
 pn_connection_t *pn_session_connection(pn_session_t *session)
 {
-  return session ? session->connection : NULL;
+  if (!session) return NULL;
+  return session->connection->endpoint.freed
+    ? NULL : session->connection;
 }
 
 void pn_session_open(pn_session_t *session)
@@ -143,11 +189,18 @@ void pn_session_close(pn_session_t *sess
 
 void pn_session_free(pn_session_t *session)
 {
-  if (session && session->connection) {
-    pn_remove_session(session->connection, session);
-    pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
-    LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
-  }
+  assert(!session->endpoint.freed);
+  while(pn_list_size(session->links)) {
+    pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
+    pn_link_free(link);
+  }
+  if (pn_session_state(session) & PN_LOCAL_ACTIVE)
+    pn_session_close(session);
+  pn_remove_session(session->connection, session);
+  pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
+  LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
+  session->endpoint.freed = true;
+  pn_decref(session);
 }
 
 void *pn_session_get_context(pn_session_t *session)
@@ -194,11 +247,25 @@ void pn_terminus_free(pn_terminus_t *ter
 
 void pn_link_free(pn_link_t *link)
 {
-  if (link && link->session) {
-    pn_remove_link(link->session, link);
-    pn_endpoint_t *endpoint = (pn_endpoint_t *) link;
-    LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
+  assert(!link->endpoint.freed);
+  if (pn_link_state(link) & PN_LOCAL_ACTIVE)
+    pn_link_close(link);
+  pn_remove_link(link->session, link);
+  pn_endpoint_t *endpoint = (pn_endpoint_t *) link;
+  LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
+  pn_delivery_t *delivery = link->unsettled_head;
+  while (delivery) {
+    pn_delivery_t *next = delivery->unsettled_next;
+    pn_delivery_settle(delivery);
+    delivery = next;
+  }
+  while (link->settled_head) {
+    delivery = link->settled_head;
+    LL_POP(link, settled, pn_delivery_t);
+    pn_decref(delivery);
   }
+  link->endpoint.freed = true;
+  pn_decref(link);
 }
 
 void *pn_link_get_context(pn_link_t *link)
@@ -224,6 +291,7 @@ void pn_endpoint_init(pn_endpoint_t *end
   endpoint->transport_next = NULL;
   endpoint->transport_prev = NULL;
   endpoint->modified = false;
+  endpoint->freed = false;
 
   LL_ADD(conn, endpoint, endpoint);
 }
@@ -266,7 +334,7 @@ pn_connection_t *pn_connection()
   pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
   conn->transport_head = NULL;
   conn->transport_tail = NULL;
-  conn->sessions = pn_list(0, PN_REFCOUNT);
+  conn->sessions = pn_list(0, 0);
   conn->transport = NULL;
   conn->work_head = NULL;
   conn->work_tail = NULL;
@@ -389,6 +457,7 @@ void pn_add_work(pn_connection_t *connec
 {
   if (!delivery->work)
   {
+    assert(!delivery->local.settled);   // never allow settled deliveries
     LL_ADD(connection, work, delivery);
     delivery->work = true;
   }
@@ -431,6 +500,7 @@ void pn_add_tpwork(pn_delivery_t *delive
   {
     LL_ADD(connection, tpwork, delivery);
     delivery->tpwork = true;
+    pn_incref(delivery);
   }
   pn_modified(connection, &connection->endpoint, true);
 }
@@ -442,6 +512,7 @@ void pn_clear_tpwork(pn_delivery_t *deli
   {
     LL_REMOVE(connection, tpwork, delivery);
     delivery->tpwork = false;
+    pn_decref(delivery);  // may free delivery!
   }
 }
 
@@ -463,6 +534,7 @@ void pn_modified(pn_connection_t *connec
   if (!endpoint->modified) {
     LL_ADD(connection, transport, endpoint);
     endpoint->modified = true;
+    pn_incref(endpoint);
   }
 
   if (emit) {
@@ -480,6 +552,7 @@ void pn_clear_modified(pn_connection_t *
     endpoint->transport_next = NULL;
     endpoint->transport_prev = NULL;
     endpoint->modified = false;
+    pn_decref(endpoint);  // may free endpoint!
   }
 }
 
@@ -558,13 +631,21 @@ pn_link_t *pn_link_next(pn_link_t *link,
 static void pn_session_finalize(void *object)
 {
   pn_session_t *session = (pn_session_t *) object;
+  //pn_transport_t *transport = session->connection->transport;
+  //if (transport) {
+  /*   if ((int16_t)session->state.local_channel >= 0)  // END not sent */
+  /*     pn_hash_del(transport->local_channels, session->state.local_channel); */
+  /*   if ((int16_t)session->state.remote_channel >= 0)  // END not received */
+  /*     pn_unmap_channel(transport, session); */
+  /* } */
+
   pn_free(session->links);
   pn_endpoint_tini(&session->endpoint);
-
   pn_delivery_map_free(&session->state.incoming);
   pn_delivery_map_free(&session->state.outgoing);
   pn_free(session->state.local_handles);
   pn_free(session->state.remote_handles);
+  pn_decref(session->connection);
 }
 
 #define pn_session_initialize NULL
@@ -581,8 +662,7 @@ pn_session_t *pn_session(pn_connection_t
 
   pn_endpoint_init(&ssn->endpoint, SESSION, conn);
   pn_add_session(conn, ssn);
-  pn_decref(ssn);
-  ssn->links = pn_list(0, PN_REFCOUNT);
+  ssn->links = pn_list(0, 0);
   ssn->context = 0;
   ssn->incoming_capacity = 1024*1024;
   ssn->incoming_bytes = 0;
@@ -657,22 +737,17 @@ static void pn_link_finalize(void *objec
 {
   pn_link_t *link = (pn_link_t *) object;
 
+  // assumptions: all deliveries freed
+  assert(link->settled_head == NULL);
+  assert(link->unsettled_head == NULL);
+
   pn_terminus_free(&link->source);
   pn_terminus_free(&link->target);
   pn_terminus_free(&link->remote_source);
   pn_terminus_free(&link->remote_target);
-  while (link->settled_head) {
-    pn_delivery_t *d = link->settled_head;
-    LL_POP(link, settled, pn_delivery_t);
-    pn_free(d);
-  }
-  while (link->unsettled_head) {
-    pn_delivery_t *d = link->unsettled_head;
-    LL_POP(link, unsettled, pn_delivery_t);
-    pn_free(d);
-  }
   pn_free(link->name);
   pn_endpoint_tini(&link->endpoint);
+  pn_decref(link->session);
 }
 
 #define pn_link_initialize NULL
@@ -687,7 +762,7 @@ pn_link_t *pn_link_new(int type, pn_sess
 
   pn_endpoint_init(&link->endpoint, type, session->connection);
   pn_add_link(session, link);
-  pn_decref(link);
+  pn_incref(session);  // keep session until link finalized
   link->name = pn_string(name);
   pn_terminus_init(&link->source, PN_SOURCE);
   pn_terminus_init(&link->target, PN_TARGET);
@@ -905,7 +980,9 @@ bool pn_link_is_receiver(pn_link_t *link
 
 pn_session_t *pn_link_session(pn_link_t *link)
 {
-  return link->session;
+  assert(link);
+  return link->session->endpoint.freed
+      ? NULL : link->session;
 }
 
 static void pn_disposition_finalize(pn_disposition_t *ds)
@@ -918,10 +995,13 @@ static void pn_disposition_finalize(pn_d
 static void pn_delivery_finalize(void *object)
 {
   pn_delivery_t *delivery = (pn_delivery_t *) object;
+  assert(delivery->settled);
+  assert(!delivery->state.init);  // no longer in session delivery map
   pn_buffer_free(delivery->tag);
   pn_buffer_free(delivery->bytes);
   pn_disposition_finalize(&delivery->local);
   pn_disposition_finalize(&delivery->remote);
+  pn_decref(delivery->link);
 }
 
 static void pn_disposition_init(pn_disposition_t *ds)
@@ -958,6 +1038,8 @@ pn_delivery_t *pn_delivery(pn_link_t *li
     static pn_class_t clazz = PN_CLASS(pn_delivery);
     delivery = (pn_delivery_t *) pn_new(sizeof(pn_delivery_t), &clazz);
     if (!delivery) return NULL;
+    delivery->link = link;
+    pn_incref(delivery->link);  // keep link until finalized
     delivery->tag = pn_buffer(16);
     delivery->bytes = pn_buffer(64);
     pn_disposition_init(&delivery->local);
@@ -965,7 +1047,6 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   } else {
     assert(!delivery->tpwork);
   }
-  delivery->link = link;
   pn_buffer_clear(delivery->tag);
   pn_buffer_append(delivery->tag, tag.bytes, tag.size);
   pn_disposition_clear(&delivery->local);
@@ -1266,25 +1347,34 @@ void pn_real_settle(pn_delivery_t *deliv
 {
   pn_link_t *link = delivery->link;
   LL_REMOVE(link, unsettled, delivery);
-  LL_ADD(link, settled, delivery);
+  pn_delivery_map_del(pn_link_is_sender(link)
+                      ? &link->session->state.outgoing
+                      : &link->session->state.incoming,
+                      delivery);
   pn_buffer_clear(delivery->tag);
   pn_buffer_clear(delivery->bytes);
   delivery->settled = true;
+  if (link->endpoint.freed) {
+    pn_decref(delivery);
+  } else {
+    LL_ADD(link, settled, delivery);
+  }
 }
 
 void pn_delivery_settle(pn_delivery_t *delivery)
 {
   assert(delivery);
+  if (!delivery->local.settled) {
+    pn_link_t *link = delivery->link;
+    if (pn_is_current(delivery)) {
+      pn_link_advance(link);
+    }
 
-  pn_link_t *link = delivery->link;
-  if (pn_is_current(delivery)) {
-    pn_link_advance(link);
+    link->unsettled_count--;
+    delivery->local.settled = true;
+    pn_add_tpwork(delivery);
+    pn_work_update(delivery->link->session->connection, delivery);
   }
-
-  link->unsettled_count--;
-  delivery->local.settled = true;
-  pn_add_tpwork(delivery);
-  pn_work_update(delivery->link->session->connection, delivery);
 }
 
 void pn_link_offered(pn_link_t *sender, int credit)
@@ -1383,8 +1473,9 @@ bool pn_link_draining(pn_link_t *receive
 
 pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
 {
-  if (!delivery) return NULL;
-  return delivery->link;
+  assert(delivery);
+  return delivery->link->endpoint.freed
+    ? NULL : delivery->link;
 }
 
 pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)

Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Fri Feb 21 18:50:36 2014
@@ -118,6 +118,20 @@ pn_event_t *pn_collector_put(pn_collecto
 
 pn_event_t *pn_collector_peek(pn_collector_t *collector)
 {
+  // discard any events for objects that no longer exist
+  pn_event_t *event = collector->head;
+  while (event && ((event->delivery && event->delivery->local.settled)
+                   ||
+                   (event->link && event->link->endpoint.freed)
+                   ||
+                   (event->session && event->session->endpoint.freed)
+                   ||
+                   (event->connection && event->connection->endpoint.freed)
+                   ||
+                   (event->transport && event->transport->freed))) {
+    pn_collector_pop(collector);
+    event = collector->head;
+  }
   return collector->head;
 }
 
@@ -137,9 +151,11 @@ bool pn_collector_pop(pn_collector_t *co
   event->next = collector->free_head;
   collector->free_head = event;
 
-  if (event->connection) {
-    pn_decref(event->connection);
-  }
+  if (event->connection) pn_decref(event->connection);
+  if (event->session) pn_decref(event->session);
+  if (event->link) pn_decref(event->link);
+  if (event->delivery) pn_decref(event->delivery);
+  if (event->transport) pn_decref(event->transport);
 
   return true;
 }
@@ -190,6 +206,7 @@ pn_event_t *pn_event(void)
 void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport)
 {
   event->transport = transport;
+  pn_incref(event->transport);
 }
 
 void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection)
@@ -203,18 +220,21 @@ void pn_event_init_session(pn_event_t *e
 {
   event->session = session;
   pn_event_init_connection(event, pn_session_connection(event->session));
+  pn_incref(event->session);
 }
 
 void pn_event_init_link(pn_event_t *event, pn_link_t *link)
 {
   event->link = link;
   pn_event_init_session(event, pn_link_session(event->link));
+  pn_incref(event->link);
 }
 
 void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery)
 {
   event->delivery = delivery;
   pn_event_init_link(event, pn_delivery_link(delivery));
+  pn_incref(event->delivery);
 }
 
 pn_event_type_t pn_event_type(pn_event_t *event)

Modified: qpid/proton/trunk/proton-c/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/tests/CMakeLists.txt?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/tests/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/src/tests/CMakeLists.txt Fri Feb 21 18:50:36 2014
@@ -37,6 +37,16 @@ set_target_properties (
   )
 pn_c_files (message.c)
 
+add_executable (c-engine-tests engine.c)
+target_link_libraries (c-engine-tests qpid-proton)
+set_target_properties (
+  c-engine-tests
+  PROPERTIES
+  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS}"
+  OUTPUT_NAME c-engine-tests
+  )
+pn_c_files (engine.c)
+
 if (ENABLE_VALGRIND AND VALGRIND)
   set(memcheck-cmd ${VALGRIND} --error-exitcode=1 --quiet
                    --leak-check=full --trace-children=yes)
@@ -44,4 +54,5 @@ endif (ENABLE_VALGRIND AND VALGRIND)
 
 add_test (c-object-tests ${memcheck-cmd} ./c-object-tests)
 add_test (c-message-tests ${memcheck-cmd} ./c-message-tests)
+add_test (c-engine-tests ${memcheck-cmd} ./c-engine-tests)
 

Added: qpid/proton/trunk/proton-c/src/tests/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/tests/engine.c?rev=1570659&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/tests/engine.c (added)
+++ qpid/proton/trunk/proton-c/src/tests/engine.c Fri Feb 21 18:50:36 2014
@@ -0,0 +1,311 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <proton/engine.h>
+
+// keep assert() active in every build: undefine NDEBUG before <assert.h>
+#undef NDEBUG
+#include <assert.h>
+
+// push data from one transport to another
+static int xfer(pn_transport_t *src, pn_transport_t *dest)
+{
+    ssize_t out = pn_transport_pending(src);
+    if (out > 0) {
+        ssize_t in = pn_transport_capacity(dest);
+        if (in > 0) {
+            size_t count = (size_t)((out < in) ? out : in);
+            pn_transport_push(dest,
+                              pn_transport_head(src),
+                              count);
+            pn_transport_pop(src, count);
+            return (int)count;
+        }
+    }
+    return 0;
+}
+
+// transfer all available data between two transports
+static int pump(pn_transport_t *t1, pn_transport_t *t2)
+{
+    int total = 0;
+    int work;
+    do {
+        work = xfer(t1, t2) + xfer(t2, t1);
+        total += work;
+    } while (work);
+    return total;
+}
+
+// handle state changes of the endpoints
+static void process_endpoints(pn_connection_t *conn)
+{
+    pn_session_t *ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
+    while (ssn) {
+        //fprintf(stderr, "Opening session %p\n", (void*)ssn);
+        pn_session_open(ssn);
+        ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
+    }
+
+    pn_link_t *link = pn_link_head(conn, PN_LOCAL_UNINIT);
+    while (link) {
+        //fprintf(stderr, "Opening link %p\n", (void*)link);
+        pn_link_open(link);
+        link = pn_link_next(link, PN_LOCAL_UNINIT);
+    }
+
+    link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+    while (link) {
+        //fprintf(stderr, "Closing link %p\n", (void*)link);
+        pn_link_close(link);
+        link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+    }
+
+    ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+    while (ssn) {
+        //fprintf(stderr, "Closing session %p\n", (void*)ssn);
+        pn_session_close(ssn);
+        ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+    }
+}
+
+// bring up a session and a link between the two connections
+static void test_setup(pn_connection_t *c1, pn_transport_t *t1,
+                       pn_connection_t *c2, pn_transport_t *t2)
+{
+    pn_connection_open(c1);
+    pn_connection_open(c2);
+
+    pn_session_t *s1 = pn_session(c1);
+    pn_session_open(s1);
+
+    pn_link_t *tx = pn_sender(s1, "sender");
+    pn_link_open(tx);
+
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+
+    // session and link should be up, c2 should have a receiver link:
+
+    assert(pn_session_state( s1 ) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(pn_link_state( tx ) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+
+    pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(rx && pn_link_is_receiver(rx));
+}
+
+// test that freeing the connection also frees all contained
+// resources (sessions, links, deliveries)
+int test_free_connection(int argc, char **argv)
+{
+    fprintf(stdout, "test_free_connection\n");
+    pn_connection_t *c1 = pn_connection();
+    pn_transport_t  *t1 = pn_transport();
+    pn_transport_bind(t1, c1);
+
+    pn_connection_t *c2 = pn_connection();
+    pn_transport_t  *t2 = pn_transport();
+    pn_transport_bind(t2, c2);
+
+    //pn_transport_trace(t1, PN_TRACE_FRM);
+    test_setup(c1, t1,
+               c2, t2);
+
+    pn_link_t *tx = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(tx);
+    pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(rx);
+
+    // transfer some data across the link:
+    pn_link_flow(rx, 10);
+    pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6));
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+    assert(pn_delivery_writable(d1));
+    pn_link_send(tx, "ABC", 4);
+    pn_link_advance(tx);
+
+    // now free the connection, but keep processing the transport
+    process_endpoints(c1);
+    pn_connection_free(c1);
+    while (pump(t1, t2)) {
+        process_endpoints(c2);
+    }
+
+    // delivery should have transferred:
+    assert(pn_link_current(rx) &&
+           pn_delivery_readable(pn_link_current(rx)));
+
+    pn_transport_unbind(t1);
+    pn_transport_free(t1);
+
+    pn_connection_free(c2);
+    pn_transport_unbind(t2);
+    pn_transport_free(t2);
+
+    return 0;
+}
+
+int test_free_session(int argc, char **argv)
+{
+    fprintf(stdout, "test_free_session\n");
+    pn_connection_t *c1 = pn_connection();
+    pn_transport_t  *t1 = pn_transport();
+    pn_transport_bind(t1, c1);
+
+    pn_connection_t *c2 = pn_connection();
+    pn_transport_t  *t2 = pn_transport();
+    pn_transport_bind(t2, c2);
+
+    //pn_transport_trace(t1, PN_TRACE_FRM);
+    test_setup(c1, t1,
+               c2, t2);
+
+    pn_session_t *ssn = pn_session_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(ssn);
+    pn_link_t *tx = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(tx);
+    pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(rx);
+
+    // prepare for transfer: request some credit
+    pn_link_flow(rx, 10);
+    pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6));
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+    assert(pn_delivery_writable(d1));
+
+    // send some data, but also close the session:
+    pn_link_send(tx, "ABC", 4);
+    pn_link_advance(tx);
+
+    pn_session_close(ssn);
+    pn_session_free(ssn);
+
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+
+    // delivery should have transferred:
+    assert(pn_link_current(rx));
+    assert(pn_delivery_readable(pn_link_current(rx)));
+
+    // c2's session should see the close:
+    pn_session_t *ssn2 = pn_session_head(c2, 0);
+    assert(ssn2 && pn_session_state(ssn2) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED));
+
+    pn_transport_unbind(t1);
+    pn_transport_free(t1);
+    pn_connection_free(c1);
+
+    pn_transport_unbind(t2);
+    pn_transport_free(t2);
+    pn_connection_free(c2);
+
+    return 0;
+}
+
+int test_free_link(int argc, char **argv)
+{
+    fprintf(stdout, "test_free_link\n");
+    pn_connection_t *c1 = pn_connection();
+    pn_transport_t  *t1 = pn_transport();
+    pn_transport_bind(t1, c1);
+
+    pn_connection_t *c2 = pn_connection();
+    pn_transport_t  *t2 = pn_transport();
+    pn_transport_bind(t2, c2);
+
+    //pn_transport_trace(t1, PN_TRACE_FRM);
+    test_setup(c1, t1,
+               c2, t2);
+
+    pn_link_t *tx = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(tx);
+    pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+    assert(rx);
+
+    // prepare for transfer: request some credit
+    pn_link_flow(rx, 10);
+    pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6));
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+    assert(pn_delivery_writable(d1));
+
+    // send some data, then close and destroy the link:
+    pn_link_send(tx, "ABC", 4);
+    pn_link_advance(tx);
+
+    pn_link_close(tx);
+    pn_link_free(tx);
+
+    while (pump(t1, t2)) {
+        process_endpoints(c1);
+        process_endpoints(c2);
+    }
+
+    // the data transfer will complete and the link close
+    // should have been sent to the peer
+    assert(pn_link_current(rx));
+    assert(pn_link_state(rx) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED));
+
+    pn_transport_unbind(t1);
+    pn_transport_free(t1);
+    pn_connection_free(c1);
+
+    pn_transport_unbind(t2);
+    pn_transport_free(t2);
+    pn_connection_free(c2);
+
+    return 0;
+}
+
+
+typedef int (*test_ptr_t)(int argc, char **argv);
+
+test_ptr_t tests[] = {test_free_connection,
+                      test_free_session,
+                      test_free_link,
+                      NULL};
+
+int main(int argc, char **argv)
+{
+    test_ptr_t *test = tests;
+    while (*test) {
+        int rc = (*test++)(argc, argv);
+        if (rc)
+            return rc;
+    }
+    return 0;
+}

Propchange: qpid/proton/trunk/proton-c/src/tests/engine.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Fri Feb 21 18:50:36 2014
@@ -73,7 +73,9 @@ pn_delivery_state_t *pn_delivery_map_pus
 
 void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
-  pn_hash_del(db->deliveries, delivery->state.id);
+  if (delivery->state.init) {
+    pn_hash_del(db->deliveries, delivery->state.id);
+  }
   delivery->state.init = false;
   delivery->state.sent = false;
 }
@@ -111,14 +113,21 @@ static void pni_default_tracer(pn_transp
   fprintf(stderr, "[%p]:%s\n", (void *) transport, message);
 }
 
-void pn_transport_init(pn_transport_t *transport)
+static void pn_transport_initialize(void *object)
 {
+  pn_transport_t *transport = (pn_transport_t *)object;
+  transport->freed = false;
+  transport->output_buf = NULL;
+  transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+  transport->input_buf = NULL;
+  transport->input_size =  PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
   transport->tracer = pni_default_tracer;
   transport->header_count = 0;
   transport->sasl = NULL;
   transport->ssl = NULL;
   transport->scratch = pn_string(NULL);
   transport->disp = pn_dispatcher(0, transport);
+  transport->connection = NULL;
 
   pn_io_layer_t *io_layer = transport->io_layers;
   while (io_layer != &transport->io_layers[PN_IO_AMQP]) {
@@ -196,38 +205,55 @@ static void pn_map_channel(pn_transport_
   session->state.remote_channel = channel;
 }
 
-static void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
+void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
 {
-  pn_hash_del(transport->remote_channels, ssn->state.remote_channel);
+  uint16_t channel = ssn->state.remote_channel;
   ssn->state.remote_channel = -2;
+  // note: may free the session:
+  pn_hash_del(transport->remote_channels, channel);
 }
 
+
+static void pn_transport_finalize(void *object);
+#define pn_transport_hashcode NULL
+#define pn_transport_compare NULL
+#define pn_transport_inspect NULL
+
 pn_transport_t *pn_transport()
 {
-  pn_transport_t *transport = (pn_transport_t *) malloc(sizeof(pn_transport_t));
+  static pn_class_t clazz = PN_CLASS(pn_transport);
+  pn_transport_t *transport = (pn_transport_t *) pn_new(sizeof(pn_transport_t),
+                                                        &clazz);
   if (!transport) return NULL;
-  transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+
   transport->output_buf = (char *) malloc(transport->output_size);
   if (!transport->output_buf) {
-    free(transport);
+    pn_transport_free(transport);
     return NULL;
   }
-  transport->input_size =  PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+
   transport->input_buf = (char *) malloc(transport->input_size);
   if (!transport->input_buf) {
-    free(transport->output_buf);
-    free(transport);
+    pn_transport_free(transport);
     return NULL;
   }
-
-  transport->connection = NULL;
-  pn_transport_init(transport);
   return transport;
 }
 
 void pn_transport_free(pn_transport_t *transport)
 {
   if (!transport) return;
+  assert(!transport->freed);
+  transport->freed = true;
+  // once the application frees the transport, no further I/O
+  // processing can be done to the connection:
+  pn_transport_unbind(transport);
+  pn_decref(transport);
+}
+
+static void pn_transport_finalize(void *object)
+{
+  pn_transport_t *transport = (pn_transport_t *) object;
 
   pn_ssl_free(transport->ssl);
   pn_sasl_free(transport->sasl);
@@ -242,10 +268,9 @@ void pn_transport_free(pn_transport_t *t
   pn_condition_tini(&transport->remote_condition);
   pn_free(transport->local_channels);
   pn_free(transport->remote_channels);
-  free(transport->input_buf);
-  free(transport->output_buf);
+  if (transport->input_buf) free(transport->input_buf);
+  if (transport->output_buf) free(transport->output_buf);
   pn_free(transport->scratch);
-  free(transport);
 }
 
 int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
@@ -255,6 +280,7 @@ int pn_transport_bind(pn_transport_t *tr
   if (connection->transport) return PN_STATE_ERR;
   transport->connection = connection;
   connection->transport = transport;
+  pn_incref(connection);
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     pn_event_t *event = pn_collector_put(connection->collector, PN_CONNECTION_STATE);
@@ -276,7 +302,6 @@ int pn_transport_unbind(pn_transport_t *
 
   pn_connection_t *conn = transport->connection;
   transport->connection = NULL;
-  conn->transport = NULL;
 
   pn_session_t *ssn = pn_session_head(conn, 0);
   while (ssn) {
@@ -292,6 +317,8 @@ int pn_transport_unbind(pn_transport_t *
     endpoint = endpoint->endpoint_next;
   }
 
+  pn_connection_unbound(conn);
+  pn_decref(conn);
   return 0;
 }
 
@@ -306,10 +333,12 @@ static void pn_map_handle(pn_session_t *
   pn_hash_put(ssn->state.remote_handles, handle, link);
 }
 
-static void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
+void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
 {
-  pn_hash_del(ssn->state.remote_handles, link->state.remote_handle);
+  uint32_t handle = link->state.remote_handle;
   link->state.remote_handle = -2;
+  // may delete link:
+  pn_hash_del(ssn->state.remote_handles, handle);
 }
 
 pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle)
@@ -661,14 +690,12 @@ int pn_do_attach(pn_dispatcher_t *disp)
 
 int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link);
 
-void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
+// free the delivery
+static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
   assert(!delivery->work);
-  if (delivery->state.init) {
-    pn_delivery_map_del(db, delivery);
-  }
-  pn_real_settle(delivery);
   pn_clear_tpwork(delivery);
+  pn_real_settle(delivery);
 }
 
 int pn_do_transfer(pn_dispatcher_t *disp)
@@ -913,12 +940,13 @@ int pn_do_detach(pn_dispatcher_t *disp)
     return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel);
   }
   pn_link_t *link = pn_handle_state(ssn, handle);
+  if (!link) {
+    return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
+  }
 
   err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
   if (err) return err;
 
-  pn_unmap_handle(ssn, link);
-
   if (closed)
   {
     PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
@@ -930,6 +958,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
     // TODO: implement
   }
 
+  pn_unmap_handle(ssn, link);
   return 0;
 }
 
@@ -939,12 +968,12 @@ int pn_do_end(pn_dispatcher_t *disp)
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
   int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
-  pn_unmap_channel(transport, ssn);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
   pn_event_t *event = pn_collector_put(transport->connection->collector, PN_SESSION_STATE);
   if (event) {
     pn_event_init_session(event, ssn);
   }
+  pn_unmap_channel(transport, ssn);
   return 0;
 }
 
@@ -1385,8 +1414,9 @@ int pn_post_disp(pn_transport_t *transpo
   return 0;
 }
 
-int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery)
+int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle)
 {
+  *settle = false;
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = &link->session->state;
   pn_link_state_t *link_state = &link->state;
@@ -1435,15 +1465,13 @@ int pn_process_tpwork_sender(pn_transpor
     if (err) return err;
   }
 
-  if (delivery->local.settled && state && state->sent) {
-    pn_full_settle(&ssn_state->outgoing, delivery);
-  }
-
+  *settle = delivery->local.settled && state && state->sent;
   return 0;
 }
 
-int pn_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t *delivery)
+int pn_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle)
 {
+  *settle = false;
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_t *ssn = link->session;
@@ -1452,16 +1480,13 @@ int pn_process_tpwork_receiver(pn_transp
     if (err) return err;
   }
 
-  if (delivery->local.settled) {
-    pn_full_settle(&ssn->state.incoming, delivery);
-  }
-
   // XXX: need to centralize this policy and improve it
   if (!ssn->state.incoming_window) {
     int err = pn_post_flow(transport, ssn, link);
     if (err) return err;
   }
 
+  *settle = delivery->local.settled;
   return 0;
 }
 
@@ -1474,17 +1499,23 @@ int pn_process_tpwork(pn_transport_t *tr
     while (delivery)
     {
       pn_delivery_t *tp_next = delivery->tpwork_next;
+      bool settle = false;
 
       pn_link_t *link = delivery->link;
+      pn_delivery_map_t *dm = NULL;
       if (pn_link_is_sender(link)) {
-        int err = pn_process_tpwork_sender(transport, delivery);
+        dm = &link->session->state.outgoing;
+        int err = pn_process_tpwork_sender(transport, delivery, &settle);
         if (err) return err;
       } else {
-        int err = pn_process_tpwork_receiver(transport, delivery);
+        dm = &link->session->state.incoming;
+        int err = pn_process_tpwork_receiver(transport, delivery, &settle);
         if (err) return err;
       }
 
-      if (!pn_delivery_buffered(delivery)) {
+      if (settle) {
+        pn_full_settle(dm, delivery);
+      } else if (!pn_delivery_buffered(delivery)) {
         pn_clear_tpwork(delivery);
       }
 
@@ -1561,6 +1592,7 @@ int pn_process_link_teardown(pn_transpor
       int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
                               state->local_handle, true, (bool) name, ERROR, name, description, info);
       if (err) return err;
+      pn_hash_del(ssn_state->local_handles, state->local_handle);
       state->local_handle = -2;
     }
 
@@ -1617,6 +1649,7 @@ int pn_process_ssn_teardown(pn_transport
       int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
                               (bool) name, ERROR, name, description, info);
       if (err) return err;
+      pn_hash_del(transport->local_channels, state->local_channel);
       state->local_channel = -2;
     }
 

Modified: qpid/proton/trunk/proton-c/src/transport/transport.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.h?rev=1570659&r1=1570658&r2=1570659&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.h (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.h Fri Feb 21 18:50:36 2014
@@ -25,5 +25,7 @@
 void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next);
 void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery);
 void pn_delivery_map_free(pn_delivery_map_t *db);
+void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link);
+void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn);
 
 #endif /* transport.h */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org