You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/23 19:45:56 UTC

svn commit: r1627098 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/ proton-c/src/events/ proton-c/src/messenger/ proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/engine/ proton-j/sr...

Author: rhs
Date: Tue Sep 23 17:45:56 2014
New Revision: 1627098

URL: http://svn.apache.org/r1627098
Log:
PROTON-677: added support for detach

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/event.h
    qpid/proton/trunk/proton-c/include/proton/link.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/events/event.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Sep 23 17:45:56 2014
@@ -2648,6 +2648,9 @@ class Link(Endpoint):
   def drained(self):
     return pn_link_drained(self._link)
 
+  def detach(self):
+    return pn_link_detach(self._link)
+
 class Terminus(object):
 
   UNSPECIFIED = PN_UNSPECIFIED
@@ -3385,8 +3388,10 @@ class Event:
   LINK_INIT = PN_LINK_INIT
   LINK_OPEN = PN_LINK_OPEN
   LINK_CLOSE = PN_LINK_CLOSE
+  LINK_DETACH = PN_LINK_DETACH
   LINK_REMOTE_OPEN = PN_LINK_REMOTE_OPEN
   LINK_REMOTE_CLOSE = PN_LINK_REMOTE_CLOSE
+  LINK_REMOTE_DETACH = PN_LINK_REMOTE_DETACH
   LINK_FLOW = PN_LINK_FLOW
   LINK_FINAL = PN_LINK_FINAL
 

Modified: qpid/proton/trunk/proton-c/include/proton/event.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/event.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/event.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/event.h Tue Sep 23 17:45:56 2014
@@ -202,6 +202,18 @@ typedef enum {
   PN_LINK_REMOTE_CLOSE,
 
   /**
+   * The local link endpoint has been detached. Events of this type
+   * point to the relevant link.
+   */
+  PN_LINK_DETACH,
+
+  /**
+   * The remote endpoint has detached the link. Events of this type
+   * point to the relevant link.
+   */
+  PN_LINK_REMOTE_DETACH,
+
+  /**
    * The flow control state for a link has changed. Events of this
    * type point to the relevant link.
    */

Modified: qpid/proton/trunk/proton-c/include/proton/link.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/link.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/link.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/link.h Tue Sep 23 17:45:56 2014
@@ -249,6 +249,13 @@ PN_EXTERN void pn_link_open(pn_link_t *l
 PN_EXTERN void pn_link_close(pn_link_t *link);
 
 /**
+ * Detach a link.
+ *
+ * @param[in] link a link object
+ */
+PN_EXTERN void pn_link_detach(pn_link_t *link);
+
+/**
  * Access the locally defined source definition for a link.
  *
  * The pointer returned by this operation is valid until the link

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Sep 23 17:45:56 2014
@@ -252,6 +252,7 @@ struct pn_link_t {
   uint8_t remote_rcv_settle_mode;
   bool drain_flag_mode; // receiver only
   bool drain;
+  bool detached;
 };
 
 struct pn_disposition_t {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Sep 23 17:45:56 2014
@@ -265,6 +265,15 @@ void pn_link_close(pn_link_t *link)
   pn_endpoint_close(&link->endpoint);
 }
 
+void pn_link_detach(pn_link_t *link)
+{
+  assert(link);
+  link->detached = true;
+  pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_DETACH);
+  pn_modified(link->session->connection, &link->endpoint, true);
+
+}
+
 void pn_terminus_free(pn_terminus_t *terminus)
 {
   pn_free(terminus->address);
@@ -861,6 +870,7 @@ pn_link_t *pn_link_new(int type, pn_sess
   link->rcv_settle_mode = PN_RCV_FIRST;
   link->remote_snd_settle_mode = PN_SND_MIXED;
   link->remote_rcv_settle_mode = PN_RCV_FIRST;
+  link->detached = false;
 
   // begin transport state
   link->state.local_handle = -1;

Modified: qpid/proton/trunk/proton-c/src/events/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/events/event.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/events/event.c (original)
+++ qpid/proton/trunk/proton-c/src/events/event.c Tue Sep 23 17:45:56 2014
@@ -270,6 +270,10 @@ const char *pn_event_type_name(pn_event_
     return "PN_LINK_OPEN";
   case PN_LINK_REMOTE_CLOSE:
     return "PN_LINK_REMOTE_CLOSE";
+  case PN_LINK_DETACH:
+    return "PN_LINK_DETACH";
+  case PN_LINK_REMOTE_DETACH:
+    return "PN_LINK_REMOTE_DETACH";
   case PN_LINK_CLOSE:
     return "PN_LINK_CLOSE";
   case PN_LINK_FLOW:

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Sep 23 17:45:56 2014
@@ -1265,8 +1265,10 @@ int pn_messenger_process_events(pn_messe
       break;
     case PN_LINK_REMOTE_OPEN:
     case PN_LINK_REMOTE_CLOSE:
+    case PN_LINK_REMOTE_DETACH:
     case PN_LINK_OPEN:
     case PN_LINK_CLOSE:
+    case PN_LINK_DETACH:
       pn_messenger_process_link(messenger, event);
       break;
     case PN_LINK_FLOW:

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Tue Sep 23 17:45:56 2014
@@ -968,7 +968,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
     PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
     pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_CLOSE);
   } else {
-    // TODO: implement
+    pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_DETACH);
   }
 
   pni_unmap_remote_handle(link);
@@ -1613,7 +1613,7 @@ int pn_process_link_teardown(pn_transpor
     pn_session_t *session = link->session;
     pn_session_state_t *ssn_state = &session->state;
     pn_link_state_t *state = &link->state;
-    if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 0 &&
+    if (((endpoint->state & PN_LOCAL_CLOSED) || link->detached) && (int32_t) state->local_handle >= 0 &&
         (int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) {
       if (pn_link_is_sender(link) && pn_link_queued(link) &&
           (int32_t) state->remote_handle != -2 &&
@@ -1630,8 +1630,10 @@ int pn_process_link_teardown(pn_transpor
         info = pn_condition_info(&endpoint->condition);
       }
 
-      int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
-                              state->local_handle, true, (bool) name, ERROR, name, description, info);
+      int err =
+          pn_post_frame(transport->disp, ssn_state->local_channel,
+                        "DL[Io?DL[sSC]]", DETACH, state->local_handle, !link->detached,
+                        (bool)name, ERROR, name, description, info);
       if (err) return err;
       pni_unmap_local_handle(link);
     }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Tue Sep 23 17:45:56 2014
@@ -51,6 +51,8 @@ public interface Event
         LINK_REMOTE_OPEN,
         LINK_CLOSE,
         LINK_REMOTE_CLOSE,
+        LINK_DETACH,
+        LINK_REMOTE_DETACH,
         LINK_FLOW,
         LINK_FINAL,
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java Tue Sep 23 17:45:56 2014
@@ -184,4 +184,7 @@ public interface Link extends Endpoint
 
     public int getRemoteCredit();
     public boolean getDrain();
+
+    public void detach();
+
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Sep 23 17:45:56 2014
@@ -55,6 +55,7 @@ public abstract class LinkImpl extends E
 
     private LinkNode<LinkImpl> _node;
     private boolean _drain;
+    private boolean _detached;
 
     LinkImpl(SessionImpl session, String name)
     {
@@ -398,4 +399,17 @@ public abstract class LinkImpl extends E
     {
         getConnectionImpl().put(Event.Type.LINK_CLOSE, this);
     }
+
+    public void detach()
+    {
+        _detached = true;
+        getConnectionImpl().put(Event.Type.LINK_DETACH, this);
+        modified();
+    }
+
+    boolean detached()
+    {
+        return _detached;
+    }
+
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Sep 23 17:45:56 2014
@@ -375,7 +375,7 @@ public class TransportImpl extends Endpo
                     SessionImpl session = link.getSession();
                     TransportSession transportSession = getTransportState(session);
 
-                    if(link.getLocalState() == EndpointState.CLOSED
+                    if(((link.getLocalState() == EndpointState.CLOSED) || link.detached())
                        && transportLink.isLocalHandleSet()
                        && !_isCloseSent)
                     {
@@ -395,8 +395,7 @@ public class TransportImpl extends Endpo
 
                         Detach detach = new Detach();
                         detach.setHandle(localHandle);
-                        // TODO - need an API for detaching rather than closing the link
-                        detach.setClosed(true);
+                        detach.setClosed(!link.detached());
 
                         ErrorCondition localError = link.getCondition();
                         if( localError.getCondition() !=null )
@@ -1165,7 +1164,11 @@ public class TransportImpl extends Endpo
                 LinkImpl link = transportLink.getLink();
                 transportLink.receivedDetach();
                 transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
-                _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+                if (detach.getClosed()) {
+                    _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+                } else {
+                    _connectionEndpoint.put(Event.Type.LINK_REMOTE_DETACH, link);
+                }
                 transportLink.clearRemoteHandle();
                 link.setRemoteState(EndpointState.CLOSED);
                 if(detach.getError() != null)

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Tue Sep 23 17:45:56 2014
@@ -625,6 +625,10 @@ def pn_link_close(link):
   link.on_close()
   link.impl.close()
 
+def pn_link_detach(link):
+  link.on_close()
+  link.impl.detach()
+
 def pn_link_flow(link, n):
   link.impl.flow(n)
 
@@ -961,6 +965,8 @@ PN_LINK_OPEN = Event.Type.LINK_OPEN
 PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN
 PN_LINK_CLOSE = Event.Type.LINK_CLOSE
 PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE
+PN_LINK_DETACH = Event.Type.LINK_DETACH
+PN_LINK_REMOTE_DETACH = Event.Type.LINK_REMOTE_DETACH
 PN_LINK_FLOW = Event.Type.LINK_FLOW
 PN_LINK_FINAL = Event.Type.LINK_FINAL
 PN_DELIVERY = Event.Type.DELIVERY

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Sep 23 17:45:56 2014
@@ -2101,6 +2101,11 @@ class CollectorTest(Test):
 
     assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)
 
+  def expect_until(self, *types):
+    events = self.drain()
+    etypes = tuple([e.type for e in events[-len(types):]])
+    assert etypes == types, "actual events %s did not end in expect sequence: %s" % (events, types)
+
 class EventTest(CollectorTest):
 
   def teardown(self):
@@ -2289,6 +2294,28 @@ class EventTest(CollectorTest):
     self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
                 Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
 
+  def testLinkDetach(self):
+    c1 = Connection()
+    c1.collect(self.collector)
+    t1 = Transport()
+    t1.bind(c1)
+    c1.open()
+    s1 = c1.session()
+    s1.open()
+    l1 = s1.sender("asdf")
+    l1.open()
+    l1.detach()
+    self.expect_until(Event.LINK_DETACH, Event.TRANSPORT)
+
+    c2 = Connection()
+    c2.collect(self.collector)
+    t2 = Transport()
+    t2.bind(c2)
+
+    pump(t1, t2)
+
+    self.expect_until(Event.LINK_REMOTE_DETACH)
+
 class PeerTest(CollectorTest):
 
   def setup(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org