You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/10/01 13:33:10 UTC
svn commit: r1392286 [1/2] - in /qpid/proton/trunk: ./
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/
proton-c/src/codec/ proton-c/src/engine/ proton-c/src/message/
proton-j/src/main/scripts/ proton-j/src/test/java/org/apache/qpid/pro...
Author: rhs
Date: Mon Oct 1 11:33:09 2012
New Revision: 1392286
URL: http://svn.apache.org/viewvc?rev=1392286&view=rev
Log:
added idiomatic API for lower level proton functionality; changed test suite over to the idiomatic API; changed java tests to shim to the idiomatic API rather than the swigged API
Added:
qpid/proton/trunk/proton-j/src/main/scripts/proton.py
Removed:
qpid/proton/trunk/proton-j/src/main/scripts/jproton.py
qpid/proton/trunk/proton-j/src/main/scripts/xproton.py
Modified:
qpid/proton/trunk/config.sh
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/bindings/python/python.i
qpid/proton/trunk/proton-c/include/proton/codec.h
qpid/proton/trunk/proton-c/src/buffer.c
qpid/proton/trunk/proton-c/src/codec/codec.c
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/test/JythonTest.java
qpid/proton/trunk/tests/proton_tests/codec.py
qpid/proton/trunk/tests/proton_tests/common.py
qpid/proton/trunk/tests/proton_tests/engine.py
qpid/proton/trunk/tests/proton_tests/message.py
qpid/proton/trunk/tests/proton_tests/messenger.py
qpid/proton/trunk/tests/proton_tests/sasl.py
qpid/proton/trunk/tests/proton_tests/transport.py
Modified: qpid/proton/trunk/config.sh
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/config.sh?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/config.sh (original)
+++ qpid/proton/trunk/config.sh Mon Oct 1 11:33:09 2012
@@ -32,8 +32,8 @@ fi
# Python & Jython
export PYTHON_BINDINGS=$PROTON_BINDINGS/python
-export COMMON_PYPATH=$PROTON_HOME/tests:$PROTON_HOME/proton-c/bindings/python
-export PYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-c:$PYTHON_BINDINGS
+export COMMON_PYPATH=$PROTON_HOME/tests
+export PYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-c/bindings/python:$PYTHON_BINDINGS
export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/scripts:$PROTON_HOME/proton-j/target/qpid-proton-1.0-SNAPSHOT.jar
# PHP
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Mon Oct 1 11:33:09 2012
@@ -30,7 +30,7 @@ The proton APIs consist of the following
"""
-from xproton import *
+from cproton import *
class ProtonException(Exception):
"""
@@ -328,6 +328,8 @@ class Message(object):
AMQP = PN_AMQP
JSON = PN_JSON
+ DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
+
def __init__(self):
self._msg = pn_message()
@@ -564,6 +566,20 @@ The group-id for any replies.
The format of the message.
""")
+ def encode(self):
+ sz = 16
+ while True:
+ err, data = pn_message_encode(self._msg, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return data
+
+ def decode(self, data):
+ return self._check(pn_message_decode(self._msg, data, len(data)))
+
def load(self, data):
self._check(pn_message_load(self._msg, data))
@@ -1149,5 +1165,365 @@ class Data:
def dump(self):
pn_data_dump(self._data)
+class ConnectionException(ProtonException):
+ pass
+
+class Endpoint(object):
+
+ LOCAL_UNINIT = PN_LOCAL_UNINIT
+ REMOTE_UNINIT = PN_REMOTE_UNINIT
+ LOCAL_ACTIVE = PN_LOCAL_ACTIVE
+ REMOTE_ACTIVE = PN_REMOTE_ACTIVE
+ LOCAL_CLOSED = PN_LOCAL_CLOSED
+ REMOTE_CLOSED = PN_REMOTE_CLOSED
+
+def wrap_connection(conn):
+ if not conn: return None
+ ctx = pn_connection_context(conn)
+ if ctx: return ctx
+ wrapper = Connection(_conn=conn)
+ return wrapper
+
+class Connection(Endpoint):
+
+ def __init__(self, _conn=None):
+ if _conn:
+ self._conn = _conn
+ else:
+ self._conn = pn_connection()
+ pn_connection_set_context(self._conn, self)
+
+ def __del__(self):
+ if hasattr(self, "_conn"):
+ pn_connection_free(self._conn)
+ del self._conn
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, ConnectionException)
+ raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
+ else:
+ return err
+
+ def _get_container(self):
+ return pn_connection_container(self._conn)
+ def _set_container(self, name):
+ return pn_connection_set_container(self._conn, name)
+
+ container = property(_get_container, _set_container)
+
+ def _get_hostname(self):
+ return pn_connection_hostname(self._conn)
+ def _set_hostname(self, name):
+ return pn_connection_set_hostname(self._conn, name)
+
+ hostname = property(_get_hostname, _set_hostname)
+
+ @property
+ def remote_container(self):
+ return pn_connection_remote_container(self._conn)
+
+ @property
+ def remote_hostname(self):
+ return pn_connection_remote_hostname(self._conn)
+
+ def open(self):
+ pn_connection_open(self._conn)
+
+ def close(self):
+ pn_connection_close(self._conn)
+
+ @property
+ def state(self):
+ return pn_connection_state(self._conn)
+
+ def session(self):
+ return wrap_session(pn_session(self._conn))
+
+ def session_head(self, mask):
+ return wrap_session(pn_session_head(self._conn, mask))
+
+ def link_head(self, mask):
+ return wrap_link(pn_link_head(self._conn, mask))
+
+ @property
+ def work_head(self):
+ return wrap_delivery(pn_work_head(self._conn))
+
+class SessionException(ProtonException):
+ pass
+
+def wrap_session(ssn):
+ if ssn is None: return None
+ ctx = pn_session_context(ssn)
+ if ctx:
+ return ctx
+ else:
+ wrapper = Session(ssn)
+ pn_session_set_context(ssn, wrapper)
+ return wrapper
+
+class Session(Endpoint):
+
+ def __init__(self, ssn):
+ self._ssn = ssn
+
+ def __del__(self):
+ if hasattr(self, "_ssn"):
+ pn_session_free(self._ssn)
+ del self._ssn
+
+ def open(self):
+ pn_session_open(self._ssn)
+
+ def close(self):
+ pn_session_close(self._ssn)
+
+ @property
+ def state(self):
+ return pn_session_state(self._ssn)
+
+ @property
+ def connection(self):
+ return wrap_connection(pn_get_connection(self._ssn))
+
+ def sender(self, name):
+ return wrap_link(pn_sender(self._ssn, name))
+
+ def receiver(self, name):
+ return wrap_link(pn_receiver(self._ssn, name))
+
+class LinkException(ProtonException):
+ pass
+
+def wrap_link(link):
+ if link is None: return None
+ ctx = pn_link_context(link)
+ if ctx:
+ return ctx
+ else:
+ if pn_is_sender(link):
+ wrapper = Sender(link)
+ else:
+ wrapper = Receiver(link)
+ pn_link_set_context(link, wrapper)
+ return wrapper
+
+class Link(Endpoint):
+
+ def __init__(self, link):
+ self._link = link
+
+ def __del__(self):
+ if hasattr(self, "_link"):
+ pn_link_free(self._link)
+ del self._link
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, LinkException)
+ raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
+ else:
+ return err
+
+ def open(self):
+ pn_link_open(self._link)
+
+ def close(self):
+ pn_link_close(self._link)
+
+ @property
+ def state(self):
+ return pn_link_state(self._link)
+
+ @property
+ def session(self):
+ return wrap_session(pn_get_session(self._link))
+
+ def delivery(self, tag):
+ return wrap_delivery(pn_delivery(self._link, tag))
+
+ @property
+ def current(self):
+ return wrap_delivery(pn_current(self._link))
+
+ def advance(self):
+ return pn_advance(self._link)
+
+ @property
+ def unsettled(self):
+ return pn_unsettled(self._link)
+
+ @property
+ def credit(self):
+ return pn_credit(self._link)
+
+ @property
+ def queued(self):
+ return pn_queued(self._link)
+
+ def next(self, mask):
+ return wrap_link(pn_link_next(self._link, mask))
+
+class Sender(Link):
+
+ def send(self, bytes):
+ return self._check(pn_send(self._link, bytes))
+
+ def drained(self):
+ pn_drained(self._link)
+
+class Receiver(Link):
+
+ def flow(self, n):
+ pn_flow(self._link, n)
+
+ def recv(self, limit):
+ n, bytes = pn_recv(self._link, limit)
+ if n == PN_EOS:
+ return None
+ else:
+ self._check(n)
+ return bytes
+
+ def drain(self, n):
+ pn_drain(self._link, n)
+
+def wrap_delivery(dlv):
+ if not dlv: return None
+ ctx = pn_delivery_context(dlv)
+ if ctx: return ctx
+ wrapper = Delivery(dlv)
+ pn_delivery_set_context(dlv, wrapper)
+ return wrapper
+
+class Delivery(object):
+
+ ACCEPTED = PN_ACCEPTED
+
+ def __init__(self, dlv):
+ self._dlv = dlv
+
+ @property
+ def tag(self):
+ return pn_delivery_tag(self._dlv)
+
+ @property
+ def writable(self):
+ return pn_writable(self._dlv)
+
+ @property
+ def readable(self):
+ return pn_readable(self._dlv)
+
+ @property
+ def updated(self):
+ return pn_updated(self._dlv)
+
+ def disposition(self, disp):
+ pn_disposition(self._dlv, disp)
+
+ @property
+ def local_disposition(self):
+ return pn_local_disposition(self._dlv)
+
+ @property
+ def remote_disposition(self):
+ return pn_remote_disposition(self._dlv)
+
+ @property
+ def remote_settled(self):
+ return pn_remote_settled(self._dlv)
+
+ def settle(self):
+ pn_settle(self._dlv)
+
+ @property
+ def work_next(self):
+ return wrap_delivery(pn_work_next(self._dlv))
+
+class TransportException(ProtonException):
+ pass
+
+class Transport(object):
+
+ TRACE_DRV = PN_TRACE_DRV
+ TRACE_FRM = PN_TRACE_FRM
+ TRACE_RAW = PN_TRACE_RAW
+
+ def __init__(self):
+ self._trans = pn_transport()
+
+ def __del__(self):
+ if hasattr(self, "_trans"):
+ pn_transport_free(self._trans)
+ del self._trans
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, TransportException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
+ else:
+ return err
+
+ def bind(self, connection):
+ self._check(pn_transport_bind(self._trans, connection._conn))
+
+ def trace(self, n):
+ pn_trace(self._trans, n)
+
+ def tick(self, now):
+ return pn_tick(self._trans, now)
+
+ def output(self, n):
+ cd, out = pn_output(self._trans, n)
+ if cd == PN_EOS:
+ return None
+ else:
+ self._check(cd)
+ return out
+
+ def input(self, binary):
+ n = pn_input(self._trans, binary)
+ if n == PN_EOS:
+ return None
+ else:
+ return self._check(n)
+
+class SASL(object):
+
+ OK = PN_SASL_OK
+ AUTH = PN_SASL_AUTH
+
+ def __init__(self, transport):
+ self._sasl = pn_sasl(transport._trans)
+
+ def mechanisms(self, mechs):
+ pn_sasl_mechanisms(self._sasl, mechs)
+
+ def client(self):
+ pn_sasl_client(self._sasl)
+
+ def server(self):
+ pn_sasl_server(self._sasl)
+
+ @property
+ def outcome(self):
+ outcome = pn_sasl_outcome(self._sasl)
+ if outcome == PN_SASL_NONE:
+ return None
+ else:
+ return outcome
+
+ def done(self, outcome):
+ pn_sasl_done(self._sasl, outcome)
+
+class SSL(object):
+
+ def __init__(self, transport):
+ self._ssl = pn_ssl(transport._trans)
+
__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",
- "MessageException", "Timeout", "Data"]
+ "MessageException", "Timeout", "Data", "Endpoint", "Connection",
+ "Session", "Link", "Sender", "Receiver", "Delivery", "Transport",
+ "TransportException", "SASL", "SSL", "PN_SESSION_WINDOW"]
Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Mon Oct 1 11:33:09 2012
@@ -222,6 +222,142 @@ ssize_t pn_input(pn_transport_t *transpo
%}
%ignore pn_connector_free;
+/* Python-visible pn_connection_context: returns a new reference to the
+   stored context object, or None when no context is set. */
+%rename(pn_connection_context) wrap_pn_connection_context;
+%inline {
+  PyObject *wrap_pn_connection_context(pn_connection_t *c) {
+    PyObject *context = pn_connection_context(c);
+    if (!context) {
+      Py_RETURN_NONE;
+    }
+    Py_INCREF(context);
+    return context;
+  }
+}
+%ignore pn_connection_context;
+
+/* Python-visible pn_connection_set_context: stores a Python object as the
+   context of the connection and holds one reference to it. The new
+   reference is taken and installed before the previous one is dropped,
+   because dropping a reference can run arbitrary Python code and that
+   code must not find the handle still pointing at the released object. */
+%rename(pn_connection_set_context) wrap_pn_connection_set_context;
+%inline {
+  void wrap_pn_connection_set_context(pn_connection_t *c, PyObject *context) {
+    PyObject *previous = pn_connection_context(c);
+    Py_XINCREF(context);
+    pn_connection_set_context(c, context);
+    Py_XDECREF(previous);
+  }
+}
+%ignore pn_connection_set_context;
+
+/* Python-visible pn_connection_free: drops the reference held on the
+   context object, if any, and then frees the connection. */
+%rename(pn_connection_free) wrap_pn_connection_free;
+%inline %{
+  void wrap_pn_connection_free(pn_connection_t *c) {
+    PyObject *context = pn_connection_context(c);
+    if (context != NULL) {
+      Py_DECREF(context);
+    }
+    pn_connection_free(c);
+  }
+%}
+%ignore pn_connection_free;
+
+/* Python-visible pn_session_context: returns a new reference to the
+   stored context object, or None when no context is set. */
+%rename(pn_session_context) wrap_pn_session_context;
+%inline {
+  PyObject *wrap_pn_session_context(pn_session_t *s) {
+    PyObject *context = pn_session_context(s);
+    if (!context) {
+      Py_RETURN_NONE;
+    }
+    Py_INCREF(context);
+    return context;
+  }
+}
+%ignore pn_session_context;
+
+/* Python-visible pn_session_set_context: stores a Python object as the
+   context of the session and holds one reference to it. The new
+   reference is taken and installed before the previous one is dropped,
+   because dropping a reference can run arbitrary Python code and that
+   code must not find the handle still pointing at the released object. */
+%rename(pn_session_set_context) wrap_pn_session_set_context;
+%inline {
+  void wrap_pn_session_set_context(pn_session_t *s, PyObject *context) {
+    PyObject *previous = pn_session_context(s);
+    Py_XINCREF(context);
+    pn_session_set_context(s, context);
+    Py_XDECREF(previous);
+  }
+}
+%ignore pn_session_set_context;
+
+/* Python-visible pn_session_free: drops the reference held on the
+   context object, if any, and then frees the session. */
+%rename(pn_session_free) wrap_pn_session_free;
+%inline %{
+  void wrap_pn_session_free(pn_session_t *s) {
+    PyObject *context = pn_session_context(s);
+    if (context != NULL) {
+      Py_DECREF(context);
+    }
+    pn_session_free(s);
+  }
+%}
+%ignore pn_session_free;
+
+/* Python-visible pn_link_context: returns a new reference to the stored
+   context object, or None when no context is set. */
+%rename(pn_link_context) wrap_pn_link_context;
+%inline {
+  PyObject *wrap_pn_link_context(pn_link_t *l) {
+    PyObject *context = pn_link_context(l);
+    if (!context) {
+      Py_RETURN_NONE;
+    }
+    Py_INCREF(context);
+    return context;
+  }
+}
+%ignore pn_link_context;
+
+/* Python-visible pn_link_set_context: stores a Python object as the
+   context of the link and holds one reference to it. The new reference
+   is taken and installed before the previous one is dropped, because
+   dropping a reference can run arbitrary Python code and that code must
+   not find the handle still pointing at the released object. */
+%rename(pn_link_set_context) wrap_pn_link_set_context;
+%inline {
+  void wrap_pn_link_set_context(pn_link_t *l, PyObject *context) {
+    PyObject *previous = pn_link_context(l);
+    Py_XINCREF(context);
+    pn_link_set_context(l, context);
+    Py_XDECREF(previous);
+  }
+}
+%ignore pn_link_set_context;
+
+/* Python-visible pn_link_free: drops the reference held on the context
+   object, if any, and then frees the link. */
+%rename(pn_link_free) wrap_pn_link_free;
+%inline %{
+  void wrap_pn_link_free(pn_link_t *l) {
+    PyObject *context = pn_link_context(l);
+    if (context != NULL) {
+      Py_DECREF(context);
+    }
+    pn_link_free(l);
+  }
+%}
+%ignore pn_link_free;
+
+/* Python-visible pn_delivery_context: returns a new reference to the
+   stored context object, or None when no context is set. */
+%rename(pn_delivery_context) wrap_pn_delivery_context;
+%inline {
+  PyObject *wrap_pn_delivery_context(pn_delivery_t *d) {
+    PyObject *context = pn_delivery_context(d);
+    if (!context) {
+      Py_RETURN_NONE;
+    }
+    Py_INCREF(context);
+    return context;
+  }
+}
+%ignore pn_delivery_context;
+
+/* Python-visible pn_delivery_set_context: stores a Python object as the
+   context of the delivery and holds one reference to it. The new
+   reference is taken and installed before the previous one is dropped,
+   because dropping a reference can run arbitrary Python code and that
+   code must not find the handle still pointing at the released object. */
+%rename(pn_delivery_set_context) wrap_pn_delivery_set_context;
+%inline {
+  void wrap_pn_delivery_set_context(pn_delivery_t *d, PyObject *context) {
+    PyObject *previous = pn_delivery_context(d);
+    Py_XINCREF(context);
+    pn_delivery_set_context(d, context);
+    Py_XDECREF(previous);
+  }
+}
+%ignore pn_delivery_set_context;
+
+/* Python-visible pn_settle: drops the reference held on the context
+   object of the delivery, if any, and then settles the delivery. */
+%rename(pn_settle) wrap_pn_settle;
+%inline %{
+  void wrap_pn_settle(pn_delivery_t *d) {
+    PyObject *context = pn_delivery_context(d);
+    if (context != NULL) {
+      Py_DECREF(context);
+    }
+    pn_settle(d);
+  }
+%}
+%ignore pn_settle;
+
ssize_t pn_data_decode(pn_data_t *data, char *STRING, size_t LENGTH);
%ignore pn_data_decode;
Modified: qpid/proton/trunk/proton-c/include/proton/codec.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/codec.h?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/codec.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/codec.h Mon Oct 1 11:33:09 2012
@@ -90,7 +90,7 @@ int pn_data_fill(pn_data_t *data, const
int pn_data_vscan(pn_data_t *data, const char *fmt, va_list ap);
int pn_data_scan(pn_data_t *data, const char *fmt, ...);
-int pn_data_clear(pn_data_t *data);
+void pn_data_clear(pn_data_t *data);
size_t pn_data_size(pn_data_t *data);
void pn_data_rewind(pn_data_t *data);
bool pn_data_next(pn_data_t *data, pn_type_t *type);
Modified: qpid/proton/trunk/proton-c/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/buffer.c?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/buffer.c (original)
+++ qpid/proton/trunk/proton-c/src/buffer.c Mon Oct 1 11:33:09 2012
@@ -193,13 +193,16 @@ size_t pn_buffer_index(pn_buffer_t *buf,
size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst)
{
+ size = pn_min(size, buf->size);
size_t start = pn_buffer_index(buf, offset);
- size_t stop = pn_buffer_index(buf, pn_min(offset + size, buf->size));
+ size_t stop = pn_buffer_index(buf, offset + size);
+
+ if (size == 0) return 0;
size_t sz1;
size_t sz2;
- if (start > stop) {
+ if (start >= stop) {
sz1 = buf->capacity - start;
sz2 = stop;
} else {
Modified: qpid/proton/trunk/proton-c/src/codec/codec.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/codec/codec.c?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/codec/codec.c (original)
+++ qpid/proton/trunk/proton-c/src/codec/codec.c Mon Oct 1 11:33:09 2012
@@ -1715,7 +1715,7 @@ size_t pn_data_size(pn_data_t *data)
return data ? data->size : 0;
}
-int pn_data_clear(pn_data_t *data)
+void pn_data_clear(pn_data_t *data)
{
if (data) {
data->size = 0;
@@ -1724,7 +1724,6 @@ int pn_data_clear(pn_data_t *data)
data->current = 0;
pn_buffer_clear(data->buf);
}
- return 0;
}
int pn_data_grow(pn_data_t *data)
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 1 11:33:09 2012
@@ -290,7 +290,8 @@ void pn_session_free(pn_session_t *sessi
while (session->link_count)
pn_link_free(session->links[session->link_count - 1]);
- pn_remove_session(session->connection, session);
+ if (session->connection)
+ pn_remove_session(session->connection, session);
free(session->links);
pn_endpoint_tini(&session->endpoint);
free(session);
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Mon Oct 1 11:33:09 2012
@@ -71,6 +71,7 @@ struct pn_message_t {
pn_format_t format;
pn_parser_t *parser;
+ pn_error_t *error;
};
pn_message_t *pn_message()
@@ -98,6 +99,7 @@ pn_message_t *pn_message()
msg->body = NULL;
msg->format = PN_DATA;
msg->parser = NULL;
+ msg->error = pn_error();
return msg;
}
@@ -115,6 +117,7 @@ void pn_message_free(pn_message_t *msg)
pn_data_free(msg->data);
pn_data_free(msg->body);
pn_parser_free(msg->parser);
+ pn_error_free(msg->error);
free(msg);
}
}
@@ -145,8 +148,8 @@ void pn_message_clear(pn_message_t *msg)
int pn_message_errno(pn_message_t *msg)
{
- if (msg && msg->parser) {
- return pn_parser_errno(msg->parser);
+ if (msg) {
+ return pn_error_code(msg->error);
} else {
return 0;
}
@@ -154,8 +157,8 @@ int pn_message_errno(pn_message_t *msg)
const char *pn_message_error(pn_message_t *msg)
{
- if (msg && msg->parser) {
- return pn_parser_error(msg->parser);
+ if (msg) {
+ return pn_error_text(msg->error);
} else {
return NULL;
}
@@ -423,13 +426,15 @@ int pn_message_decode(pn_message_t *msg,
while (size) {
pn_data_clear(msg->data);
ssize_t used = pn_data_decode(msg->data, (char *) bytes, size);
- if (used < 0) return used;
+ if (used < 0) return pn_error_format(msg->error, used, "data error: %s",
+ pn_data_error(msg->data));
size -= used;
bytes += used;
bool scanned;
uint64_t desc;
int err = pn_data_scan(msg->data, "D?L.", &scanned, &desc);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->data));
if (!scanned){
desc = 0;
}
@@ -447,25 +452,26 @@ int pn_message_decode(pn_message_t *msg,
&subject, &reply_to, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->data));
err = pn_buffer_set_bytes(&msg->user_id, user_id);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting user_id");
err = pn_buffer_set_strn(&msg->address, address.start, address.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting address");
err = pn_buffer_set_strn(&msg->subject, subject.start, subject.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting subject");
err = pn_buffer_set_strn(&msg->reply_to, reply_to.start, reply_to.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting reply_to");
err = pn_buffer_set_strn(&msg->content_type, ctype.start, ctype.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting content_type");
err = pn_buffer_set_strn(&msg->content_encoding, cencoding.start,
cencoding.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting content_encoding");
err = pn_buffer_set_strn(&msg->group_id, group_id.start, group_id.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting group_id");
err = pn_buffer_set_strn(&msg->reply_to_group_id, reply_to_group_id.start,
reply_to_group_id.size);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "error setting reply_to_group_id");
}
break;
case DELIVERY_ANNOTATIONS:
@@ -481,7 +487,8 @@ int pn_message_decode(pn_message_t *msg,
}
}
- return pn_data_clear(msg->data);
+ pn_data_clear(msg->data);
+ return 0;
}
int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size)
@@ -494,13 +501,14 @@ int pn_message_encode(pn_message_t *msg,
msg->body = pn_data(64);
}
- int err = pn_data_clear(msg->data);
- if (err) return err;
+ pn_data_clear(msg->data);
- err = pn_data_fill(msg->data, "DL[oBIoI]", HEADER, msg->durable,
- msg->priority, msg->ttl, msg->first_acquirer,
- msg->delivery_count);
- if (err) return err;
+ int err = pn_data_fill(msg->data, "DL[oBIoI]", HEADER, msg->durable,
+ msg->priority, msg->ttl, msg->first_acquirer,
+ msg->delivery_count);
+ if (err)
+ return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->data));
err = pn_data_fill(msg->data, "DL[nzSSSnssLLSiS]", PROPERTIES,
pn_buffer_bytes(msg->user_id),
@@ -514,17 +522,23 @@ int pn_message_encode(pn_message_t *msg,
pn_buffer_str(msg->group_id),
msg->group_sequence,
pn_buffer_str(msg->reply_to_group_id));
- if (err) return err;
+ if (err)
+ return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->data));
size_t remaining = *size;
ssize_t encoded = pn_data_encode(msg->data, bytes, remaining);
- if (encoded < 0) return encoded;
+ if (encoded < 0)
+ return pn_error_format(msg->error, encoded, "data error: %s",
+ pn_data_error(msg->data));
bytes += encoded;
remaining -= encoded;
encoded = pn_data_encode(msg->body, bytes, remaining);
- if (encoded < 0) return encoded;
+ if (encoded < 0)
+ return pn_error_format(msg->error, encoded, "data error: %s",
+ pn_data_error(msg->body));
bytes += encoded;
remaining -= encoded;
@@ -568,7 +582,13 @@ int pn_message_load_data(pn_message_t *m
}
pn_data_clear(msg->body);
- return pn_data_fill(msg->body, "DLz", DATA, size, data);
+ int err = pn_data_fill(msg->body, "DLz", DATA, size, data);
+ if (err) {
+ return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->body));
+ } else {
+ return 0;
+ }
}
int pn_message_load_text(pn_message_t *msg, const char *data, size_t size)
@@ -579,7 +599,13 @@ int pn_message_load_text(pn_message_t *m
}
pn_data_clear(msg->body);
- return pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
+ int err = pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
+ if (err) {
+ return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->body));
+ } else {
+ return 0;
+ }
}
int pn_message_load_amqp(pn_message_t *msg, const char *data, size_t size)
@@ -593,7 +619,13 @@ int pn_message_load_amqp(pn_message_t *m
pn_parser_t *parser = pn_message_parser(msg);
pn_data_clear(msg->body);
- return pn_parser_parse(parser, data, msg->body);
+ int err = pn_parser_parse(parser, data, msg->body);
+ if (err) {
+ return pn_error_format(msg->error, err, "parse error: %s",
+ pn_parser_error(parser));
+ } else {
+ return 0;
+ }
}
int pn_message_load_json(pn_message_t *msg, const char *data, size_t size)
@@ -632,7 +664,8 @@ int pn_message_save_data(pn_message_t *m
pn_bytes_t bytes;
bool scanned;
int err = pn_data_scan(msg->body, "DL?z", &desc, &scanned, &bytes);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->body));
if (desc == DATA && scanned) {
if (bytes.size > *size) {
return PN_OVERFLOW;
@@ -659,7 +692,8 @@ int pn_message_save_text(pn_message_t *m
pn_bytes_t str = {0,0};
bool scanned, dscanned;
int err = pn_data_scan(msg->body, "?DL?S", &dscanned, &desc, &scanned, &str);
- if (err) return err;
+ if (err) return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->body));
if (dscanned && desc == AMQP_VALUE) {
if (scanned && str.size >= *size) {
return PN_OVERFLOW;
@@ -683,7 +717,11 @@ int pn_message_save_amqp(pn_message_t *m
return 0;
}
- return pn_data_format(msg->body, data, size);
+ int err = pn_data_format(msg->body, data, size);
+ if (err) return pn_error_format(msg->error, err, "data error: %s",
+ pn_data_error(msg->body));
+
+ return 0;
}
int pn_message_save_json(pn_message_t *msg, char *data, size_t *size)
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Mon Oct 1 11:33:09 2012
@@ -655,6 +655,7 @@ int pn_messenger_get(pn_messenger_t *mes
pn_link_t *l = pn_link(d);
ssize_t n = pn_recv(l, buf, 1024);
pn_settle(d);
+ if (n == PN_EOS) n = 0;
if (n < 0) return n;
if (msg) {
int err = pn_message_decode(msg, buf, n);
Added: qpid/proton/trunk/proton-j/src/main/scripts/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/scripts/proton.py?rev=1392286&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/scripts/proton.py (added)
+++ qpid/proton/trunk/proton-j/src/main/scripts/proton.py Mon Oct 1 11:33:09 2012
@@ -0,0 +1,460 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from org.apache.qpid.proton.engine import EndpointState, Accepted
+from org.apache.qpid.proton.engine.impl import ConnectionImpl, SessionImpl, \
+ SenderImpl, ReceiverImpl, TransportImpl
+from org.apache.qpid.proton.message import Message as MessageImpl, \
+ MessageFormat
+from jarray import zeros
+from java.util import EnumSet
+
+# Raised by the constructors of the stub classes in this module (Data,
+# Messenger, SASL, SSL), which this shim does not implement.
+# NOTE(review): the `skipped` attribute is presumably inspected by the test
+# runner to report a skip rather than a failure -- confirm against proton-test.
+class Skipped(Exception):
+ skipped = True
+
+# Module-level alias for TransportImpl.SESSION_WINDOW; exported through __all__.
+PN_SESSION_WINDOW = TransportImpl.SESSION_WINDOW
+
+class ProtonException(Exception):
+  """Base class for the exception types exported by this module."""
+  pass
+
+# Timeout, MessengerException and MessageException are listed in __all__.
+# A name listed there but not bound in the module makes
+# "from proton import *" fail with AttributeError, so each one is defined
+# here as a plain ProtonException subclass.
+
+class Timeout(ProtonException):
+  """Exported so the name resolves; nothing in this module raises it."""
+  pass
+
+class MessengerException(ProtonException):
+  """Exported so the name resolves; nothing in this module raises it."""
+  pass
+
+class MessageException(ProtonException):
+  """Exported so the name resolves; nothing in this module raises it."""
+  pass
+
+class Endpoint(object):
+  """Shared behaviour for the Connection, Session and Link wrappers.
+
+  Subclasses store the wrapped Java endpoint object in self.impl; every
+  method here delegates to that object.
+  """
+
+  # Bit flags. `state` returns at most one LOCAL_* flag OR-ed with at most
+  # one REMOTE_* flag, and the mask arguments of session_head(), link_head()
+  # and next() are built from the same values.
+  LOCAL_UNINIT = 1
+  LOCAL_ACTIVE = 2
+  LOCAL_CLOSED = 4
+  REMOTE_UNINIT = 8
+  REMOTE_ACTIVE = 16
+  REMOTE_CLOSED = 32
+
+  @property
+  def state(self):
+    """Return impl's local and remote EndpointState as a LOCAL_*|REMOTE_* bitmask."""
+    local = self.impl.getLocalState()
+    remote = self.impl.getRemoteState()
+
+    result = 0
+
+    if (local == EndpointState.UNINITIALIZED):
+      result = result | self.LOCAL_UNINIT
+    elif (local == EndpointState.ACTIVE):
+      result = result | self.LOCAL_ACTIVE
+    elif (local == EndpointState.CLOSED):
+      result = result | self.LOCAL_CLOSED
+
+    if (remote == EndpointState.UNINITIALIZED):
+      result = result | self.REMOTE_UNINIT
+    elif (remote == EndpointState.ACTIVE):
+      result = result | self.REMOTE_ACTIVE
+    elif (remote == EndpointState.CLOSED):
+      result = result | self.REMOTE_CLOSED
+
+    return result
+
+  def _enums(self, mask):
+    """Translate a LOCAL_*/REMOTE_* bitmask into EndpointState sets.
+
+    Returns a (local, remote) tuple of EnumSet objects, in the positional
+    order in which callers splat it into sessionHead()/linkHead()/next().
+    """
+    # Flags are tested with bitwise AND. OR-ing a non-zero flag into the mask
+    # is always truthy, which would select every state and ignore the mask.
+    local = []
+    if (self.LOCAL_UNINIT & mask):
+      local.append(EndpointState.UNINITIALIZED)
+    if (self.LOCAL_ACTIVE & mask):
+      local.append(EndpointState.ACTIVE)
+    if (self.LOCAL_CLOSED & mask):
+      local.append(EndpointState.CLOSED)
+
+    remote = []
+    if (self.REMOTE_UNINIT & mask):
+      remote.append(EndpointState.UNINITIALIZED)
+    if (self.REMOTE_ACTIVE & mask):
+      remote.append(EndpointState.ACTIVE)
+    if (self.REMOTE_CLOSED & mask):
+      remote.append(EndpointState.CLOSED)
+
+    # NOTE(review): EnumSet.of() needs at least one element, so a mask that
+    # carries no LOCAL_* bit or no REMOTE_* bit is not handled here --
+    # confirm what such a mask is meant to match before adding a fallback.
+    return EnumSet.of(*local), EnumSet.of(*remote)
+
+
+  def open(self):
+    """Delegate to impl.open()."""
+    self.impl.open()
+
+  def close(self):
+    """Delegate to impl.close()."""
+    self.impl.close()
+
+def wrap_connection(impl):
+  """Return a Connection wrapping impl, or None when impl is falsy."""
+  if not impl:
+    return None
+  return Connection(_impl = impl)
+
+class Connection(Endpoint):
+  """Wrapper around a Java ConnectionImpl."""
+
+  def __init__(self, _impl=None):
+    # Wrap the supplied implementation object, or create a fresh
+    # ConnectionImpl when none (or a falsy one) is given.
+    if not _impl:
+      _impl = ConnectionImpl()
+    self.impl = _impl
+
+  def session(self):
+    """Call impl.session() and return the result wrapped in a Session."""
+    new_session = self.impl.session()
+    return wrap_session(new_session)
+
+  def session_head(self, mask):
+    """Return impl.sessionHead() for the states selected by mask, wrapped."""
+    local, remote = self._enums(mask)
+    return wrap_session(self.impl.sessionHead(local, remote))
+
+  def link_head(self, mask):
+    """Return impl.linkHead() for the states selected by mask, wrapped."""
+    local, remote = self._enums(mask)
+    return wrap_link(self.impl.linkHead(local, remote))
+
+  @property
+  def work_head(self):
+    """impl.getWorkHead() wrapped in a Delivery, or None when it is falsy."""
+    head = self.impl.getWorkHead()
+    return wrap_delivery(head)
+
+def wrap_session(impl):
+  """Return a Session wrapping impl, or None when impl is falsy."""
+  # XXX
+  if not impl:
+    return None
+  return Session(impl)
+
+class Session(Endpoint):
+  """Wrapper around the Java session object handed to the constructor."""
+
+  def __init__(self, impl):
+    self.impl = impl
+
+  @property
+  def connection(self):
+    """impl.getConnection() wrapped in a Connection."""
+    owner = self.impl.getConnection()
+    return wrap_connection(owner)
+
+  def sender(self, name):
+    """Call impl.sender(name) and return the result via wrap_link()."""
+    link_impl = self.impl.sender(name)
+    return wrap_link(link_impl)
+
+  def receiver(self, name):
+    """Call impl.receiver(name) and return the result via wrap_link()."""
+    link_impl = self.impl.receiver(name)
+    return wrap_link(link_impl)
+
+def wrap_link(impl):
+  """Wrap a Java link object in the matching Python class.
+
+  None maps to None, a SenderImpl to Sender and a ReceiverImpl to Receiver;
+  anything else raises Exception("unknown type").
+  """
+  if impl is None:
+    return None
+  if isinstance(impl, SenderImpl):
+    return Sender(impl)
+  if isinstance(impl, ReceiverImpl):
+    return Receiver(impl)
+  raise Exception("unknown type")
+
+class Link(Endpoint):
+  """Behaviour shared by the Sender and Receiver wrappers."""
+
+  def __init__(self, impl):
+    self.impl = impl
+
+  @property
+  def session(self):
+    """impl.getSession() wrapped in a Session."""
+    return wrap_session(self.impl.getSession())
+
+  def delivery(self, tag):
+    """Call impl.delivery() with the whole of tag and wrap the result."""
+    return wrap_delivery(self.impl.delivery(tag, 0, len(tag)))
+
+  @property
+  def current(self):
+    """impl.current() wrapped in a Delivery, or None when it is falsy."""
+    return wrap_delivery(self.impl.current())
+
+  def advance(self):
+    """Delegate to impl.advance() and return its result."""
+    return self.impl.advance()
+
+  @property
+  def unsettled(self):
+    return self.impl.getUnsettled()
+
+  @property
+  def credit(self):
+    return self.impl.getCredit()
+
+  @property
+  def queued(self):
+    return self.impl.getQueued()
+
+  def next(self, mask):
+    """Return the next link for the states selected by mask, or None.
+
+    The result goes through wrap_link() so the caller receives a Sender or
+    Receiver wrapper, just as Connection.link_head() returns. Handing back
+    the bare Java object would break a "l = l.next(mask)" loop on its second
+    step, because the Java object does not accept an integer mask.
+    """
+    return wrap_link(self.impl.next(*self._enums(mask)))
+
+class Sender(Link):
+  """Link wrapper used for SenderImpl objects."""
+
+  def send(self, bytes):
+    """Pass all of bytes to impl.send() and return its result."""
+    length = len(bytes)
+    return self.impl.send(bytes, 0, length)
+
+  def drained(self):
+    """Delegate to impl.drained(); returns nothing."""
+    self.impl.drained()
+
+class Receiver(Link):
+  """Link wrapper used for ReceiverImpl objects."""
+
+  def flow(self, n):
+    """Delegate to impl.flow(n)."""
+    self.impl.flow(n)
+
+  def drain(self, n):
+    """Delegate to impl.drain(n)."""
+    self.impl.drain(n)
+
+  def recv(self, size):
+    """Read up to size bytes from impl.recv().
+
+    Returns the bytes read as a string (possibly empty), None when impl
+    reports TransportImpl.END_OF_STREAM, and raises Exception carrying the
+    code for any other negative result.
+    """
+    buf = zeros(size, "b")
+    count = self.impl.recv(buf, 0, size)
+    if count >= 0:
+      # Only the first `count` entries of the buffer were filled.
+      return buf.tostring()[:count]
+    if count == TransportImpl.END_OF_STREAM:
+      return None
+    raise Exception(count)
+
+def wrap_delivery(impl):
+  """Return a Delivery wrapping impl, or None when impl is falsy."""
+  if not impl:
+    return None
+  return Delivery(impl)
+
+class Delivery(object):
+  """Wrapper around the Java delivery object handed to the constructor."""
+
+  # Disposition codes. Only ACCEPTED is translated to and from the Java
+  # state objects below; every other value ends in an Exception.
+  RECEIVED = 1
+  ACCEPTED = 2
+  REJECTED = 3
+  RELEASED = 4
+  MODIFIED = 5
+
+  def __init__(self, impl):
+    self.impl = impl
+
+  @property
+  def tag(self):
+    """impl.getTag() converted with tostring()."""
+    raw_tag = self.impl.getTag()
+    return raw_tag.tostring()
+
+  @property
+  def writable(self):
+    return self.impl.isWritable()
+
+  @property
+  def readable(self):
+    return self.impl.isReadable()
+
+  @property
+  def updated(self):
+    return self.impl.isUpdated()
+
+  def disposition(self, disp):
+    """Set the local disposition; anything but ACCEPTED raises Exception."""
+    if disp == self.ACCEPTED:
+      self.impl.disposition(Accepted.getInstance())
+      return
+    raise Exception("xxx: %s" % disp)
+
+  @property
+  def remote_disposition(self):
+    """ACCEPTED when impl.getRemoteState() equals Accepted; raises otherwise."""
+    rd = self.impl.getRemoteState()
+    if(rd == Accepted.getInstance()):
+      return self.ACCEPTED
+    raise Exception("xxx: %s" % rd)
+
+  @property
+  def local_disposition(self):
+    """ACCEPTED when impl.getLocalState() equals Accepted; raises otherwise."""
+    ld = self.impl.getLocalState()
+    if(ld == Accepted.getInstance()):
+      return self.ACCEPTED
+    raise Exception("xxx: %s" % ld)
+
+  def settle(self):
+    """Delegate to impl.settle()."""
+    self.impl.settle()
+
+  @property
+  def remote_settled(self):
+    return self.impl.remotelySettled()
+
+  @property
+  def work_next(self):
+    """impl.getWorkNext() wrapped in a Delivery, or None when it is falsy."""
+    following = self.impl.getWorkNext()
+    return wrap_delivery(following)
+
+# ProtonException subclass exported through __all__.
+# NOTE(review): nothing in this module raises it; Transport.output() raises a
+# plain Exception on error codes -- confirm whether it should use this class.
+class TransportException(ProtonException):
+ pass
+
+class Transport(object):
+  """Wrapper around a Java TransportImpl created in the constructor."""
+
+  # Flag values passed to trace().
+  TRACE_OFF = 0
+  TRACE_RAW = 1
+  TRACE_FRM = 2
+  TRACE_DRV = 4
+
+  def __init__(self):
+    self.impl = TransportImpl()
+
+  def trace(self, mask):
+    """Delegate to impl.trace(mask)."""
+    self.impl.trace(mask)
+
+  def bind(self, connection):
+    """Bind the transport to the Java object behind a Connection wrapper."""
+    target = connection.impl
+    self.impl.bind(target)
+
+  def output(self, size):
+    """Fetch up to size bytes from impl.output().
+
+    Returns the bytes produced as a string (possibly empty), None when impl
+    reports TransportImpl.END_OF_STREAM, and raises Exception for any other
+    negative result.
+    """
+    buf = zeros(size, "b")
+    count = self.impl.output(buf, 0, size)
+    if count >= 0:
+      # Only the first `count` entries of the buffer were filled.
+      return buf.tostring()[:count]
+    if count == TransportImpl.END_OF_STREAM:
+      return None
+    raise Exception("XXX: %s" % count)
+
+  def input(self, bytes):
+    """Pass all of bytes to impl.input() and return its result."""
+    length = len(bytes)
+    return self.impl.input(bytes, 0, length)
+
+# Placeholder so the name listed in __all__ resolves; not implemented in this
+# shim, so constructing it with any arguments raises Skipped.
+class Data(object):
+
+ def __init__(self, *args, **kwargs):
+ raise Skipped()
+
+# Placeholder so the name listed in __all__ resolves; not implemented in this
+# shim, so constructing it with any arguments raises Skipped.
+class Messenger(object):
+
+ def __init__(self, *args, **kwargs):
+ raise Skipped()
+
+class Message(object):
+  """Wrapper around a Java Message implementation object.
+
+  The message fields are exposed as Python properties, each built from a
+  _get_*/_is_* and _set_* pair that delegates to the matching accessor on
+  self.impl.
+  """
+
+  # Format constants re-exported from MessageFormat.
+  AMQP = MessageFormat.AMQP
+  TEXT = MessageFormat.TEXT
+  DATA = MessageFormat.DATA
+  JSON = MessageFormat.JSON
+
+  DEFAULT_PRIORITY = MessageImpl.DEFAULT_PRIORITY
+
+  def __init__(self):
+    self.impl = MessageImpl()
+
+  def clear(self):
+    """Delegate to impl.clear()."""
+    self.impl.clear()
+
+  def save(self):
+    """Return impl.save() as a string.
+
+    None becomes "", a unicode value is returned unchanged, and anything
+    else is converted with its tostring() method.
+    """
+    body = self.impl.save()
+    if body is None:
+      return ""
+    if isinstance(body, unicode):
+      return body
+    return body.tostring()
+
+  def load(self, data):
+    """Delegate to impl.load(data)."""
+    self.impl.load(data)
+
+  def encode(self):
+    """Encode the message into a 1024-entry buffer and return the bytes.
+
+    Raises Exception carrying impl.encode()'s result when it is not
+    positive.
+    """
+    capacity = 1024
+    buf = zeros(capacity, "b")
+    written = self.impl.encode(buf, 0, capacity)
+    # XXX: need to check for overflow
+    if written > 0:
+      return buf.tostring()[:written]
+    raise Exception(written)
+
+  def decode(self, data):
+    """Pass all of data to impl.decode(); its result is discarded."""
+    self.impl.decode(data,0,len(data))
+
+  def _get_ttl(self):
+    return self.impl.getTtl()
+  def _set_ttl(self, ttl):
+    self.impl.setTtl(ttl)
+  ttl = property(_get_ttl, _set_ttl)
+
+  def _get_priority(self):
+    return self.impl.getPriority()
+  def _set_priority(self, priority):
+    self.impl.setPriority(priority)
+  priority = property(_get_priority, _set_priority)
+
+  def _get_address(self):
+    return self.impl.getAddress()
+  def _set_address(self, address):
+    self.impl.setAddress(address)
+  address = property(_get_address, _set_address)
+
+  def _get_subject(self):
+    return self.impl.getSubject()
+  def _set_subject(self, subject):
+    self.impl.setSubject(subject)
+  subject = property(_get_subject, _set_subject)
+
+  def _get_user_id(self):
+    # A missing user id is reported as "" rather than None.
+    raw_id = self.impl.getUserId()
+    if raw_id is None:
+      return ""
+    return raw_id.tostring()
+  def _set_user_id(self, user_id):
+    self.impl.setUserId(user_id)
+  user_id = property(_get_user_id, _set_user_id)
+
+  def _get_reply_to(self):
+    return self.impl.getReplyTo()
+  def _set_reply_to(self, reply_to):
+    self.impl.setReplyTo(reply_to)
+  reply_to = property(_get_reply_to, _set_reply_to)
+
+  def _get_reply_to_group_id(self):
+    return self.impl.getReplyToGroupId()
+  def _set_reply_to_group_id(self, reply_to_group_id):
+    self.impl.setReplyToGroupId(reply_to_group_id)
+  reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id)
+
+  def _get_group_id(self):
+    return self.impl.getGroupId()
+  def _set_group_id(self, group_id):
+    self.impl.setGroupId(group_id)
+  group_id = property(_get_group_id, _set_group_id)
+
+  def _get_group_sequence(self):
+    return self.impl.getGroupSequence()
+  def _set_group_sequence(self, group_sequence):
+    self.impl.setGroupSequence(group_sequence)
+  group_sequence = property(_get_group_sequence, _set_group_sequence)
+
+  def _is_first_acquirer(self):
+    return self.impl.isFirstAcquirer()
+  def _set_first_acquirer(self, b):
+    self.impl.setFirstAcquirer(b)
+  first_acquirer = property(_is_first_acquirer, _set_first_acquirer)
+
+  def _get_expiry_time(self):
+    return self.impl.getExpiryTime()
+  def _set_expiry_time(self, expiry_time):
+    self.impl.setExpiryTime(expiry_time)
+  expiry_time = property(_get_expiry_time, _set_expiry_time)
+
+  def _is_durable(self):
+    return self.impl.isDurable()
+  def _set_durable(self, durable):
+    self.impl.setDurable(durable)
+  durable = property(_is_durable, _set_durable)
+
+  def _get_delivery_count(self):
+    return self.impl.getDeliveryCount()
+  def _set_delivery_count(self, delivery_count):
+    self.impl.setDeliveryCount(delivery_count)
+  delivery_count = property(_get_delivery_count, _set_delivery_count)
+
+  def _get_creation_time(self):
+    return self.impl.getCreationTime()
+  def _set_creation_time(self, creation_time):
+    self.impl.setCreationTime(creation_time)
+  creation_time = property(_get_creation_time, _set_creation_time)
+
+  def _get_content_type(self):
+    return self.impl.getContentType()
+  def _set_content_type(self, content_type):
+    self.impl.setContentType(content_type)
+  content_type = property(_get_content_type, _set_content_type)
+
+  def _get_content_encoding(self):
+    return self.impl.getContentEncoding()
+  def _set_content_encoding(self, content_encoding):
+    self.impl.setContentEncoding(content_encoding)
+  content_encoding = property(_get_content_encoding, _set_content_encoding)
+
+# Placeholder so the name listed in __all__ resolves; not implemented in this
+# shim, so constructing it with any arguments raises Skipped.
+class SASL(object):
+
+ def __init__(self, *args, **kwargs):
+ raise Skipped()
+
+# Placeholder so the name listed in __all__ resolves; not implemented in this
+# shim, so constructing it with any arguments raises Skipped.
+class SSL(object):
+
+ def __init__(self, *args, **kwargs):
+ raise Skipped()
+
+
+# Names exported by "from proton import *", which is how the test modules
+# import this shim. Every entry must be bound at module level; a listed name
+# with no definition makes the star-import raise AttributeError.
+__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",
+ "MessageException", "Timeout", "Data", "Endpoint", "Connection",
+ "Session", "Link", "Sender", "Receiver", "Delivery", "Transport",
+ "TransportException", "SASL", "SSL", "PN_SESSION_WINDOW"]
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/test/JythonTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/test/JythonTest.java?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/test/JythonTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/test/JythonTest.java Mon Oct 1 11:33:09 2012
@@ -32,7 +32,6 @@ public class JythonTest
{
static final private String PROTON_TESTS = "PROTON_TESTS";
- static final private String PROTON_COMMON = "PROTON_COMMON";
@Test
public void test() throws Exception
@@ -57,30 +56,12 @@ public class JythonTest
}
}
- File commonDir;
- String protonCommonVar = System.getenv(PROTON_COMMON);
- if( protonCommonVar != null && protonCommonVar.trim().length()>0 )
- {
- commonDir = new File(protonCommonVar).getCanonicalFile();
- assertTrue(PROTON_COMMON + " env variable set incorrectly: " + protonCommonVar, commonDir.isDirectory());
- }
- else
- {
- commonDir = new File(basedir, "../proton-c/bindings/python");
- if( !commonDir.isDirectory() )
- {
- return;
- }
- }
- // /proton-c/bindings/python
-
File classesDir = new File(basedir, "target/classes");
PythonInterpreter interp = new PythonInterpreter();
interp.exec(
"import sys\n"+
"sys.path.insert(0,\""+classesDir.getCanonicalPath()+"\")\n"+
- "sys.path.insert(0,\""+commonDir.getCanonicalPath()+"\")\n"+
"sys.path.insert(0,\""+testDir.getCanonicalPath()+"\")\n"
);
interp.execfile(new File(testDir, "proton-test").getCanonicalPath());
Modified: qpid/proton/trunk/tests/proton_tests/codec.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/codec.py?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/codec.py (original)
+++ qpid/proton/trunk/tests/proton_tests/codec.py Mon Oct 1 11:33:09 2012
@@ -17,7 +17,7 @@
# under the License.
#
-import os, common, xproton
+import os, common
from proton import *
class Test(common.Test):
Modified: qpid/proton/trunk/tests/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/common.py?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/proton_tests/common.py Mon Oct 1 11:33:09 2012
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,4 +21,3 @@ class Test:
def __init__(self, name):
self.name = name
-
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1392286&r1=1392285&r2=1392286&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Mon Oct 1 11:33:09 2012
@@ -17,8 +17,8 @@
# under the License.
#
-import os, common, xproton
-from xproton import *
+import os, common
+from proton import *
# future test areas
# + different permutations of setup
@@ -31,20 +31,16 @@ OUTPUT_SIZE = 10*1024
def pump(t1, t2):
while True:
- cd, out1 = pn_output(t1, OUTPUT_SIZE)
- assert cd >= 0 or cd == PN_EOS, (cd, out1, len(out1))
- cd, out2 = pn_output(t2, OUTPUT_SIZE)
- assert cd >= 0 or cd == PN_EOS, (cd, out2, len(out2))
+ out1 = t1.output(OUTPUT_SIZE)
+ out2 = t2.output(OUTPUT_SIZE)
if out1 or out2:
if out1:
- cd = pn_input(t2, out1)
- assert cd == PN_EOS or cd == len(out1), \
- (cd, out1, len(out1), pn_error_text(pn_transport_error(t2)))
+ n = t2.input(out1)
+ assert n is None or n == len(out1), (n, out1, len(out1))
if out2:
- cd = pn_input(t1, out2)
- assert cd == PN_EOS or cd == len(out2), \
- (cd, out2, len(out2), pn_error_text(pn_transport_error(t1)))
+ n = t1.input(out2)
+ assert n is None or n == len(out2), (n, out2, len(out2))
else:
return
@@ -55,40 +51,36 @@ class Test(common.Test):
self._wires = []
def connection(self):
- c1 = pn_connection()
- c2 = pn_connection()
- t1 = pn_transport()
- pn_transport_bind(t1, c1)
- t2 = pn_transport()
- pn_transport_bind(t2, c2)
+ c1 = Connection()
+ c2 = Connection()
+ t1 = Transport()
+ t1.bind(c1)
+ t2 = Transport()
+ t2.bind(c2)
self._wires.append((c1, t1, c2, t2))
trc = os.environ.get("PN_TRACE_FRM")
if trc and trc.lower() in ("1", "2", "yes", "true"):
- pn_trace(t1, PN_TRACE_FRM)
+ t1.trace(Transport.TRACE_FRM)
if trc == "2":
- pn_trace(t2, PN_TRACE_FRM)
+ t2.trace(Transport.TRACE_FRM)
return c1, c2
def link(self, name):
c1, c2 = self.connection()
- pn_connection_open(c1)
- pn_connection_open(c2)
- ssn1 = pn_session(c1)
- pn_session_open(ssn1)
+ c1.open()
+ c2.open()
+ ssn1 = c1.session()
+ ssn1.open()
self.pump()
- ssn2 = pn_session_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
- pn_session_open(ssn2)
+ ssn2 = c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
+ ssn2.open()
self.pump()
- snd = pn_sender(ssn1, name)
- rcv = pn_receiver(ssn2, name)
+ snd = ssn1.sender(name)
+ rcv = ssn2.receiver(name)
return snd, rcv
def cleanup(self):
- for c1, t1, c2, t2 in self._wires:
- pn_connection_free(c1)
- pn_transport_free(t1)
- pn_connection_free(c2)
- pn_transport_free(t2)
+ pass
def pump(self):
for c1, t1, c2, t2 in self._wires:
@@ -103,139 +95,139 @@ class ConnectionTest(Test):
self.cleanup()
def test_open_close(self):
- assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
- assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
+ assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
- pn_connection_open(self.c1)
+ self.c1.open()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
- assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+ assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
+ assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
- pn_connection_open(self.c2)
+ self.c2.open()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_connection_close(self.c1)
+ self.c1.close()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+ assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
- pn_connection_close(self.c2)
+ self.c2.close()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_open_close(self):
- assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
- assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
+ assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
- pn_connection_open(self.c1)
- pn_connection_open(self.c2)
+ self.c1.open()
+ self.c2.open()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_connection_close(self.c1)
- pn_connection_close(self.c2)
+ self.c1.close()
+ self.c2.close()
self.pump()
- assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
class SessionTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
- self.ssn = pn_session(self.c1)
- pn_connection_open(self.c1)
- pn_connection_open(self.c2)
+ self.ssn = self.c1.session()
+ self.c1.open()
+ self.c2.open()
def teardown(self):
self.cleanup()
def test_open_close(self):
- assert pn_session_state(self.ssn) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
- pn_session_open(self.ssn)
+ self.ssn.open()
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
- ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+ ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
assert ssn != None
- assert pn_session_state(ssn) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
- pn_session_open(ssn)
+ ssn.open()
- assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
- assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_session_close(ssn)
+ ssn.close()
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
- pn_session_close(self.ssn)
+ self.ssn.close()
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
self.pump()
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_close(self):
- pn_session_open(self.ssn)
+ self.ssn.open()
self.pump()
- ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+ ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
assert ssn != None
- pn_session_open(ssn)
+ ssn.open()
self.pump()
- assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_session_close(self.ssn)
- pn_session_close(ssn)
+ self.ssn.close()
+ ssn.close()
- assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
self.pump()
- assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_closing_connection(self):
- pn_session_open(self.ssn)
+ self.ssn.open()
self.pump()
- pn_connection_close(self.c1)
+ self.c1.close()
self.pump()
- pn_session_close(self.ssn)
+ self.ssn.close()
self.pump()
@@ -248,359 +240,349 @@ class LinkTest(Test):
self.cleanup()
def test_open_close(self):
- assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
- pn_link_open(self.snd)
+ self.snd.open()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
- pn_link_open(self.rcv)
+ self.rcv.open()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_link_close(self.snd)
+ self.snd.close()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
- pn_link_close(self.rcv)
+ self.rcv.close()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_simultaneous_open_close(self):
- assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
+ self.snd.open()
+ self.rcv.open()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
- pn_link_close(self.snd)
- pn_link_close(self.rcv)
+ self.snd.close()
+ self.rcv.close()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
- assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
+ assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
self.pump()
- assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
- assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+ assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
def test_multiple(self):
- rcv = pn_receiver(pn_get_session(self.snd), "second-rcv")
- pn_link_open(self.snd)
- pn_link_open(rcv)
+ rcv = self.snd.session.receiver("second-rcv")
+ self.snd.open()
+ rcv.open()
self.pump()
- c2 = pn_get_connection(pn_get_session(self.rcv))
- l = pn_link_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ c2 = self.rcv.session.connection
+ l = c2.link_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
while l:
- pn_link_open(l)
- l = pn_link_next(l, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ l.open()
+ l = l.next(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
self.pump()
assert self.snd
assert rcv
- pn_link_close(self.snd)
- pn_link_close(rcv)
- ssn = pn_get_session(rcv)
- conn = pn_get_connection(ssn)
- pn_session_close(ssn)
- pn_connection_close(conn)
+ self.snd.close()
+ rcv.close()
+ ssn = rcv.session
+ conn = ssn.connection
+ ssn.close()
+ conn.close()
self.pump()
def test_closing_session(self):
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
- ssn1 = pn_get_session(self.snd)
+ self.snd.open()
+ self.rcv.open()
+ ssn1 = self.snd.session
self.pump()
- pn_session_close(ssn1)
+ ssn1.close()
self.pump()
- pn_link_close(self.snd)
+ self.snd.close()
self.pump()
def test_closing_connection(self):
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
- ssn1 = pn_get_session(self.snd)
- c1 = pn_get_connection(ssn1)
+ self.snd.open()
+ self.rcv.open()
+ ssn1 = self.snd.session
+ c1 = ssn1.connection
self.pump()
- pn_connection_close(c1)
+ c1.close()
self.pump()
- pn_link_close(self.snd)
+ self.snd.close()
self.pump()
class TransferTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
- self.c1 = pn_get_connection(pn_get_session(self.snd))
- self.c2 = pn_get_connection(pn_get_session(self.rcv))
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def test_work_queue(self):
- assert pn_work_head(self.c1) is None
- pn_delivery(self.snd, "tag")
- assert pn_work_head(self.c1) is None
- pn_flow(self.rcv, 1)
+ assert self.c1.work_head is None
+ self.snd.delivery("tag")
+ assert self.c1.work_head is None
+ self.rcv.flow(1)
self.pump()
- d = pn_work_head(self.c1)
+ d = self.c1.work_head
assert d is not None
- tag = pn_delivery_tag(d)
+ tag = d.tag
assert tag == "tag", tag
- assert pn_writable(d)
+ assert d.writable
- n = pn_send(self.snd, "this is a test")
- assert pn_advance(self.snd)
- assert pn_work_head(self.c1) is None
+ n = self.snd.send("this is a test")
+ assert self.snd.advance()
+ assert self.c1.work_head is None
self.pump()
- d = pn_work_head(self.c2)
- assert pn_delivery_tag(d) == "tag"
- assert pn_readable(d)
+ d = self.c2.work_head
+ assert d.tag == "tag"
+ assert d.readable
def test_multiframe(self):
- pn_flow(self.rcv, 1)
- pn_delivery(self.snd, "tag")
+ self.rcv.flow(1)
+ self.snd.delivery("tag")
msg = "this is a test"
- n = pn_send(self.snd, msg)
+ n = self.snd.send(msg)
assert n == len(msg)
self.pump()
- d = pn_current(self.rcv)
+ d = self.rcv.current
assert d
- assert pn_delivery_tag(d) == "tag", repr(pn_delivery_tag(d))
- assert pn_readable(d)
+ assert d.tag == "tag", repr(d.tag)
+ assert d.readable
- cd, bytes = pn_recv(self.rcv, 1024)
+ bytes = self.rcv.recv(1024)
assert bytes == msg
- assert cd == len(bytes)
- cd, bytes = pn_recv(self.rcv, 1024)
- assert cd == 0
+ bytes = self.rcv.recv(1024)
assert bytes == ""
msg = "this is more"
- n = pn_send(self.snd, msg)
+ n = self.snd.send(msg)
assert n == len(msg)
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
- cd, bytes = pn_recv(self.rcv, 1024)
- assert cd == len(bytes)
+ bytes = self.rcv.recv(1024)
assert bytes == msg
- cd, bytes = pn_recv(self.rcv, 1024)
- assert cd == PN_EOS
- assert bytes == ""
+ bytes = self.rcv.recv(1024)
+ assert bytes is None
def test_disposition(self):
- pn_flow(self.rcv, 1)
+ self.rcv.flow(1)
self.pump()
- sd = pn_delivery(self.snd, "tag")
+ sd = self.snd.delivery("tag")
msg = "this is a test"
- n = pn_send(self.snd, msg)
+ n = self.snd.send(msg)
assert n == len(msg)
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
- rd = pn_current(self.rcv)
+ rd = self.rcv.current
assert rd is not None
- assert pn_delivery_tag(rd) == pn_delivery_tag(sd)
- cd, rmsg = pn_recv(self.rcv, 1024)
- assert cd == len(rmsg)
+ assert rd.tag == sd.tag
+ rmsg = self.rcv.recv(1024)
assert rmsg == msg
- pn_disposition(rd, PN_ACCEPTED)
+ rd.disposition(Delivery.ACCEPTED)
self.pump()
- rdisp = pn_remote_disposition(sd)
- ldisp = pn_local_disposition(rd)
- assert rdisp == ldisp == PN_ACCEPTED, (rdisp, ldisp)
- assert pn_updated(sd)
+ rdisp = sd.remote_disposition
+ ldisp = rd.local_disposition
+ assert rdisp == ldisp == Delivery.ACCEPTED, (rdisp, ldisp)
+ assert sd.updated
- pn_disposition(sd, PN_ACCEPTED)
- pn_settle(sd)
+ sd.disposition(Delivery.ACCEPTED)
+ sd.settle()
self.pump()
- assert pn_local_disposition(sd) == pn_remote_disposition(rd) == PN_ACCEPTED
+ assert sd.local_disposition == rd.remote_disposition == Delivery.ACCEPTED
class CreditTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
- self.c1 = pn_get_connection(pn_get_session(self.snd))
- self.c2 = pn_get_connection(pn_get_session(self.rcv))
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def testCreditSender(self):
- credit = pn_credit(self.snd)
+ credit = self.snd.credit
assert credit == 0, credit
- pn_flow(self.rcv, 10)
+ self.rcv.flow(10)
self.pump()
- credit = pn_credit(self.snd)
+ credit = self.snd.credit
assert credit == 10, credit
- pn_flow(self.rcv, PN_SESSION_WINDOW)
+ self.rcv.flow(PN_SESSION_WINDOW)
self.pump()
- credit = pn_credit(self.snd)
+ credit = self.snd.credit
assert credit == 10 + PN_SESSION_WINDOW, credit
def testCreditReceiver(self):
- pn_flow(self.rcv, 10)
+ self.rcv.flow(10)
self.pump()
- assert pn_credit(self.rcv) == 10, pn_credit(self.rcv)
+ assert self.rcv.credit == 10, self.rcv.credit
- d = pn_delivery(self.snd, "tag")
+ d = self.snd.delivery("tag")
assert d
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
- assert pn_credit(self.rcv) == 10, pn_credit(self.rcv)
- assert pn_queued(self.rcv) == 1, pn_queued(self.rcv)
- c = pn_current(self.rcv)
- assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
- assert pn_advance(self.rcv)
- assert pn_credit(self.rcv) == 9, pn_credit(self.rcv)
- assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ assert self.rcv.credit == 10, self.rcv.credit
+ assert self.rcv.queued == 1, self.rcv.queued
+ c = self.rcv.current
+ assert c.tag == "tag", c.tag
+ assert self.rcv.advance()
+ assert self.rcv.credit == 9, self.rcv.credit
+ assert self.rcv.queued == 0, self.rcv.queued
def settle(self):
result = []
- d = pn_work_head(self.c1)
+ d = self.c1.work_head
while d:
- if pn_updated(d):
- result.append(pn_delivery_tag(d))
- pn_settle(d)
- d = pn_work_next(d)
+ if d.updated:
+ result.append(d.tag)
+ d.settle()
+ d = d.work_next
return result
def testBuffering(self):
- pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
+ self.rcv.flow(PN_SESSION_WINDOW + 10)
self.pump()
- assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ assert self.rcv.queued == 0, self.rcv.queued
idx = 0
- while pn_credit(self.snd):
- d = pn_delivery(self.snd, "tag%s" % idx)
+ while self.snd.credit:
+ d = self.snd.delivery("tag%s" % idx)
assert d
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
- assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
+ assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
- extra = pn_delivery(self.snd, "extra")
+ extra = self.snd.delivery("extra")
assert extra
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
- assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
+ assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
for i in range(10):
- d = pn_current(self.rcv)
- assert pn_delivery_tag(d) == "tag%s" % i, pn_delivery_tag(d)
- assert pn_advance(self.rcv)
- pn_settle(d)
+ d = self.rcv.current
+ assert d.tag == "tag%s" % i, d.tag
+ assert self.rcv.advance()
+ d.settle()
self.pump()
- assert pn_queued(self.rcv) == PN_SESSION_WINDOW - (i+1), pn_queued(self.rcv)
+ assert self.rcv.queued == PN_SESSION_WINDOW - (i+1), self.rcv.queued
tags = self.settle()
assert tags == ["tag%s" % i for i in range(10)], tags
self.pump()
- assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
+ assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
for i in range(PN_SESSION_WINDOW):
- d = pn_current(self.rcv)
+ d = self.rcv.current
assert d, i
- assert pn_delivery_tag(d) == "tag%s" % (i+10), pn_delivery_tag(d)
- assert pn_advance(self.rcv)
- pn_settle(d)
+ assert d.tag == "tag%s" % (i+10), d.tag
+ assert self.rcv.advance()
+ d.settle()
self.pump()
- assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ assert self.rcv.queued == 0, self.rcv.queued
tags = self.settle()
assert tags == ["tag%s" % (i+10) for i in range(PN_SESSION_WINDOW)]
- assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ assert self.rcv.queued == 0, self.rcv.queued
def _testBufferingOnClose(self, a, b):
for i in range(10):
- d = pn_delivery(self.snd, "tag-%s" % i)
+ d = self.snd.delivery("tag-%s" % i)
assert d
- pn_settle(d)
+ d.settle()
self.pump()
- assert pn_queued(self.snd) == 10
+ assert self.snd.queued == 10
endpoints = {"connection": (self.c1, self.c2),
- "session": (pn_get_session(self.snd), pn_get_session(self.rcv)),
+ "session": (self.snd.session, self.rcv.session),
"link": (self.snd, self.rcv)}
local_a, remote_a = endpoints[a]
local_b, remote_b = endpoints[b]
- a_close = getattr(xproton, "pn_%s_close" % a)
- a_state = getattr(xproton, "pn_%s_state" % a)
- b_close = getattr(xproton, "pn_%s_close" % b)
- b_state = getattr(xproton, "pn_%s_state" % b)
-
- b_close(remote_b)
+ remote_b.close()
self.pump()
- assert b_state(local_b) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
- a_close(local_a)
+ assert local_b.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+ local_a.close()
self.pump()
- assert a_state(remote_a) & PN_REMOTE_CLOSED
- assert pn_queued(self.snd) == 10
+ assert remote_a.state & Endpoint.REMOTE_CLOSED
+ assert self.snd.queued == 10
def testBufferingOnCloseLinkLink(self):
self._testBufferingOnClose("link", "link")
@@ -630,217 +612,217 @@ class CreditTest(Test):
self._testBufferingOnClose("connection", "connection")
def testCreditWithBuffering(self):
- pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
+ self.rcv.flow(PN_SESSION_WINDOW + 10)
self.pump()
- assert pn_credit(self.snd) == PN_SESSION_WINDOW + 10, pn_credit(self.snd)
- assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ assert self.snd.credit == PN_SESSION_WINDOW + 10, self.snd.credit
+ assert self.rcv.queued == 0, self.rcv.queued
idx = 0
- while pn_credit(self.snd):
- d = pn_delivery(self.snd, "tag%s" % idx)
+ while self.snd.credit:
+ d = self.snd.delivery("tag%s" % idx)
assert d
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
idx += 1
assert idx == PN_SESSION_WINDOW + 10, idx
- assert pn_queued(self.rcv) == PN_SESSION_WINDOW, pn_queued(self.rcv)
+ assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
- pn_flow(self.rcv, 1)
+ self.rcv.flow(1)
self.pump()
- assert pn_credit(self.snd) == 1, pn_credit(self.snd)
+ assert self.snd.credit == 1, self.snd.credit
def testFullDrain(self):
- assert pn_credit(self.rcv) == 0
- assert pn_credit(self.snd) == 0
- pn_drain(self.rcv, 10)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 0
- self.pump()
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 10
- pn_drained(self.snd)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 0
+ assert self.rcv.credit == 0
+ assert self.snd.credit == 0
+ self.rcv.drain(10)
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 0
+ self.pump()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 10
+ self.snd.drained()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 0
self.pump()
- assert pn_credit(self.rcv) == 0
- assert pn_credit(self.snd) == 0
+ assert self.rcv.credit == 0
+ assert self.snd.credit == 0
def testPartialDrain(self):
- pn_drain(self.rcv, 2)
+ self.rcv.drain(2)
self.pump()
- d = pn_delivery(self.snd, "tag")
+ d = self.snd.delivery("tag")
assert d
- assert pn_advance(self.snd)
- pn_drained(self.snd)
+ assert self.snd.advance()
+ self.snd.drained()
self.pump()
- c = pn_current(self.rcv)
- assert pn_queued(self.rcv) == 1, pn_queued(self.rcv)
- assert pn_delivery_tag(c) == pn_delivery_tag(d), pn_delivery_tag(c)
- assert pn_advance(self.rcv)
- assert not pn_current(self.rcv)
- assert pn_credit(self.rcv) == 0, pn_credit(self.rcv)
+ c = self.rcv.current
+ assert self.rcv.queued == 1, self.rcv.queued
+ assert c.tag == d.tag, c.tag
+ assert self.rcv.advance()
+ assert not self.rcv.current
+ assert self.rcv.credit == 0, self.rcv.credit
def testDrainFlow(self):
- assert pn_credit(self.rcv) == 0
- assert pn_credit(self.snd) == 0
- pn_drain(self.rcv, 10)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 0
- self.pump()
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 10
- pn_drained(self.snd)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 0
- self.pump()
- assert pn_credit(self.rcv) == 0
- assert pn_credit(self.snd) == 0
- pn_flow(self.rcv, 10)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 0
- self.pump()
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 10
- pn_drained(self.snd)
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 10
+ assert self.rcv.credit == 0
+ assert self.snd.credit == 0
+ self.rcv.drain(10)
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 0
+ self.pump()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 10
+ self.snd.drained()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 0
+ self.pump()
+ assert self.rcv.credit == 0
+ assert self.snd.credit == 0
+ self.rcv.flow(10)
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 0
+ self.pump()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 10
+ self.snd.drained()
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 10
self.pump()
- assert pn_credit(self.rcv) == 10
- assert pn_credit(self.snd) == 10
+ assert self.rcv.credit == 10
+ assert self.snd.credit == 10
def testNegative(self):
- assert pn_credit(self.snd) == 0
- d = pn_delivery(self.snd, "tag")
+ assert self.snd.credit == 0
+ d = self.snd.delivery("tag")
assert d
- assert pn_advance(self.snd)
+ assert self.snd.advance()
self.pump()
- assert pn_credit(self.rcv) == 0
- assert pn_queued(self.rcv) == 0
+ assert self.rcv.credit == 0
+ assert self.rcv.queued == 0
- pn_flow(self.rcv, 1)
- assert pn_credit(self.rcv) == 1
- assert pn_queued(self.rcv) == 0
+ self.rcv.flow(1)
+ assert self.rcv.credit == 1
+ assert self.rcv.queued == 0
self.pump()
- assert pn_credit(self.rcv) == 1
- assert pn_queued(self.rcv) == 1
+ assert self.rcv.credit == 1
+ assert self.rcv.queued == 1
- c = pn_current(self.rcv)
+ c = self.rcv.current
assert c
- assert pn_delivery_tag(c) == "tag"
- assert pn_advance(self.rcv)
- assert pn_credit(self.rcv) == 0
- assert pn_queued(self.rcv) == 0
+ assert c.tag == "tag"
+ assert self.rcv.advance()
+ assert self.rcv.credit == 0
+ assert self.rcv.queued == 0
def testDrainZero(self):
- assert pn_credit(self.snd) == 0
- assert pn_credit(self.rcv) == 0
- assert pn_queued(self.rcv) == 0
+ assert self.snd.credit == 0
+ assert self.rcv.credit == 0
+ assert self.rcv.queued == 0
- pn_flow(self.rcv, 10)
+ self.rcv.flow(10)
self.pump()
- assert pn_credit(self.snd) == 10
- assert pn_credit(self.rcv) == 10
- assert pn_queued(self.rcv) == 0
+ assert self.snd.credit == 10
+ assert self.rcv.credit == 10
+ assert self.rcv.queued == 0
- pn_drained(self.snd)
+ self.snd.drained()
self.pump()
- assert pn_credit(self.snd) == 10
- assert pn_credit(self.rcv) == 10
- assert pn_queued(self.rcv) == 0
+ assert self.snd.credit == 10
+ assert self.rcv.credit == 10
+ assert self.rcv.queued == 0
- pn_drain(self.rcv, 0)
- assert pn_credit(self.snd) == 10
- assert pn_credit(self.rcv) == 10
- assert pn_queued(self.rcv) == 0
+ self.rcv.drain(0)
+ assert self.snd.credit == 10
+ assert self.rcv.credit == 10
+ assert self.rcv.queued == 0
self.pump()
- assert pn_credit(self.snd) == 10
- assert pn_credit(self.rcv) == 10
- assert pn_queued(self.rcv) == 0
+ assert self.snd.credit == 10
+ assert self.rcv.credit == 10
+ assert self.rcv.queued == 0
- pn_drained(self.snd)
- assert pn_credit(self.snd) == 0
- assert pn_credit(self.rcv) == 10
- assert pn_queued(self.rcv) == 0
+ self.snd.drained()
+ assert self.snd.credit == 0
+ assert self.rcv.credit == 10
+ assert self.rcv.queued == 0
self.pump()
- assert pn_credit(self.snd) == 0
- assert pn_credit(self.rcv) == 0
- assert pn_queued(self.rcv) == 0
+ assert self.snd.credit == 0
+ assert self.rcv.credit == 0
+ assert self.rcv.queued == 0
class SettlementTest(Test):
def setup(self):
self.snd, self.rcv = self.link("test-link")
- self.c1 = pn_get_connection(pn_get_session(self.snd))
- self.c2 = pn_get_connection(pn_get_session(self.rcv))
- pn_link_open(self.snd)
- pn_link_open(self.rcv)
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
self.pump()
def teardown(self):
self.cleanup()
def testSettleCurrent(self):
- pn_flow(self.rcv, 10)
+ self.rcv.flow(10)
self.pump()
- assert pn_credit(self.snd) == 10, pn_credit(self.snd)
- d = pn_delivery(self.snd, "tag")
- e = pn_delivery(self.snd, "tag2")
+ assert self.snd.credit == 10, self.snd.credit
+ d = self.snd.delivery("tag")
+ e = self.snd.delivery("tag2")
assert d
assert e
- c = pn_current(self.snd)
- assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
- pn_settle(c)
- c = pn_current(self.snd)
- assert pn_delivery_tag(c) == "tag2", pn_delivery_tag(c)
- pn_settle(c)
- c = pn_current(self.snd)
+ c = self.snd.current
+ assert c.tag == "tag", c.tag
+ c.settle()
+ c = self.snd.current
+ assert c.tag == "tag2", c.tag
+ c.settle()
+ c = self.snd.current
assert not c
self.pump()
- c = pn_current(self.rcv)
+ c = self.rcv.current
assert c
- assert pn_delivery_tag(c) == "tag", pn_delivery_tag(c)
- assert pn_remote_settled(c)
- pn_settle(c)
- c = pn_current(self.rcv)
+ assert c.tag == "tag", c.tag
+ assert c.remote_settled
+ c.settle()
+ c = self.rcv.current
assert c
- assert pn_delivery_tag(c) == "tag2", pn_delivery_tag(c)
- assert pn_remote_settled(c)
- pn_settle(c)
- c = pn_current(self.rcv)
+ assert c.tag == "tag2", c.tag
+ assert c.remote_settled
+ c.settle()
+ c = self.rcv.current
assert not c
def testUnsettled(self):
- pn_flow(self.rcv, 10)
+ self.rcv.flow(10)
self.pump()
- assert pn_unsettled(self.snd) == 0, pn_unsettled(self.snd)
- assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
+ assert self.snd.unsettled == 0, self.snd.unsettled
+ assert self.rcv.unsettled == 0, self.rcv.unsettled
- d = pn_delivery(self.snd, "tag")
+ d = self.snd.delivery("tag")
assert d
- assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
- assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
- assert pn_advance(self.snd)
+ assert self.snd.unsettled == 1, self.snd.unsettled
+ assert self.rcv.unsettled == 0, self.rcv.unsettled
+ assert self.snd.advance()
self.pump()
- assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
- assert pn_unsettled(self.rcv) == 1, pn_unsettled(self.rcv)
+ assert self.snd.unsettled == 1, self.snd.unsettled
+ assert self.rcv.unsettled == 1, self.rcv.unsettled
- c = pn_current(self.rcv)
+ c = self.rcv.current
assert c
- pn_settle(c)
+ c.settle()
- assert pn_unsettled(self.snd) == 1, pn_unsettled(self.snd)
- assert pn_unsettled(self.rcv) == 0, pn_unsettled(self.rcv)
+ assert self.snd.unsettled == 1, self.snd.unsettled
+ assert self.rcv.unsettled == 0, self.rcv.unsettled
class PipelineTest(Test):
@@ -851,54 +833,54 @@ class PipelineTest(Test):
self.cleanup()
def test(self):
- ssn = pn_session(self.c1)
- snd = pn_sender(ssn, "sender")
- pn_connection_open(self.c1)
- pn_session_open(ssn)
- pn_link_open(snd)
+ ssn = self.c1.session()
+ snd = ssn.sender("sender")
+ self.c1.open()
+ ssn.open()
+ snd.open()
for i in range(10):
- d = pn_delivery(snd, "delivery-%s" % i)
- pn_send(snd, "delivery-%s" % i)
- pn_settle(d)
+ d = snd.delivery("delivery-%s" % i)
+ snd.send("delivery-%s" % i)
+ d.settle()
- pn_link_close(snd)
- pn_session_close(ssn)
- pn_connection_close(self.c1)
+ snd.close()
+ ssn.close()
+ self.c1.close()
self.pump()
- state = pn_connection_state(self.c2)
- assert state == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE), "%x" % state
- ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT)
+ state = self.c2.state
+ assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
+ ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT)
assert ssn2
- state == pn_session_state(ssn2)
- assert state == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE), "%x" % state
- rcv = pn_link_head(self.c2, PN_LOCAL_UNINIT)
+ state == ssn2.state
+ assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
+ rcv = self.c2.link_head(Endpoint.LOCAL_UNINIT)
assert rcv
- state = pn_link_state(rcv)
- assert state == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE), "%x" % state
+ state = rcv.state
+ assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
- pn_connection_open(self.c2)
- pn_session_open(ssn2)
- pn_link_open(rcv)
- pn_flow(rcv, 10)
- assert pn_queued(rcv) == 0, pn_queued(rcv)
+ self.c2.open()
+ ssn2.open()
+ rcv.open()
+ rcv.flow(10)
+ assert rcv.queued == 0, rcv.queued
self.pump()
- assert pn_queued(rcv) == 10, pn_queued(rcv)
- state = pn_link_state(rcv)
- assert state == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED), "%x" % state
- state = pn_session_state(ssn2)
- assert state == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED), "%x" % state
- state = pn_connection_state(self.c2)
- assert state == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED), "%x" % state
+ assert rcv.queued == 10, rcv.queued
+ state = rcv.state
+ assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
+ state = ssn2.state
+ assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
+ state = self.c2.state
+ assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
- for i in range(pn_queued(rcv)):
- d = pn_current(rcv)
+ for i in range(rcv.queued):
+ d = rcv.current
assert d
- assert pn_delivery_tag(d) == "delivery-%s" % i
- pn_settle(d)
+ assert d.tag == "delivery-%s" % i
+ d.settle()
- assert pn_queued(rcv) == 0, pn_queued(rcv)
+ assert rcv.queued == 0, rcv.queued
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org