You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/13 04:01:55 UTC
svn commit: r1567834 - in /qpid/proton/trunk:
proton-c/bindings/python/proton.py proton-c/src/engine/engine-internal.h
proton-c/src/engine/engine.c proton-c/src/engine/event.c
proton-c/src/transport/transport.c tests/python/proton_tests/engine.py
Author: rhs
Date: Thu Feb 13 03:01:55 2014
New Revision: 1567834
URL: http://svn.apache.org/r1567834
Log:
added test for delivery events; added ref counting for events; fixed spurious events
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/engine/event.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Feb 13 03:01:55 2014
@@ -2161,7 +2161,10 @@ class Connection(Endpoint):
return pn_connection_remote_condition(self._conn)
def collect(self, collector):
- pn_connection_collect(self._conn, collector._impl)
+ if collector is None:
+ pn_connection_collect(self._conn, None)
+ else:
+ pn_connection_collect(self._conn, collector._impl)
def _get_container(self):
return pn_connection_get_container(self._conn)
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Feb 13 03:01:55 2014
@@ -301,7 +301,7 @@ pn_timestamp_t pn_io_layer_tick_passthru
void pn_condition_init(pn_condition_t *condition);
void pn_condition_tini(pn_condition_t *condition);
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit);
void pn_real_settle(pn_delivery_t *delivery);
void pn_clear_tpwork(pn_delivery_t *delivery);
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Feb 13 03:01:55 2014
@@ -51,20 +51,18 @@ pn_connection_t *pn_ep_get_connection(pn
return NULL;
}
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
-
void pn_open(pn_endpoint_t *endpoint)
{
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
- pn_modified(pn_ep_get_connection(endpoint), endpoint);
+ pn_modified(pn_ep_get_connection(endpoint), endpoint, true);
}
void pn_close(pn_endpoint_t *endpoint)
{
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
- pn_modified(pn_ep_get_connection(endpoint), endpoint);
+ pn_modified(pn_ep_get_connection(endpoint), endpoint, true);
}
void pn_connection_reset(pn_connection_t *connection)
@@ -88,7 +86,7 @@ void pn_endpoint_tini(pn_endpoint_t *end
void pn_connection_free(pn_connection_t *connection)
{
- pn_free(connection);
+ pn_decref(connection);
}
void *pn_connection_get_context(pn_connection_t *conn)
@@ -145,8 +143,11 @@ void pn_session_close(pn_session_t *sess
void pn_session_free(pn_session_t *session)
{
- if (session && session->connection)
+ if (session && session->connection) {
pn_remove_session(session->connection, session);
+ pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
+ LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
+ }
}
void *pn_session_get_context(pn_session_t *session)
@@ -431,7 +432,7 @@ void pn_add_tpwork(pn_delivery_t *delive
LL_ADD(connection, tpwork, delivery);
delivery->tpwork = true;
}
- pn_modified(connection, &connection->endpoint);
+ pn_modified(connection, &connection->endpoint, true);
}
void pn_clear_tpwork(pn_delivery_t *delivery)
@@ -457,15 +458,18 @@ void pn_dump(pn_connection_t *conn)
printf("\n");
}
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit)
{
if (!endpoint->modified) {
LL_ADD(connection, transport, endpoint);
endpoint->modified = true;
}
- pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
- if (event) {
- pn_event_init_connection(event, connection);
+
+ if (emit) {
+ pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
+ if (event) {
+ pn_event_init_connection(event, connection);
+ }
}
}
@@ -1307,7 +1311,7 @@ int pn_link_drained(pn_link_t *link)
if (link->drain && link->credit > 0) {
link->drained = link->credit;
link->credit = 0;
- pn_modified(link->session->connection, &link->endpoint);
+ pn_modified(link->session->connection, &link->endpoint, true);
drained = link->drained;
}
} else {
@@ -1345,7 +1349,7 @@ void pn_link_flow(pn_link_t *receiver, i
assert(receiver);
assert(pn_link_is_receiver(receiver));
receiver->credit += credit;
- pn_modified(receiver->session->connection, &receiver->endpoint);
+ pn_modified(receiver->session->connection, &receiver->endpoint, true);
if (!receiver->drain_flag_mode) {
pn_link_set_drain(receiver, false);
receiver->drain_flag_mode = false;
@@ -1366,7 +1370,7 @@ void pn_link_set_drain(pn_link_t *receiv
assert(receiver);
assert(pn_link_is_receiver(receiver));
receiver->drain = drain;
- pn_modified(receiver->session->connection, &receiver->endpoint);
+ pn_modified(receiver->session->connection, &receiver->endpoint, true);
receiver->drain_flag_mode = true;
}
Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Thu Feb 13 03:01:55 2014
@@ -136,6 +136,11 @@ bool pn_collector_pop(pn_collector_t *co
event->next = collector->free_head;
collector->free_head = event;
+
+ if (event->connection) {
+ pn_decref(event->connection);
+ }
+
return true;
}
@@ -191,6 +196,7 @@ void pn_event_init_connection(pn_event_t
{
event->connection = connection;
pn_event_init_transport(event, event->connection->transport);
+ pn_incref(event->connection);
}
void pn_event_init_session(pn_event_t *event, pn_session_t *session)
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Feb 13 03:01:55 2014
@@ -288,7 +288,7 @@ int pn_transport_unbind(pn_transport_t *
pn_endpoint_t *endpoint = conn->endpoint_head;
while (endpoint) {
pn_condition_clear(&endpoint->remote_condition);
- pn_modified(conn, endpoint);
+ pn_modified(conn, endpoint, true);
endpoint = endpoint->endpoint_next;
}
@@ -889,6 +889,11 @@ int pn_do_disposition(pn_dispatcher_t *d
remote->settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
+
+ pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY);
+ if (event) {
+ pn_event_init_delivery(event, delivery);
+ }
}
}
@@ -951,7 +956,7 @@ int pn_do_close(pn_dispatcher_t *disp)
if (err) return err;
transport->close_rcvd = true;
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
- pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_STATE);
+ pn_event_t *event = pn_collector_put(transport->connection->collector, PN_CONNECTION_STATE);
if (event) {
pn_event_init_connection(event, conn);
}
@@ -1329,7 +1334,7 @@ int pn_post_disp(pn_transport_t *transpo
pn_link_t *link = delivery->link;
pn_session_t *ssn = link->session;
pn_session_state_t *ssn_state = &ssn->state;
- pn_modified(transport->connection, &link->session->endpoint);
+ pn_modified(transport->connection, &link->session->endpoint, false);
pn_delivery_state_t *state = &delivery->state;
assert(state->init);
bool role = (link->endpoint.type == RECEIVER);
@@ -1667,7 +1672,7 @@ int pn_process(pn_transport_t *transport
if ((err = pn_phase(transport, pn_process_conn_teardown))) return err;
if (transport->connection->tpwork_head) {
- pn_modified(transport->connection, &transport->connection->endpoint);
+ pn_modified(transport->connection, &transport->connection->endpoint, false);
}
return 0;
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1567834&r1=1567833&r2=1567834&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Feb 13 03:01:55 2014
@@ -1923,6 +1923,10 @@ class EventTest(Test):
def expect(self, collector, *types):
events = self.list(collector)
assert types == tuple([e.type for e in events]), (types, events)
+ if len(events) == 1:
+ return events[0]
+ elif len(events) > 1:
+ return events
def testEndpointEvents(self):
c1, c2 = self.connection()
@@ -1957,6 +1961,7 @@ class EventTest(Test):
rcv.flow(10)
self.pump()
self.expect(coll, Event.LINK_FLOW)
+ return snd, rcv, coll
def testDeliveryEvents(self):
snd, rcv = self.link("test-link")
@@ -1974,3 +1979,22 @@ class EventTest(Test):
snd.open()
self.pump()
self.expect(coll, Event.LINK_STATE, Event.DELIVERY)
+
+ def testDeliveryEventsDisp(self):
+ # XXX: we can't let coll go out of scope or the connection will be
+ # pointing to garbage
+ snd, rcv, coll = self.testFlowEvents()
+ snd.open()
+ dlv = snd.delivery("delivery")
+ snd.send("Hello World!")
+ assert snd.advance()
+ self.expect(coll, Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT)
+ self.pump()
+ self.expect(coll)
+ rdlv = rcv.current
+ assert rdlv != None
+ assert rdlv.tag == "delivery"
+ rdlv.update(Delivery.ACCEPTED)
+ self.pump()
+ event = self.expect(coll, Event.DELIVERY)
+ assert event.delivery == dlv
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org