You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/13 04:01:55 UTC

svn commit: r1567834 - in /qpid/proton/trunk: proton-c/bindings/python/proton.py proton-c/src/engine/engine-internal.h proton-c/src/engine/engine.c proton-c/src/engine/event.c proton-c/src/transport/transport.c tests/python/proton_tests/engine.py

Author: rhs
Date: Thu Feb 13 03:01:55 2014
New Revision: 1567834

URL: http://svn.apache.org/r1567834
Log:
added test for delivery events; added ref counting for events; fixed spurious events

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/engine/event.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Feb 13 03:01:55 2014
@@ -2161,7 +2161,10 @@ class Connection(Endpoint):
     return pn_connection_remote_condition(self._conn)
 
   def collect(self, collector):
-    pn_connection_collect(self._conn, collector._impl)
+    if collector is None:
+      pn_connection_collect(self._conn, None)
+    else:
+      pn_connection_collect(self._conn, collector._impl)
 
   def _get_container(self):
     return pn_connection_get_container(self._conn)

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Feb 13 03:01:55 2014
@@ -301,7 +301,7 @@ pn_timestamp_t pn_io_layer_tick_passthru
 
 void pn_condition_init(pn_condition_t *condition);
 void pn_condition_tini(pn_condition_t *condition);
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit);
 void pn_real_settle(pn_delivery_t *delivery);
 void pn_clear_tpwork(pn_delivery_t *delivery);
 void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Feb 13 03:01:55 2014
@@ -51,20 +51,18 @@ pn_connection_t *pn_ep_get_connection(pn
   return NULL;
 }
 
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
-
 void pn_open(pn_endpoint_t *endpoint)
 {
   // TODO: do we care about the current state?
   PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
-  pn_modified(pn_ep_get_connection(endpoint), endpoint);
+  pn_modified(pn_ep_get_connection(endpoint), endpoint, true);
 }
 
 void pn_close(pn_endpoint_t *endpoint)
 {
   // TODO: do we care about the current state?
   PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
-  pn_modified(pn_ep_get_connection(endpoint), endpoint);
+  pn_modified(pn_ep_get_connection(endpoint), endpoint, true);
 }
 
 void pn_connection_reset(pn_connection_t *connection)
@@ -88,7 +86,7 @@ void pn_endpoint_tini(pn_endpoint_t *end
 
 void pn_connection_free(pn_connection_t *connection)
 {
-  pn_free(connection);
+  pn_decref(connection);
 }
 
 void *pn_connection_get_context(pn_connection_t *conn)
@@ -145,8 +143,11 @@ void pn_session_close(pn_session_t *sess
 
 void pn_session_free(pn_session_t *session)
 {
-  if (session && session->connection)
+  if (session && session->connection) {
     pn_remove_session(session->connection, session);
+    pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
+    LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
+  }
 }
 
 void *pn_session_get_context(pn_session_t *session)
@@ -431,7 +432,7 @@ void pn_add_tpwork(pn_delivery_t *delive
     LL_ADD(connection, tpwork, delivery);
     delivery->tpwork = true;
   }
-  pn_modified(connection, &connection->endpoint);
+  pn_modified(connection, &connection->endpoint, true);
 }
 
 void pn_clear_tpwork(pn_delivery_t *delivery)
@@ -457,15 +458,18 @@ void pn_dump(pn_connection_t *conn)
   printf("\n");
 }
 
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit)
 {
   if (!endpoint->modified) {
     LL_ADD(connection, transport, endpoint);
     endpoint->modified = true;
   }
-  pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
-  if (event) {
-    pn_event_init_connection(event, connection);
+
+  if (emit) {
+    pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
+    if (event) {
+      pn_event_init_connection(event, connection);
+    }
   }
 }
 
@@ -1307,7 +1311,7 @@ int pn_link_drained(pn_link_t *link)
     if (link->drain && link->credit > 0) {
       link->drained = link->credit;
       link->credit = 0;
-      pn_modified(link->session->connection, &link->endpoint);
+      pn_modified(link->session->connection, &link->endpoint, true);
       drained = link->drained;
     }
   } else {
@@ -1345,7 +1349,7 @@ void pn_link_flow(pn_link_t *receiver, i
   assert(receiver);
   assert(pn_link_is_receiver(receiver));
   receiver->credit += credit;
-  pn_modified(receiver->session->connection, &receiver->endpoint);
+  pn_modified(receiver->session->connection, &receiver->endpoint, true);
   if (!receiver->drain_flag_mode) {
     pn_link_set_drain(receiver, false);
     receiver->drain_flag_mode = false;
@@ -1366,7 +1370,7 @@ void pn_link_set_drain(pn_link_t *receiv
   assert(receiver);
   assert(pn_link_is_receiver(receiver));
   receiver->drain = drain;
-  pn_modified(receiver->session->connection, &receiver->endpoint);
+  pn_modified(receiver->session->connection, &receiver->endpoint, true);
   receiver->drain_flag_mode = true;
 }
 

Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Thu Feb 13 03:01:55 2014
@@ -136,6 +136,11 @@ bool pn_collector_pop(pn_collector_t *co
 
   event->next = collector->free_head;
   collector->free_head = event;
+
+  if (event->connection) {
+    pn_decref(event->connection);
+  }
+
   return true;
 }
 
@@ -191,6 +196,7 @@ void pn_event_init_connection(pn_event_t
 {
   event->connection = connection;
   pn_event_init_transport(event, event->connection->transport);
+  pn_incref(event->connection);
 }
 
 void pn_event_init_session(pn_event_t *event, pn_session_t *session)

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Feb 13 03:01:55 2014
@@ -288,7 +288,7 @@ int pn_transport_unbind(pn_transport_t *
   pn_endpoint_t *endpoint = conn->endpoint_head;
   while (endpoint) {
     pn_condition_clear(&endpoint->remote_condition);
-    pn_modified(conn, endpoint);
+    pn_modified(conn, endpoint, true);
     endpoint = endpoint->endpoint_next;
   }
 
@@ -889,6 +889,11 @@ int pn_do_disposition(pn_dispatcher_t *d
       remote->settled = settled;
       delivery->updated = true;
       pn_work_update(transport->connection, delivery);
+
+      pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY);
+      if (event) {
+        pn_event_init_delivery(event, delivery);
+      }
     }
   }
 
@@ -951,7 +956,7 @@ int pn_do_close(pn_dispatcher_t *disp)
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
-  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_STATE);
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_CONNECTION_STATE);
   if (event) {
     pn_event_init_connection(event, conn);
   }
@@ -1329,7 +1334,7 @@ int pn_post_disp(pn_transport_t *transpo
   pn_link_t *link = delivery->link;
   pn_session_t *ssn = link->session;
   pn_session_state_t *ssn_state = &ssn->state;
-  pn_modified(transport->connection, &link->session->endpoint);
+  pn_modified(transport->connection, &link->session->endpoint, false);
   pn_delivery_state_t *state = &delivery->state;
   assert(state->init);
   bool role = (link->endpoint.type == RECEIVER);
@@ -1667,7 +1672,7 @@ int pn_process(pn_transport_t *transport
   if ((err = pn_phase(transport, pn_process_conn_teardown))) return err;
 
   if (transport->connection->tpwork_head) {
-    pn_modified(transport->connection, &transport->connection->endpoint);
+    pn_modified(transport->connection, &transport->connection->endpoint, false);
   }
 
   return 0;

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Feb 13 03:01:55 2014
@@ -1923,6 +1923,10 @@ class EventTest(Test):
   def expect(self, collector, *types):
     events = self.list(collector)
     assert types == tuple([e.type for e in events]), (types, events)
+    if len(events) == 1:
+      return events[0]
+    elif len(events) > 1:
+      return events
 
   def testEndpointEvents(self):
     c1, c2 = self.connection()
@@ -1957,6 +1961,7 @@ class EventTest(Test):
     rcv.flow(10)
     self.pump()
     self.expect(coll, Event.LINK_FLOW)
+    return snd, rcv, coll
 
   def testDeliveryEvents(self):
     snd, rcv = self.link("test-link")
@@ -1974,3 +1979,22 @@ class EventTest(Test):
     snd.open()
     self.pump()
     self.expect(coll, Event.LINK_STATE, Event.DELIVERY)
+
+  def testDeliveryEventsDisp(self):
+    # XXX: we can't let coll go out of scope or the connection will be
+    # pointing to garbage
+    snd, rcv, coll = self.testFlowEvents()
+    snd.open()
+    dlv = snd.delivery("delivery")
+    snd.send("Hello World!")
+    assert snd.advance()
+    self.expect(coll, Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT)
+    self.pump()
+    self.expect(coll)
+    rdlv = rcv.current
+    assert rdlv != None
+    assert rdlv.tag == "delivery"
+    rdlv.update(Delivery.ACCEPTED)
+    self.pump()
+    event = self.expect(coll, Event.DELIVERY)
+    assert event.delivery == dlv



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org