You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/08/26 17:49:40 UTC
svn commit: r1620643 - in /qpid/proton/trunk: proton-c/src/engine/
proton-c/src/transport/
proton-j/src/main/java/org/apache/qpid/proton/engine/impl/
proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/
tests/python/proton_tests/
Author: rhs
Date: Tue Aug 26 15:49:39 2014
New Revision: 1620643
URL: http://svn.apache.org/r1620643
Log:
PROTON-641: fixed link cleanup on connection/session abort; freeing an endpoint no longer closes it automatically
Modified:
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Aug 26 15:49:39 2014
@@ -111,8 +111,6 @@ void pn_endpoint_tini(pn_endpoint_t *end
void pn_connection_free(pn_connection_t *connection)
{
assert(!connection->endpoint.freed);
- if (pn_connection_state(connection) & PN_LOCAL_ACTIVE)
- pn_connection_close(connection);
// free those endpoints that haven't been freed by the application
LL_REMOVE(connection, endpoint, &connection->endpoint);
while (connection->endpoint_head) {
@@ -226,8 +224,6 @@ void pn_session_free(pn_session_t *sessi
pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
pn_link_free(link);
}
- if (pn_session_state(session) & PN_LOCAL_ACTIVE)
- pn_session_close(session);
pn_remove_session(session->connection, session);
pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
@@ -282,8 +278,6 @@ void pn_terminus_free(pn_terminus_t *ter
void pn_link_free(pn_link_t *link)
{
assert(!link->endpoint.freed);
- if (pn_link_state(link) & PN_LOCAL_ACTIVE)
- pn_link_close(link);
pn_remove_link(link->session, link);
pn_endpoint_t *endpoint = (pn_endpoint_t *) link;
LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Tue Aug 26 15:49:39 2014
@@ -182,14 +182,20 @@ pn_session_t *pn_channel_state(pn_transp
return (pn_session_t *) pn_hash_get(transport->remote_channels, channel);
}
-static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session)
+static void pni_map_remote_channel(pn_session_t *session, uint16_t channel)
{
+ pn_transport_t *transport = session->connection->transport;
pn_hash_put(transport->remote_channels, channel, session);
session->state.remote_channel = channel;
}
-void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
+void pni_transport_unbind_handles(pn_hash_t *handles);
+
+static void pni_unmap_remote_channel(pn_session_t *ssn)
{
+ // XXX: should really update link state also
+ pni_transport_unbind_handles(ssn->state.remote_handles);
+ pn_transport_t *transport = ssn->connection->transport;
uint16_t channel = ssn->state.remote_channel;
ssn->state.remote_channel = -2;
// note: may free the session:
@@ -327,18 +333,18 @@ pn_error_t *pn_transport_error(pn_transp
return NULL;
}
-static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
+static void pni_map_remote_handle(pn_link_t *link, uint32_t handle)
{
link->state.remote_handle = handle;
- pn_hash_put(ssn->state.remote_handles, handle, link);
+ pn_hash_put(link->session->state.remote_handles, handle, link);
}
-void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
+static void pni_unmap_remote_handle(pn_link_t *link)
{
- uint32_t handle = link->state.remote_handle;
+ uintptr_t handle = link->state.remote_handle;
link->state.remote_handle = -2;
// may delete link:
- pn_hash_del(ssn->state.remote_handles, handle);
+ pn_hash_del(link->session->state.remote_handles, handle);
}
pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle)
@@ -500,7 +506,7 @@ int pn_do_begin(pn_dispatcher_t *disp)
ssn = pn_session(transport->connection);
}
ssn->state.incoming_transfer_count = next;
- pn_map_channel(transport, disp->channel, ssn);
+ pni_map_remote_channel(ssn, disp->channel);
PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, ssn);
return 0;
@@ -618,7 +624,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
free(strheap);
}
- pn_map_handle(ssn, handle, link);
+ pni_map_remote_handle(link, handle);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link->remote_source;
if (source.start || src_dynamic) {
@@ -937,7 +943,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
// TODO: implement
}
- pn_unmap_handle(ssn, link);
+ pni_unmap_remote_handle(link);
return 0;
}
@@ -949,7 +955,7 @@ int pn_do_end(pn_dispatcher_t *disp)
if (err) return err;
PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, ssn);
- pn_unmap_channel(transport, ssn);
+ pni_unmap_remote_channel(ssn);
return 0;
}
@@ -1191,6 +1197,15 @@ size_t pn_session_incoming_window(pn_ses
}
}
+static void pni_map_local_channel(pn_session_t *ssn)
+{
+ pn_transport_t *transport = ssn->connection->transport;
+ pn_session_state_t *state = &ssn->state;
+ uint16_t channel = allocate_alias(transport->local_channels);
+ state->local_channel = channel;
+ pn_hash_put(transport->local_channels, channel, ssn);
+}
+
int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION && transport->open_sent)
@@ -1199,16 +1214,14 @@ int pn_process_ssn_setup(pn_transport_t
pn_session_state_t *state = &ssn->state;
if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
- uint16_t channel = allocate_alias(transport->local_channels);
+ pni_map_local_channel(ssn);
state->incoming_window = pn_session_incoming_window(ssn);
state->outgoing_window = pn_session_outgoing_window(ssn);
- pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
+ pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN,
((int16_t) state->remote_channel >= 0), state->remote_channel,
state->outgoing_transfer_count,
state->incoming_window,
state->outgoing_window);
- state->local_channel = channel;
- pn_hash_put(transport->local_channels, channel, ssn);
}
}
@@ -1231,6 +1244,13 @@ static const char *expiry_symbol(pn_expi
return NULL;
}
+static void pni_map_local_handle(pn_link_t *link) {
+ pn_link_state_t *state = &link->state;
+ pn_session_state_t *ssn_state = &link->session->state;
+ state->local_handle = allocate_alias(ssn_state->local_handles);
+ pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+}
+
int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (transport->open_sent && (endpoint->type == SENDER ||
@@ -1242,8 +1262,7 @@ int pn_process_link_setup(pn_transport_t
if (((int16_t) ssn_state->local_channel >= 0) &&
!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
{
- state->local_handle = allocate_alias(ssn_state->local_handles);
- pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+ pni_map_local_handle(link);
const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
int err = pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
@@ -1536,6 +1555,14 @@ int pn_process_flow_sender(pn_transport_
return 0;
}
+static void pni_unmap_local_handle(pn_link_t *link) {
+ pn_link_state_t *state = &link->state;
+ uintptr_t handle = state->local_handle;
+ state->local_handle = -2;
+ // may delete link
+ pn_hash_del(link->session->state.local_handles, handle);
+}
+
int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SENDER || endpoint->type == RECEIVER)
@@ -1564,8 +1591,7 @@ int pn_process_link_teardown(pn_transpor
int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
state->local_handle, true, (bool) name, ERROR, name, description, info);
if (err) return err;
- pn_hash_del(ssn_state->local_handles, state->local_handle);
- state->local_handle = -2;
+ pni_unmap_local_handle(link);
}
pn_clear_modified(transport->connection, endpoint);
@@ -1597,6 +1623,17 @@ bool pn_pointful_buffering(pn_transport_
return false;
}
+static void pni_unmap_local_channel(pn_session_t *ssn) {
+ // XXX: should really update link state also
+ pni_transport_unbind_handles(ssn->state.local_handles);
+ pn_transport_t *transport = ssn->connection->transport;
+ pn_session_state_t *state = &ssn->state;
+ uintptr_t channel = state->local_channel;
+ state->local_channel = -2;
+ // may delete session
+ pn_hash_del(transport->local_channels, channel);
+}
+
int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION)
@@ -1606,7 +1643,9 @@ int pn_process_ssn_teardown(pn_transport
if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
&& !transport->close_sent)
{
- if (pn_pointful_buffering(transport, session)) return 0;
+ if (pn_pointful_buffering(transport, session)) {
+ return 0;
+ }
const char *name = NULL;
const char *description = NULL;
@@ -1621,8 +1660,7 @@ int pn_process_ssn_teardown(pn_transport
int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
(bool) name, ERROR, name, description, info);
if (err) return err;
- pn_hash_del(transport->local_channels, state->local_channel);
- state->local_channel = -2;
+ pni_unmap_local_channel(session);
}
pn_clear_modified(transport->connection, endpoint);
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Tue Aug 26 15:49:39 2014
@@ -190,11 +190,6 @@ public abstract class EndpointImpl imple
freed = true;
doFree();
-
- if (_localState == EndpointState.ACTIVE) {
- close();
- }
-
decref();
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Aug 26 15:49:39 2014
@@ -400,10 +400,6 @@ public class TransportImpl extends Endpo
writeFrame(transportSession.getLocalChannel(), detach, null, null);
-
- // TODO - temporary hack for PROTON-154, this line should be removed and replaced
- // with proper handling for closed links
- link.free();
}
endpoint.clearModified();
@@ -1158,14 +1154,13 @@ public class TransportImpl extends Endpo
LinkImpl link = transportLink.getLink();
transportLink.receivedDetach();
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+ _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
transportLink.clearRemoteHandle();
link.setRemoteState(EndpointState.CLOSED);
if(detach.getError() != null)
{
link.getRemoteCondition().copyFrom(detach.getError());
}
-
- _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
}
else
{
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Aug 26 15:49:39 2014
@@ -1444,14 +1444,16 @@ public class MessengerImpl implements Me
{
_receivers++;
_blocked.add((Receiver)link);
+ link.setContext(Boolean.TRUE);
}
}
// a link is being removed, account for it.
private void linkRemoved(Link _link)
{
- if (_link instanceof Receiver)
+ if (_link instanceof Receiver && (Boolean) _link.getContext())
{
+ _link.setContext(Boolean.FALSE);
Receiver link = (Receiver)_link;
assert _receivers > 0;
_receivers--;
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Aug 26 15:49:39 2014
@@ -2067,14 +2067,11 @@ class DeliveryTest(Test):
def testCustom(self):
self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))
-class EventTest(Test):
+class CollectorTest(Test):
def setup(self):
self.collector = Collector()
- def teardown(self):
- self.cleanup()
-
def drain(self):
result = []
while True:
@@ -2104,6 +2101,11 @@ class EventTest(Test):
assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)
+class EventTest(CollectorTest):
+
+ def teardown(self):
+ self.cleanup()
+
def testEndpointEvents(self):
c1, c2 = self.connection()
c1.collect(self.collector)
@@ -2145,13 +2147,11 @@ class EventTest(Test):
self.expect(Event.LINK_FINAL)
ssn2.free()
del ssn2
- self.expect(Event.SESSION_CLOSE, Event.TRANSPORT)
self.pump()
- self.expect(Event.SESSION_FINAL)
c1.free()
c1._transport.unbind()
- self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
- Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL)
+ self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL,
+ Event.CONNECTION_FINAL)
def testConnectionINIT_FINAL(self):
c = Connection()
@@ -2215,12 +2215,8 @@ class EventTest(Test):
self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
rcv.session.connection._transport.unbind()
rcv.session.connection.free()
- self.expect_oneof((Event.TRANSPORT, Event.LINK_CLOSE, Event.SESSION_CLOSE,
- Event.CONNECTION_CLOSE, Event.LINK_FINAL,
- Event.SESSION_FINAL, Event.CONNECTION_FINAL),
- (Event.TRANSPORT, Event.LINK_CLOSE, Event.LINK_FINAL,
- Event.SESSION_CLOSE, Event.SESSION_FINAL,
- Event.CONNECTION_CLOSE, Event.CONNECTION_FINAL))
+ self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL,
+ Event.CONNECTION_FINAL)
def testDeliveryEventsDisp(self):
snd, rcv = self.testFlowEvents()
@@ -2238,3 +2234,88 @@ class EventTest(Test):
self.pump()
event = self.expect(Event.DELIVERY)
assert event.delivery == dlv
+
+class PeerTest(CollectorTest):
+
+ def setup(self):
+ CollectorTest.setup(self)
+ self.connection = Connection()
+ self.connection.collect(self.collector)
+ self.transport = Transport()
+ self.transport.bind(self.connection)
+ self.peer = Connection()
+ self.peer_transport = Transport()
+ self.peer_transport.bind(self.peer)
+ self.peer_transport.trace(Transport.TRACE_OFF)
+
+ def pump(self):
+ pump(self.transport, self.peer_transport)
+
+class TeardownLeakTest(PeerTest):
+
+ def doLeak(self, local, remote):
+ self.connection.open()
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT)
+
+ ssn = self.connection.session()
+ ssn.open()
+ self.expect(Event.SESSION_INIT, Event.SESSION_OPEN, Event.TRANSPORT)
+
+ snd = ssn.sender("sender")
+ snd.open()
+ self.expect(Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT)
+
+
+ self.pump()
+
+ self.peer.open()
+ self.peer.session_head(0).open()
+ self.peer.link_head(0).open()
+
+ self.pump()
+ self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
+ Event.LINK_REMOTE_OPEN, Event.LINK_FLOW),
+ (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
+ Event.LINK_REMOTE_OPEN))
+
+ if local:
+ snd.close() # ha!!
+ self.expect(Event.LINK_CLOSE, Event.TRANSPORT)
+ ssn.close()
+ self.expect(Event.SESSION_CLOSE, Event.TRANSPORT)
+ self.connection.close()
+ self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT)
+
+ if remote:
+ self.peer.link_head(0).close() # ha!!
+ self.peer.session_head(0).close()
+ self.peer.close()
+
+ self.pump()
+
+ if remote:
+ self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE,
+ Event.CONNECTION_REMOTE_CLOSE),
+ (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL,
+ Event.SESSION_REMOTE_CLOSE,
+ Event.CONNECTION_REMOTE_CLOSE))
+ else:
+ self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE)
+
+ self.connection.free()
+ self.transport.unbind()
+
+ self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
+ (Event.SESSION_FINAL, Event.CONNECTION_FINAL))
+
+ def testLocalRemoteLeak(self):
+ self.doLeak(True, True)
+
+ def testLocalLeak(self):
+ self.doLeak(True, False)
+
+ def testRemoteLeak(self):
+ self.doLeak(False, True)
+
+ def testLeak(self):
+ self.doLeak(False, False)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org