You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/10/10 14:52:17 UTC
svn commit: r1630822 - in /qpid/proton/branches/examples:
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/
proton-c/src/events/ proton-c/src/messenger/ proton-c/src/ssl/
proton-c/src/transport/ proton-j/src/main/java/org/apache/q...
Author: gsim
Date: Fri Oct 10 12:52:17 2014
New Revision: 1630822
URL: http://svn.apache.org/r1630822
Log:
Added events: PN_CONNECTION_BOUND/UNBOUND, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_CLOSED, and PN_TRANSPORT_ERROR. This should address PROTON-656
Added:
qpid/proton/branches/examples/tests/python/proton_tests/scratch.py
Modified:
qpid/proton/branches/examples/proton-c/bindings/python/proton.py
qpid/proton/branches/examples/proton-c/include/proton/event.h
qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h
qpid/proton/branches/examples/proton-c/src/events/event.c
qpid/proton/branches/examples/proton-c/src/messenger/messenger.c
qpid/proton/branches/examples/proton-c/src/ssl/openssl.c
qpid/proton/branches/examples/proton-c/src/transport/transport.c
qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py
qpid/proton/branches/examples/tests/python/proton_tests/engine.py
Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/bindings/python/proton.py?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/branches/examples/proton-c/bindings/python/proton.py Fri Oct 10 12:52:17 2014
@@ -3363,6 +3363,8 @@ class Collector:
class Event:
CONNECTION_INIT = PN_CONNECTION_INIT
+ CONNECTION_BOUND = PN_CONNECTION_BOUND
+ CONNECTION_UNBOUND = PN_CONNECTION_UNBOUND
CONNECTION_OPEN = PN_CONNECTION_OPEN
CONNECTION_CLOSE = PN_CONNECTION_CLOSE
CONNECTION_REMOTE_OPEN = PN_CONNECTION_REMOTE_OPEN
@@ -3385,7 +3387,12 @@ class Event:
LINK_FINAL = PN_LINK_FINAL
DELIVERY = PN_DELIVERY
+
TRANSPORT = PN_TRANSPORT
+ TRANSPORT_ERROR = PN_TRANSPORT_ERROR
+ TRANSPORT_HEAD_CLOSED = PN_TRANSPORT_HEAD_CLOSED
+ TRANSPORT_TAIL_CLOSED = PN_TRANSPORT_TAIL_CLOSED
+ TRANSPORT_CLOSED = PN_TRANSPORT_CLOSED
def __init__(self, clazz, context, type):
self.clazz = clazz
Modified: qpid/proton/branches/examples/proton-c/include/proton/event.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/include/proton/event.h?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/include/proton/event.h (original)
+++ qpid/proton/branches/examples/proton-c/include/proton/event.h Fri Oct 10 12:52:17 2014
@@ -91,130 +91,148 @@ typedef enum {
* will ever be issued for a connection. Events of this type point
* to the relevant connection.
*/
- PN_CONNECTION_INIT = 1,
+ PN_CONNECTION_INIT,
+
+ /**
+ * The connection has been bound to a transport.
+ */
+ PN_CONNECTION_BOUND,
+
+ /**
+ * The connection has been unbound from its transport.
+ */
+ PN_CONNECTION_UNBOUND,
/**
* The local connection endpoint has been opened. Events of this
* type point to the relevant connection.
*/
- PN_CONNECTION_OPEN = 2,
+ PN_CONNECTION_OPEN,
/**
* The remote endpoint has opened the connection. Events of this
* type point to the relevant connection.
*/
- PN_CONNECTION_REMOTE_OPEN = 3,
+ PN_CONNECTION_REMOTE_OPEN,
/**
* The local connection endpoint has been closed. Events of this
* type point to the relevant connection.
*/
- PN_CONNECTION_CLOSE = 4,
+ PN_CONNECTION_CLOSE,
/**
* The remote endpoint has closed the connection. Events of this
* type point to the relevant connection.
*/
- PN_CONNECTION_REMOTE_CLOSE = 5,
+ PN_CONNECTION_REMOTE_CLOSE,
/**
* The connection has been freed and any outstanding processing has
* been completed. This is the final event that will ever be issued
* for a connection.
*/
- PN_CONNECTION_FINAL = 6,
+ PN_CONNECTION_FINAL,
/**
* The session has been created. This is the first event that will
* ever be issued for a session.
*/
- PN_SESSION_INIT = 11,
+ PN_SESSION_INIT,
/**
* The local session endpoint has been opened. Events of this type
* point to the relevant session.
*/
- PN_SESSION_OPEN = 12,
+ PN_SESSION_OPEN,
/**
* The remote endpoint has opened the session. Events of this type
* point to the relevant session.
*/
- PN_SESSION_REMOTE_OPEN = 13,
+ PN_SESSION_REMOTE_OPEN,
/**
* The local session endpoint has been closed. Events of this type
* point to the relevant session.
*/
- PN_SESSION_CLOSE = 14,
+ PN_SESSION_CLOSE,
/**
* The remote endpoint has closed the session. Events of this type
* point to the relevant session.
*/
- PN_SESSION_REMOTE_CLOSE = 15,
+ PN_SESSION_REMOTE_CLOSE,
/**
* The session has been freed and any outstanding processing has
* been completed. This is the final event that will ever be issued
* for a session.
*/
- PN_SESSION_FINAL = 16,
+ PN_SESSION_FINAL,
/**
* The link has been created. This is the first event that will ever
* be issued for a link.
*/
- PN_LINK_INIT = 21,
+ PN_LINK_INIT,
/**
* The local link endpoint has been opened. Events of this type
* point to the relevant link.
*/
- PN_LINK_OPEN = 22,
+ PN_LINK_OPEN,
/**
* The remote endpoint has opened the link. Events of this type
* point to the relevant link.
*/
- PN_LINK_REMOTE_OPEN = 23,
+ PN_LINK_REMOTE_OPEN,
/**
* The local link endpoint has been closed. Events of this type
* point to the relevant link.
*/
- PN_LINK_CLOSE = 24,
+ PN_LINK_CLOSE,
/**
* The remote endpoint has closed the link. Events of this type
* point to the relevant link.
*/
- PN_LINK_REMOTE_CLOSE = 25,
+ PN_LINK_REMOTE_CLOSE,
/**
* The flow control state for a link has changed. Events of this
* type point to the relevant link.
*/
- PN_LINK_FLOW = 26,
+ PN_LINK_FLOW,
/**
* The link has been freed and any outstanding processing has been
* completed. This is the final event that will ever be issued for a
* link. Events of this type point to the relevant link.
*/
- PN_LINK_FINAL = 27,
+ PN_LINK_FINAL,
/**
* A delivery has been created or updated. Events of this type point
* to the relevant delivery.
*/
- PN_DELIVERY = 31,
+ PN_DELIVERY,
/**
* The transport has new data to read and/or write. Events of this
* type point to the relevant transport.
*/
- PN_TRANSPORT = 41
+ PN_TRANSPORT,
+
+ PN_TRANSPORT_ERROR,
+
+ PN_TRANSPORT_HEAD_CLOSED,
+
+ PN_TRANSPORT_TAIL_CLOSED,
+
+ PN_TRANSPORT_CLOSED
} pn_event_type_t;
Modified: qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h Fri Oct 10 12:52:17 2014
@@ -174,6 +174,8 @@ struct pn_transport_t {
bool tail_closed; // input stream closed by driver
bool head_closed;
bool done_processing; // if true, don't call pn_process again
+ bool posted_head_closed;
+ bool posted_tail_closed;
};
struct pn_connection_t {
@@ -314,4 +316,7 @@ int pn_do_error(pn_transport_t *transpor
void pn_session_unbound(pn_session_t* ssn);
void pn_link_unbound(pn_link_t* link);
+void pni_close_tail(pn_transport_t *transport);
+
+
#endif /* engine-internal.h */
Modified: qpid/proton/branches/examples/proton-c/src/events/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/events/event.c?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/src/events/event.c (original)
+++ qpid/proton/branches/examples/proton-c/src/events/event.c Fri Oct 10 12:52:17 2014
@@ -236,6 +236,10 @@ const char *pn_event_type_name(pn_event_
return "PN_EVENT_NONE";
case PN_CONNECTION_INIT:
return "PN_CONNECTION_INIT";
+ case PN_CONNECTION_BOUND:
+ return "PN_CONNECTION_BOUND";
+ case PN_CONNECTION_UNBOUND:
+ return "PN_CONNECTION_UNBOUND";
case PN_CONNECTION_REMOTE_OPEN:
return "PN_CONNECTION_REMOTE_OPEN";
case PN_CONNECTION_OPEN:
@@ -276,6 +280,14 @@ const char *pn_event_type_name(pn_event_
return "PN_DELIVERY";
case PN_TRANSPORT:
return "PN_TRANSPORT";
+ case PN_TRANSPORT_ERROR:
+ return "PN_TRANSPORT_ERROR";
+ case PN_TRANSPORT_HEAD_CLOSED:
+ return "PN_TRANSPORT_HEAD_CLOSED";
+ case PN_TRANSPORT_TAIL_CLOSED:
+ return "PN_TRANSPORT_TAIL_CLOSED";
+ case PN_TRANSPORT_CLOSED:
+ return "PN_TRANSPORT_CLOSED";
}
return "<unrecognized>";
Modified: qpid/proton/branches/examples/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/messenger/messenger.c?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/branches/examples/proton-c/src/messenger/messenger.c Fri Oct 10 12:52:17 2014
@@ -1259,10 +1259,18 @@ int pn_messenger_process_events(pn_messe
pn_messenger_process_delivery(messenger, event);
break;
case PN_TRANSPORT:
+ case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_HEAD_CLOSED:
+ case PN_TRANSPORT_TAIL_CLOSED:
+ case PN_TRANSPORT_CLOSED:
pn_messenger_process_transport(messenger, event);
break;
case PN_EVENT_NONE:
break;
+ case PN_CONNECTION_BOUND:
+ break;
+ case PN_CONNECTION_UNBOUND:
+ break;
case PN_CONNECTION_FINAL:
break;
case PN_SESSION_FINAL:
Modified: qpid/proton/branches/examples/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/ssl/openssl.c?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/branches/examples/proton-c/src/ssl/openssl.c Fri Oct 10 12:52:17 2014
@@ -197,7 +197,7 @@ static int ssl_failed(pn_ssl_t *ssl)
ERR_error_string_n( ssl_err, buf, sizeof(buf) );
}
_log_ssl_error(NULL); // spit out any remaining errors to the log file
- ssl->transport->tail_closed = true;
+ pni_close_tail(ssl->transport);
pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
return PN_EOS;
}
Modified: qpid/proton/branches/examples/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/transport/transport.c?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/src/transport/transport.c (original)
+++ qpid/proton/branches/examples/proton-c/src/transport/transport.c Fri Oct 10 12:52:17 2014
@@ -174,6 +174,9 @@ static void pn_transport_initialize(void
transport->output_pending = 0;
transport->done_processing = false;
+
+ transport->posted_head_closed = false;
+ transport->posted_tail_closed = false;
}
pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -262,11 +265,17 @@ static void pn_transport_finalize(void *
int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
{
- if (!transport) return PN_ARG_ERR;
+ assert(transport);
+ assert(connection);
+
if (transport->connection) return PN_STATE_ERR;
if (connection->transport) return PN_STATE_ERR;
+
transport->connection = connection;
connection->transport = transport;
+
+ pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+
pn_incref(connection);
if (transport->open_rcvd) {
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
@@ -274,6 +283,7 @@ int pn_transport_bind(pn_transport_t *tr
transport->disp->halt = false;
transport_consume(transport); // blech - testBindAfterOpen
}
+
return 0;
}
@@ -304,9 +314,12 @@ int pn_transport_unbind(pn_transport_t *
assert(transport);
if (!transport->connection) return 0;
+
pn_connection_t *conn = transport->connection;
transport->connection = NULL;
+ pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND);
+
// XXX: what happens if the endpoints are freed before we get here?
pn_session_t *ssn = pn_session_head(conn, 0);
while (ssn) {
@@ -415,6 +428,15 @@ int pn_post_close(pn_transport_t *transp
(bool) condition, ERROR, condition, description, info);
}
+static pn_collector_t *pni_transport_collector(pn_transport_t *transport)
+{
+ if (transport->connection && transport->connection->collector) {
+ return transport->connection->collector;
+ } else {
+ return NULL;
+ }
+}
+
int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
{
va_list ap;
@@ -433,6 +455,8 @@ int pn_do_error(pn_transport_t *transpor
}
transport->disp->halt = true;
pn_transport_logf(transport, "ERROR %s %s", condition, buf);
+ pn_collector_t *collector = pni_transport_collector(transport);
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR);
return PN_ERR;
}
@@ -999,6 +1023,14 @@ ssize_t pn_transport_input(pn_transport_
return original - available;
}
+static void pni_maybe_post_closed(pn_transport_t *transport)
+{
+ pn_collector_t *collector = pni_transport_collector(transport);
+ if (transport->posted_head_closed && transport->posted_tail_closed) {
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
+ }
+}
+
// process pending input until none remaining or EOS
static ssize_t transport_consume(pn_transport_t *transport)
{
@@ -1020,6 +1052,12 @@ static ssize_t transport_consume(pn_tran
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_transport_log(transport, " <- EOS");
transport->input_pending = 0; // XXX ???
+ if (!transport->posted_tail_closed) {
+ pn_collector_t *collector = pni_transport_collector(transport);
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
+ transport->posted_tail_closed = true;
+ pni_maybe_post_closed(transport);
+ }
return n;
}
}
@@ -2041,6 +2079,13 @@ ssize_t pn_transport_push(pn_transport_t
}
}
+void pni_close_tail(pn_transport_t *transport)
+{
+ if (!transport->tail_closed) {
+ transport->tail_closed = true;
+ }
+}
+
int pn_transport_process(pn_transport_t *transport, size_t size)
{
assert(transport);
@@ -2050,7 +2095,7 @@ int pn_transport_process(pn_transport_t
ssize_t n = transport_consume( transport );
if (n == PN_EOS) {
- transport->tail_closed = true;
+ pni_close_tail(transport);
}
if (n < 0 && n != PN_EOS) return n;
@@ -2060,7 +2105,7 @@ int pn_transport_process(pn_transport_t
// input stream has closed
int pn_transport_close_tail(pn_transport_t *transport)
{
- transport->tail_closed = true;
+ pni_close_tail(transport);
transport_consume( transport );
return 0;
// XXX: what if not all input processed at this point? do we care???
@@ -2112,6 +2157,14 @@ void pn_transport_pop(pn_transport_t *tr
memmove( transport->output_buf, &transport->output_buf[size],
transport->output_pending );
}
+
+ if (!transport->output_pending && pn_transport_pending(transport) < 0 &&
+ !transport->posted_head_closed) {
+ pn_collector_t *collector = pni_transport_collector(transport);
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
+ transport->posted_head_closed = true;
+ pni_maybe_post_closed(transport);
+ }
}
}
Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Fri Oct 10 12:52:17 2014
@@ -28,56 +28,40 @@ package org.apache.qpid.proton.engine;
public interface Event
{
- public enum Category {
- CONNECTION,
- SESSION,
- LINK,
- DELIVERY,
- TRANSPORT;
- }
public enum Type {
- CONNECTION_INIT(Category.CONNECTION, 1),
- CONNECTION_OPEN(Category.CONNECTION, 2),
- CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3),
- CONNECTION_CLOSE(Category.CONNECTION, 4),
- CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5),
- CONNECTION_FINAL(Category.CONNECTION, 6),
-
- SESSION_INIT(Category.SESSION, 1),
- SESSION_OPEN(Category.SESSION, 2),
- SESSION_REMOTE_OPEN(Category.SESSION, 3),
- SESSION_CLOSE(Category.SESSION, 4),
- SESSION_REMOTE_CLOSE(Category.SESSION, 5),
- SESSION_FINAL(Category.SESSION, 6),
-
- LINK_INIT(Category.LINK, 1),
- LINK_OPEN(Category.LINK, 2),
- LINK_REMOTE_OPEN(Category.LINK, 3),
- LINK_CLOSE(Category.LINK, 4),
- LINK_REMOTE_CLOSE(Category.LINK, 5),
- LINK_FLOW(Category.LINK, 6),
- LINK_FINAL(Category.LINK, 7),
-
- DELIVERY(Category.DELIVERY, 1),
- TRANSPORT(Category.TRANSPORT, 1);
-
- private int _opcode;
- private Category _category;
-
- private Type(Category c, int o)
- {
- this._category = c;
- this._opcode = o;
- }
-
- public Category getCategory()
- {
- return this._category;
- }
- }
+ CONNECTION_INIT,
+ CONNECTION_BOUND,
+ CONNECTION_UNBOUND,
+ CONNECTION_OPEN,
+ CONNECTION_REMOTE_OPEN,
+ CONNECTION_CLOSE,
+ CONNECTION_REMOTE_CLOSE,
+ CONNECTION_FINAL,
+
+ SESSION_INIT,
+ SESSION_OPEN,
+ SESSION_REMOTE_OPEN,
+ SESSION_CLOSE,
+ SESSION_REMOTE_CLOSE,
+ SESSION_FINAL,
+
+ LINK_INIT,
+ LINK_OPEN,
+ LINK_REMOTE_OPEN,
+ LINK_CLOSE,
+ LINK_REMOTE_CLOSE,
+ LINK_FLOW,
+ LINK_FINAL,
- Category getCategory();
+ DELIVERY,
+
+ TRANSPORT,
+ TRANSPORT_ERROR,
+ TRANSPORT_HEAD_CLOSED,
+ TRANSPORT_TAIL_CLOSED,
+ TRANSPORT_CLOSED
+ }
Type getType();
Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original)
+++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Fri Oct 10 12:52:17 2014
@@ -56,11 +56,6 @@ class EventImpl implements Event
context = null;
}
- public Category getCategory()
- {
- return type.getCategory();
- }
-
public Type getType()
{
return type;
@@ -73,16 +68,15 @@ class EventImpl implements Event
public Connection getConnection()
{
- switch (type.getCategory()) {
- case CONNECTION:
+ if (context instanceof Connection) {
return (Connection) context;
- case TRANSPORT:
+ } else if (context instanceof Transport) {
Transport transport = getTransport();
if (transport == null) {
return null;
}
return ((TransportImpl) transport).getConnectionImpl();
- default:
+ } else {
Session ssn = getSession();
if (ssn == null) {
return null;
@@ -93,10 +87,9 @@ class EventImpl implements Event
public Session getSession()
{
- switch (type.getCategory()) {
- case SESSION:
+ if (context instanceof Session) {
return (Session) context;
- default:
+ } else {
Link link = getLink();
if (link == null) {
return null;
@@ -107,10 +100,9 @@ class EventImpl implements Event
public Link getLink()
{
- switch (type.getCategory()) {
- case LINK:
+ if (context instanceof Link) {
return (Link) context;
- default:
+ } else {
Delivery dlv = getDelivery();
if (dlv == null) {
return null;
@@ -121,20 +113,18 @@ class EventImpl implements Event
public Delivery getDelivery()
{
- switch (type.getCategory()) {
- case DELIVERY:
+ if (context instanceof Delivery) {
return (Delivery) context;
- default:
+ } else {
return null;
}
}
public Transport getTransport()
{
- switch (type.getCategory()) {
- case TRANSPORT:
+ if (context instanceof Transport) {
return (Transport) context;
- default:
+ } else {
return null;
}
}
@@ -150,4 +140,5 @@ class EventImpl implements Event
{
return "EventImpl{" + "type=" + type + ", context=" + context + '}';
}
+
}
Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Oct 10 12:52:17 2014
@@ -123,6 +123,9 @@ public class TransportImpl extends Endpo
private boolean _head_closed = false;
private TransportException _tail_error = null;
+ private boolean postedHeadClosed = false;
+ private boolean postedTailClosed = false;
+
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
* Client code outside this module should use a {@link EngineFactory} instead
@@ -210,8 +213,10 @@ public class TransportImpl extends Endpo
@Override
public void bind(Connection conn)
{
- _connectionEndpoint = (ConnectionImpl) conn;
// TODO - check if already bound
+
+ _connectionEndpoint = (ConnectionImpl) conn;
+ put(Event.Type.CONNECTION_BOUND, conn);
_connectionEndpoint.setTransport(this);
_connectionEndpoint.incref();
@@ -230,6 +235,7 @@ public class TransportImpl extends Endpo
@Override
public void unbind()
{
+ put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint);
_connectionEndpoint.modifyEndpoints();
_connectionEndpoint.setTransport(null);
@@ -1236,6 +1242,19 @@ public class TransportImpl extends Endpo
return _closeReceived;
}
+ void put(Event.Type type, Object context) {
+ if (_connectionEndpoint != null) {
+ _connectionEndpoint.put(type, context);
+ }
+ }
+
+ private void maybePostClosed()
+ {
+ if (postedHeadClosed && postedTailClosed) {
+ put(Event.Type.TRANSPORT_CLOSED, this);
+ }
+ }
+
@Override
public void closed(TransportException error)
{
@@ -1247,6 +1266,14 @@ public class TransportImpl extends Endpo
}
_head_closed = true;
}
+ if (_tail_error != null) {
+ put(Event.Type.TRANSPORT_ERROR, this);
+ }
+ if (!postedTailClosed) {
+ put(Event.Type.TRANSPORT_TAIL_CLOSED, this);
+ postedTailClosed = true;
+ maybePostClosed();
+ }
}
@Override
@@ -1351,6 +1378,13 @@ public class TransportImpl extends Endpo
{
init();
_outputProcessor.pop(bytes);
+
+ int p = pending();
+ if (p < 0 && !postedHeadClosed) {
+ put(Event.Type.TRANSPORT_HEAD_CLOSED, this);
+ postedHeadClosed = true;
+ maybePostClosed();
+ }
}
@Override
Modified: qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py Fri Oct 10 12:52:17 2014
@@ -942,13 +942,9 @@ def pn_transport_closed(trans):
from org.apache.qpid.proton.engine import Event
-PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION
-PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION
-PN_EVENT_CATEGORY_LINK = Event.Category.LINK
-PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY
-PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT
-
PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT
+PN_CONNECTION_BOUND = Event.Type.CONNECTION_BOUND
+PN_CONNECTION_UNBOUND = Event.Type.CONNECTION_UNBOUND
PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN
PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN
PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE
@@ -969,6 +965,10 @@ PN_LINK_FLOW = Event.Type.LINK_FLOW
PN_LINK_FINAL = Event.Type.LINK_FINAL
PN_DELIVERY = Event.Type.DELIVERY
PN_TRANSPORT = Event.Type.TRANSPORT
+PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR
+PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED
+PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED
+PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED
def pn_collector():
return Proton.collector()
Modified: qpid/proton/branches/examples/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/engine.py?rev=1630822&r1=1630821&r2=1630822&view=diff
==============================================================================
--- qpid/proton/branches/examples/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/branches/examples/tests/python/proton_tests/engine.py Fri Oct 10 12:52:17 2014
@@ -2150,8 +2150,8 @@ class EventTest(CollectorTest):
self.pump()
c1.free()
c1._transport.unbind()
- self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL,
- Event.CONNECTION_FINAL)
+ self.expect(Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL,
+ Event.SESSION_FINAL, Event.CONNECTION_FINAL)
def testConnectionINIT_FINAL(self):
c = Connection()
@@ -2215,8 +2215,8 @@ class EventTest(CollectorTest):
self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
rcv.session.connection._transport.unbind()
rcv.session.connection.free()
- self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL,
- Event.CONNECTION_FINAL)
+ self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL,
+ Event.SESSION_FINAL, Event.CONNECTION_FINAL)
def testDeliveryEventsDisp(self):
snd, rcv = self.testFlowEvents()
@@ -2235,6 +2235,60 @@ class EventTest(CollectorTest):
event = self.expect(Event.DELIVERY)
assert event.context == dlv
+ def testConnectionBOUND_UNBOUND(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ t = Transport()
+ t.bind(c)
+ self.expect(Event.CONNECTION_BOUND)
+ t.unbind()
+ self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT)
+
+ def testTransportERROR_CLOSE(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ t = Transport()
+ t.bind(c)
+ self.expect(Event.CONNECTION_BOUND)
+ t.push("asdf")
+ self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
+ p = t.pending()
+ assert p > 0
+ # XXX: can't include this because java behaviour is different
+ #assert "AMQP header mismatch" in t.peek(p), repr(t.peek(p))
+ t.pop(p)
+ self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testTransportCLOSED(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ t = Transport()
+ t.bind(c)
+ c.open()
+
+ self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_OPEN, Event.TRANSPORT)
+
+ c2 = Connection()
+ t2 = Transport()
+ t2.bind(c2)
+ c2.open()
+ c2.close()
+
+ pump(t, t2)
+
+ self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE,
+ Event.TRANSPORT_TAIL_CLOSED)
+
+ c.close()
+
+ pump(t, t2)
+
+ self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
+ Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
class PeerTest(CollectorTest):
def setup(self):
@@ -2255,7 +2309,8 @@ class TeardownLeakTest(PeerTest):
def doLeak(self, local, remote):
self.connection.open()
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_OPEN, Event.TRANSPORT)
ssn = self.connection.session()
ssn.open()
@@ -2294,19 +2349,23 @@ class TeardownLeakTest(PeerTest):
self.pump()
if remote:
- self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE,
- Event.CONNECTION_REMOTE_CLOSE),
- (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL,
- Event.SESSION_REMOTE_CLOSE,
- Event.CONNECTION_REMOTE_CLOSE))
+ self.expect_oneof((Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE,
+ Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE,
+ Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED),
+ (Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE,
+ Event.LINK_FINAL, Event.SESSION_REMOTE_CLOSE,
+ Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_CLOSED))
else:
- self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE)
+ self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE,
+ Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_CLOSED)
self.connection.free()
self.transport.unbind()
- self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
- (Event.SESSION_FINAL, Event.CONNECTION_FINAL))
+ self.expect_oneof((Event.LINK_FINAL, Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
+ (Event.CONNECTION_UNBOUND, Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL))
def testLocalRemoteLeak(self):
self.doLeak(True, True)
Added: qpid/proton/branches/examples/tests/python/proton_tests/scratch.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/scratch.py?rev=1630822&view=auto
==============================================================================
--- qpid/proton/branches/examples/tests/python/proton_tests/scratch.py (added)
+++ qpid/proton/branches/examples/tests/python/proton_tests/scratch.py Fri Oct 10 12:52:17 2014
@@ -0,0 +1,44 @@
+ def xxx_test_reopen_on_same_session(self):
+ ssn1 = self.snd.session
+ ssn2 = self.rcv.session
+
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+ self.snd.close()
+ self.rcv.close()
+ self.pump()
+
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+
+ print self.snd._link
+ self.snd = ssn1.sender("test-link")
+ print self.snd._link
+ self.rcv = ssn2.receiver("test-link")
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+class SessionPipelineTest(PeerTest):
+
+ def xxx_test(self):
+ self.connection.open()
+ self.peer.open()
+ self.pump()
+ ssn = self.connection.session()
+ ssn.open()
+ self.pump()
+ peer_ssn = self.peer.session_head(0)
+ ssn.close()
+ self.pump()
+ peer_ssn.close()
+ self.peer.close()
+ self.pump()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org