You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/23 19:45:56 UTC
svn commit: r1627098 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/engine/ proton-c/src/events/
proton-c/src/messenger/ proton-c/src/transport/
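proton-j/src/main/java/org/apache/qpid/proton/engine/
proton-j/src/main/java/org/apache/qpid/proton/engine/impl/
proton-j/src/main/resources/ tests/python/proton_tests/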
Author: rhs
Date: Tue Sep 23 17:45:56 2014
New Revision: 1627098
URL: http://svn.apache.org/r1627098
Log:
PROTON-677: added support for detach
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/event.h
qpid/proton/trunk/proton-c/include/proton/link.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/events/event.c
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Sep 23 17:45:56 2014
@@ -2648,6 +2648,9 @@ class Link(Endpoint):
def drained(self):
return pn_link_drained(self._link)
+ def detach(self):
+ return pn_link_detach(self._link)
+
class Terminus(object):
UNSPECIFIED = PN_UNSPECIFIED
@@ -3385,8 +3388,10 @@ class Event:
LINK_INIT = PN_LINK_INIT
LINK_OPEN = PN_LINK_OPEN
LINK_CLOSE = PN_LINK_CLOSE
+ LINK_DETACH = PN_LINK_DETACH
LINK_REMOTE_OPEN = PN_LINK_REMOTE_OPEN
LINK_REMOTE_CLOSE = PN_LINK_REMOTE_CLOSE
+ LINK_REMOTE_DETACH = PN_LINK_REMOTE_DETACH
LINK_FLOW = PN_LINK_FLOW
LINK_FINAL = PN_LINK_FINAL
Modified: qpid/proton/trunk/proton-c/include/proton/event.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/event.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/event.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/event.h Tue Sep 23 17:45:56 2014
@@ -202,6 +202,18 @@ typedef enum {
PN_LINK_REMOTE_CLOSE,
/**
+ * The local link endpoint has been detached. Events of this type
+ * point to the relevant link.
+ */
+ PN_LINK_DETACH,
+
+ /**
+ * The remote endpoint has detached the link. Events of this type
+ * point to the relevant link.
+ */
+ PN_LINK_REMOTE_DETACH,
+
+ /**
* The flow control state for a link has changed. Events of this
* type point to the relevant link.
*/
Modified: qpid/proton/trunk/proton-c/include/proton/link.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/link.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/link.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/link.h Tue Sep 23 17:45:56 2014
@@ -249,6 +249,13 @@ PN_EXTERN void pn_link_open(pn_link_t *l
PN_EXTERN void pn_link_close(pn_link_t *link);
/**
+ * Detach a link.
+ *
+ * @param[in] link a link object
+ */
+PN_EXTERN void pn_link_detach(pn_link_t *link);
+
+/**
* Access the locally defined source definition for a link.
*
* The pointer returned by this operation is valid until the link
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Sep 23 17:45:56 2014
@@ -252,6 +252,7 @@ struct pn_link_t {
uint8_t remote_rcv_settle_mode;
bool drain_flag_mode; // receiver only
bool drain;
+ bool detached;
};
struct pn_disposition_t {
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Sep 23 17:45:56 2014
@@ -265,6 +265,15 @@ void pn_link_close(pn_link_t *link)
pn_endpoint_close(&link->endpoint);
}
+void pn_link_detach(pn_link_t *link)
+{
+ assert(link);
+ link->detached = true;
+ pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_DETACH);
+ pn_modified(link->session->connection, &link->endpoint, true);
+
+}
+
void pn_terminus_free(pn_terminus_t *terminus)
{
pn_free(terminus->address);
@@ -861,6 +870,7 @@ pn_link_t *pn_link_new(int type, pn_sess
link->rcv_settle_mode = PN_RCV_FIRST;
link->remote_snd_settle_mode = PN_SND_MIXED;
link->remote_rcv_settle_mode = PN_RCV_FIRST;
+ link->detached = false;
// begin transport state
link->state.local_handle = -1;
Modified: qpid/proton/trunk/proton-c/src/events/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/events/event.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/events/event.c (original)
+++ qpid/proton/trunk/proton-c/src/events/event.c Tue Sep 23 17:45:56 2014
@@ -270,6 +270,10 @@ const char *pn_event_type_name(pn_event_
return "PN_LINK_OPEN";
case PN_LINK_REMOTE_CLOSE:
return "PN_LINK_REMOTE_CLOSE";
+ case PN_LINK_DETACH:
+ return "PN_LINK_DETACH";
+ case PN_LINK_REMOTE_DETACH:
+ return "PN_LINK_REMOTE_DETACH";
case PN_LINK_CLOSE:
return "PN_LINK_CLOSE";
case PN_LINK_FLOW:
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Sep 23 17:45:56 2014
@@ -1265,8 +1265,10 @@ int pn_messenger_process_events(pn_messe
break;
case PN_LINK_REMOTE_OPEN:
case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH:
case PN_LINK_OPEN:
case PN_LINK_CLOSE:
+ case PN_LINK_DETACH:
pn_messenger_process_link(messenger, event);
break;
case PN_LINK_FLOW:
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Tue Sep 23 17:45:56 2014
@@ -968,7 +968,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_CLOSE);
} else {
- // TODO: implement
+ pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_DETACH);
}
pni_unmap_remote_handle(link);
@@ -1613,7 +1613,7 @@ int pn_process_link_teardown(pn_transpor
pn_session_t *session = link->session;
pn_session_state_t *ssn_state = &session->state;
pn_link_state_t *state = &link->state;
- if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 0 &&
+ if (((endpoint->state & PN_LOCAL_CLOSED) || link->detached) && (int32_t) state->local_handle >= 0 &&
(int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) {
if (pn_link_is_sender(link) && pn_link_queued(link) &&
(int32_t) state->remote_handle != -2 &&
@@ -1630,8 +1630,10 @@ int pn_process_link_teardown(pn_transpor
info = pn_condition_info(&endpoint->condition);
}
- int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
- state->local_handle, true, (bool) name, ERROR, name, description, info);
+ int err =
+ pn_post_frame(transport->disp, ssn_state->local_channel,
+ "DL[Io?DL[sSC]]", DETACH, state->local_handle, !link->detached,
+ (bool)name, ERROR, name, description, info);
if (err) return err;
pni_unmap_local_handle(link);
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Tue Sep 23 17:45:56 2014
@@ -51,6 +51,8 @@ public interface Event
LINK_REMOTE_OPEN,
LINK_CLOSE,
LINK_REMOTE_CLOSE,
+ LINK_DETACH,
+ LINK_REMOTE_DETACH,
LINK_FLOW,
LINK_FINAL,
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java Tue Sep 23 17:45:56 2014
@@ -184,4 +184,7 @@ public interface Link extends Endpoint
public int getRemoteCredit();
public boolean getDrain();
+
+ public void detach();
+
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Sep 23 17:45:56 2014
@@ -55,6 +55,7 @@ public abstract class LinkImpl extends E
private LinkNode<LinkImpl> _node;
private boolean _drain;
+ private boolean _detached;
LinkImpl(SessionImpl session, String name)
{
@@ -398,4 +399,17 @@ public abstract class LinkImpl extends E
{
getConnectionImpl().put(Event.Type.LINK_CLOSE, this);
}
+
+ public void detach()
+ {
+ _detached = true;
+ getConnectionImpl().put(Event.Type.LINK_DETACH, this);
+ modified();
+ }
+
+ boolean detached()
+ {
+ return _detached;
+ }
+
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Sep 23 17:45:56 2014
@@ -375,7 +375,7 @@ public class TransportImpl extends Endpo
SessionImpl session = link.getSession();
TransportSession transportSession = getTransportState(session);
- if(link.getLocalState() == EndpointState.CLOSED
+ if(((link.getLocalState() == EndpointState.CLOSED) || link.detached())
&& transportLink.isLocalHandleSet()
&& !_isCloseSent)
{
@@ -395,8 +395,7 @@ public class TransportImpl extends Endpo
Detach detach = new Detach();
detach.setHandle(localHandle);
- // TODO - need an API for detaching rather than closing the link
- detach.setClosed(true);
+ detach.setClosed(!link.detached());
ErrorCondition localError = link.getCondition();
if( localError.getCondition() !=null )
@@ -1165,7 +1164,11 @@ public class TransportImpl extends Endpo
LinkImpl link = transportLink.getLink();
transportLink.receivedDetach();
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
- _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+ if (detach.getClosed()) {
+ _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+ } else {
+ _connectionEndpoint.put(Event.Type.LINK_REMOTE_DETACH, link);
+ }
transportLink.clearRemoteHandle();
link.setRemoteState(EndpointState.CLOSED);
if(detach.getError() != null)
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Tue Sep 23 17:45:56 2014
@@ -625,6 +625,10 @@ def pn_link_close(link):
link.on_close()
link.impl.close()
+def pn_link_detach(link):
+ link.on_close()
+ link.impl.detach()
+
def pn_link_flow(link, n):
link.impl.flow(n)
@@ -961,6 +965,8 @@ PN_LINK_OPEN = Event.Type.LINK_OPEN
PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN
PN_LINK_CLOSE = Event.Type.LINK_CLOSE
PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE
+PN_LINK_DETACH = Event.Type.LINK_DETACH
+PN_LINK_REMOTE_DETACH = Event.Type.LINK_REMOTE_DETACH
PN_LINK_FLOW = Event.Type.LINK_FLOW
PN_LINK_FINAL = Event.Type.LINK_FINAL
PN_DELIVERY = Event.Type.DELIVERY
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1627098&r1=1627097&r2=1627098&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Sep 23 17:45:56 2014
@@ -2101,6 +2101,11 @@ class CollectorTest(Test):
assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)
+ def expect_until(self, *types):
+ events = self.drain()
+ etypes = tuple([e.type for e in events[-len(types):]])
+ assert etypes == types, "actual events %s did not end in expect sequence: %s" % (events, types)
+
class EventTest(CollectorTest):
def teardown(self):
@@ -2289,6 +2294,28 @@ class EventTest(CollectorTest):
self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+ def testLinkDetach(self):
+ c1 = Connection()
+ c1.collect(self.collector)
+ t1 = Transport()
+ t1.bind(c1)
+ c1.open()
+ s1 = c1.session()
+ s1.open()
+ l1 = s1.sender("asdf")
+ l1.open()
+ l1.detach()
+ self.expect_until(Event.LINK_DETACH, Event.TRANSPORT)
+
+ c2 = Connection()
+ c2.collect(self.collector)
+ t2 = Transport()
+ t2.bind(c2)
+
+ pump(t1, t2)
+
+ self.expect_until(Event.LINK_REMOTE_DETACH)
+
class PeerTest(CollectorTest):
def setup(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org