You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/08/26 17:49:40 UTC

svn commit: r1620643 - in /qpid/proton/trunk: proton-c/src/engine/ proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ tests/python/proton_tests/

Author: rhs
Date: Tue Aug 26 15:49:39 2014
New Revision: 1620643

URL: http://svn.apache.org/r1620643
Log:
PROTON-641: fixed link cleanup on connection/session abort; made free not close automatically

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Aug 26 15:49:39 2014
@@ -111,8 +111,6 @@ void pn_endpoint_tini(pn_endpoint_t *end
 void pn_connection_free(pn_connection_t *connection)
 {
   assert(!connection->endpoint.freed);
-  if (pn_connection_state(connection) & PN_LOCAL_ACTIVE)
-    pn_connection_close(connection);
   // free those endpoints that haven't been freed by the application
   LL_REMOVE(connection, endpoint, &connection->endpoint);
   while (connection->endpoint_head) {
@@ -226,8 +224,6 @@ void pn_session_free(pn_session_t *sessi
     pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
     pn_link_free(link);
   }
-  if (pn_session_state(session) & PN_LOCAL_ACTIVE)
-    pn_session_close(session);
   pn_remove_session(session->connection, session);
   pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
   LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
@@ -282,8 +278,6 @@ void pn_terminus_free(pn_terminus_t *ter
 void pn_link_free(pn_link_t *link)
 {
   assert(!link->endpoint.freed);
-  if (pn_link_state(link) & PN_LOCAL_ACTIVE)
-    pn_link_close(link);
   pn_remove_link(link->session, link);
   pn_endpoint_t *endpoint = (pn_endpoint_t *) link;
   LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Tue Aug 26 15:49:39 2014
@@ -182,14 +182,20 @@ pn_session_t *pn_channel_state(pn_transp
   return (pn_session_t *) pn_hash_get(transport->remote_channels, channel);
 }
 
-static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session)
+static void pni_map_remote_channel(pn_session_t *session, uint16_t channel)
 {
+  pn_transport_t *transport = session->connection->transport;
   pn_hash_put(transport->remote_channels, channel, session);
   session->state.remote_channel = channel;
 }
 
-void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
+void pni_transport_unbind_handles(pn_hash_t *handles);
+
+static void pni_unmap_remote_channel(pn_session_t *ssn)
 {
+  // XXX: should really update link state also
+  pni_transport_unbind_handles(ssn->state.remote_handles);
+  pn_transport_t *transport = ssn->connection->transport;
   uint16_t channel = ssn->state.remote_channel;
   ssn->state.remote_channel = -2;
   // note: may free the session:
@@ -327,18 +333,18 @@ pn_error_t *pn_transport_error(pn_transp
   return NULL;
 }
 
-static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
+static void pni_map_remote_handle(pn_link_t *link, uint32_t handle)
 {
   link->state.remote_handle = handle;
-  pn_hash_put(ssn->state.remote_handles, handle, link);
+  pn_hash_put(link->session->state.remote_handles, handle, link);
 }
 
-void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
+static void pni_unmap_remote_handle(pn_link_t *link)
 {
-  uint32_t handle = link->state.remote_handle;
+  uintptr_t handle = link->state.remote_handle;
   link->state.remote_handle = -2;
   // may delete link:
-  pn_hash_del(ssn->state.remote_handles, handle);
+  pn_hash_del(link->session->state.remote_handles, handle);
 }
 
 pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle)
@@ -500,7 +506,7 @@ int pn_do_begin(pn_dispatcher_t *disp)
     ssn = pn_session(transport->connection);
   }
   ssn->state.incoming_transfer_count = next;
-  pn_map_channel(transport, disp->channel, ssn);
+  pni_map_remote_channel(ssn, disp->channel);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
   pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, ssn);
   return 0;
@@ -618,7 +624,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
     free(strheap);
   }
 
-  pn_map_handle(ssn, handle, link);
+  pni_map_remote_handle(link, handle);
   PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
   pn_terminus_t *rsrc = &link->remote_source;
   if (source.start || src_dynamic) {
@@ -937,7 +943,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
     // TODO: implement
   }
 
-  pn_unmap_handle(ssn, link);
+  pni_unmap_remote_handle(link);
   return 0;
 }
 
@@ -949,7 +955,7 @@ int pn_do_end(pn_dispatcher_t *disp)
   if (err) return err;
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
   pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, ssn);
-  pn_unmap_channel(transport, ssn);
+  pni_unmap_remote_channel(ssn);
   return 0;
 }
 
@@ -1191,6 +1197,15 @@ size_t pn_session_incoming_window(pn_ses
   }
 }
 
+static void pni_map_local_channel(pn_session_t *ssn)
+{
+  pn_transport_t *transport = ssn->connection->transport;
+  pn_session_state_t *state = &ssn->state;
+  uint16_t channel = allocate_alias(transport->local_channels);
+  state->local_channel = channel;
+  pn_hash_put(transport->local_channels, channel, ssn);
+}
+
 int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SESSION && transport->open_sent)
@@ -1199,16 +1214,14 @@ int pn_process_ssn_setup(pn_transport_t 
     pn_session_state_t *state = &ssn->state;
     if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
     {
-      uint16_t channel = allocate_alias(transport->local_channels);
+      pni_map_local_channel(ssn);
       state->incoming_window = pn_session_incoming_window(ssn);
       state->outgoing_window = pn_session_outgoing_window(ssn);
-      pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
+      pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), state->remote_channel,
                     state->outgoing_transfer_count,
                     state->incoming_window,
                     state->outgoing_window);
-      state->local_channel = channel;
-      pn_hash_put(transport->local_channels, channel, ssn);
     }
   }
 
@@ -1231,6 +1244,13 @@ static const char *expiry_symbol(pn_expi
   return NULL;
 }
 
+static void pni_map_local_handle(pn_link_t *link) {
+  pn_link_state_t *state = &link->state;
+  pn_session_state_t *ssn_state = &link->session->state;
+  state->local_handle = allocate_alias(ssn_state->local_handles);
+  pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+}
+
 int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (transport->open_sent && (endpoint->type == SENDER ||
@@ -1242,8 +1262,7 @@ int pn_process_link_setup(pn_transport_t
     if (((int16_t) ssn_state->local_channel >= 0) &&
         !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
     {
-      state->local_handle = allocate_alias(ssn_state->local_handles);
-      pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+      pni_map_local_handle(link);
       const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
       int err = pn_post_frame(transport->disp, ssn_state->local_channel,
                               "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
@@ -1536,6 +1555,14 @@ int pn_process_flow_sender(pn_transport_
   return 0;
 }
 
+static void pni_unmap_local_handle(pn_link_t *link) {
+  pn_link_state_t *state = &link->state;
+  uintptr_t handle = state->local_handle;
+  state->local_handle = -2;
+  // may delete link
+  pn_hash_del(link->session->state.local_handles, handle);
+}
+
 int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SENDER || endpoint->type == RECEIVER)
@@ -1564,8 +1591,7 @@ int pn_process_link_teardown(pn_transpor
       int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
                               state->local_handle, true, (bool) name, ERROR, name, description, info);
       if (err) return err;
-      pn_hash_del(ssn_state->local_handles, state->local_handle);
-      state->local_handle = -2;
+      pni_unmap_local_handle(link);
     }
 
     pn_clear_modified(transport->connection, endpoint);
@@ -1597,6 +1623,17 @@ bool pn_pointful_buffering(pn_transport_
   return false;
 }
 
+static void pni_unmap_local_channel(pn_session_t *ssn) {
+  // XXX: should really update link state also
+  pni_transport_unbind_handles(ssn->state.local_handles);
+  pn_transport_t *transport = ssn->connection->transport;
+  pn_session_state_t *state = &ssn->state;
+  uintptr_t channel = state->local_channel;
+  state->local_channel = -2;
+  // may delete session
+  pn_hash_del(transport->local_channels, channel);
+}
+
 int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SESSION)
@@ -1606,7 +1643,9 @@ int pn_process_ssn_teardown(pn_transport
     if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
         && !transport->close_sent)
     {
-      if (pn_pointful_buffering(transport, session)) return 0;
+      if (pn_pointful_buffering(transport, session)) {
+        return 0;
+      }
 
       const char *name = NULL;
       const char *description = NULL;
@@ -1621,8 +1660,7 @@ int pn_process_ssn_teardown(pn_transport
       int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
                               (bool) name, ERROR, name, description, info);
       if (err) return err;
-      pn_hash_del(transport->local_channels, state->local_channel);
-      state->local_channel = -2;
+      pni_unmap_local_channel(session);
     }
 
     pn_clear_modified(transport->connection, endpoint);

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Tue Aug 26 15:49:39 2014
@@ -190,11 +190,6 @@ public abstract class EndpointImpl imple
         freed = true;
 
         doFree();
-
-        if (_localState == EndpointState.ACTIVE) {
-            close();
-        }
-
         decref();
     }
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Aug 26 15:49:39 2014
@@ -400,10 +400,6 @@ public class TransportImpl extends Endpo
 
 
                         writeFrame(transportSession.getLocalChannel(), detach, null, null);
-
-                        // TODO - temporary hack for PROTON-154, this line should be removed and replaced
-                        //        with proper handling for closed links
-                        link.free();
                     }
 
                     endpoint.clearModified();
@@ -1158,14 +1154,13 @@ public class TransportImpl extends Endpo
                 LinkImpl link = transportLink.getLink();
                 transportLink.receivedDetach();
                 transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+                _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
                 transportLink.clearRemoteHandle();
                 link.setRemoteState(EndpointState.CLOSED);
                 if(detach.getError() != null)
                 {
                     link.getRemoteCondition().copyFrom(detach.getError());
                 }
-
-                _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
             }
             else
             {

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Aug 26 15:49:39 2014
@@ -1444,14 +1444,16 @@ public class MessengerImpl implements Me
         {
             _receivers++;
             _blocked.add((Receiver)link);
+            link.setContext(Boolean.TRUE);
         }
     }
 
     // a link is being removed, account for it.
     private void linkRemoved(Link _link)
     {
-        if (_link instanceof Receiver)
+        if (_link instanceof Receiver && (Boolean) _link.getContext())
         {
+            _link.setContext(Boolean.FALSE);
             Receiver link = (Receiver)_link;
             assert _receivers > 0;
             _receivers--;

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1620643&r1=1620642&r2=1620643&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Aug 26 15:49:39 2014
@@ -2067,14 +2067,11 @@ class DeliveryTest(Test):
   def testCustom(self):
     self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))
 
-class EventTest(Test):
+class CollectorTest(Test):
 
   def setup(self):
     self.collector = Collector()
 
-  def teardown(self):
-    self.cleanup()
-
   def drain(self):
     result = []
     while True:
@@ -2104,6 +2101,11 @@ class EventTest(Test):
 
     assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)
 
+class EventTest(CollectorTest):
+
+  def teardown(self):
+    self.cleanup()
+
   def testEndpointEvents(self):
     c1, c2 = self.connection()
     c1.collect(self.collector)
@@ -2145,13 +2147,11 @@ class EventTest(Test):
     self.expect(Event.LINK_FINAL)
     ssn2.free()
     del ssn2
-    self.expect(Event.SESSION_CLOSE, Event.TRANSPORT)
     self.pump()
-    self.expect(Event.SESSION_FINAL)
     c1.free()
     c1._transport.unbind()
-    self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
-                Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL)
+    self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL,
+                Event.CONNECTION_FINAL)
 
   def testConnectionINIT_FINAL(self):
     c = Connection()
@@ -2215,12 +2215,8 @@ class EventTest(Test):
     self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
     rcv.session.connection._transport.unbind()
     rcv.session.connection.free()
-    self.expect_oneof((Event.TRANSPORT, Event.LINK_CLOSE, Event.SESSION_CLOSE,
-                       Event.CONNECTION_CLOSE, Event.LINK_FINAL,
-                       Event.SESSION_FINAL, Event.CONNECTION_FINAL),
-                      (Event.TRANSPORT, Event.LINK_CLOSE, Event.LINK_FINAL,
-                       Event.SESSION_CLOSE, Event.SESSION_FINAL,
-                       Event.CONNECTION_CLOSE, Event.CONNECTION_FINAL))
+    self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL,
+                Event.CONNECTION_FINAL)
 
   def testDeliveryEventsDisp(self):
     snd, rcv = self.testFlowEvents()
@@ -2238,3 +2234,88 @@ class EventTest(Test):
     self.pump()
     event = self.expect(Event.DELIVERY)
     assert event.delivery == dlv
+
+class PeerTest(CollectorTest):
+
+  def setup(self):
+    CollectorTest.setup(self)
+    self.connection = Connection()
+    self.connection.collect(self.collector)
+    self.transport = Transport()
+    self.transport.bind(self.connection)
+    self.peer = Connection()
+    self.peer_transport = Transport()
+    self.peer_transport.bind(self.peer)
+    self.peer_transport.trace(Transport.TRACE_OFF)
+
+  def pump(self):
+    pump(self.transport, self.peer_transport)
+
+class TeardownLeakTest(PeerTest):
+
+  def doLeak(self, local, remote):
+    self.connection.open()
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT)
+
+    ssn = self.connection.session()
+    ssn.open()
+    self.expect(Event.SESSION_INIT, Event.SESSION_OPEN, Event.TRANSPORT)
+
+    snd = ssn.sender("sender")
+    snd.open()
+    self.expect(Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT)
+
+
+    self.pump()
+
+    self.peer.open()
+    self.peer.session_head(0).open()
+    self.peer.link_head(0).open()
+
+    self.pump()
+    self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
+                       Event.LINK_REMOTE_OPEN, Event.LINK_FLOW),
+                      (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
+                       Event.LINK_REMOTE_OPEN))
+
+    if local:
+      snd.close() # ha!!
+      self.expect(Event.LINK_CLOSE, Event.TRANSPORT)
+    ssn.close()
+    self.expect(Event.SESSION_CLOSE, Event.TRANSPORT)
+    self.connection.close()
+    self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT)
+
+    if remote:
+      self.peer.link_head(0).close() # ha!!
+    self.peer.session_head(0).close()
+    self.peer.close()
+
+    self.pump()
+
+    if remote:
+      self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE,
+                         Event.CONNECTION_REMOTE_CLOSE),
+                        (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL,
+                         Event.SESSION_REMOTE_CLOSE,
+                         Event.CONNECTION_REMOTE_CLOSE))
+    else:
+      self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE)
+
+    self.connection.free()
+    self.transport.unbind()
+
+    self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
+                      (Event.SESSION_FINAL, Event.CONNECTION_FINAL))
+
+  def testLocalRemoteLeak(self):
+    self.doLeak(True, True)
+
+  def testLocalLeak(self):
+    self.doLeak(True, False)
+
+  def testRemoteLeak(self):
+    self.doLeak(False, True)
+
+  def testLeak(self):
+    self.doLeak(False, False)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org