You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/06/17 19:54:54 UTC
svn commit: r1493859 - in /qpid/proton/trunk:
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/
proton-c/src/engine/ proton-c/src/messenger/ proton-j/proton-api/src/...
Author: rhs
Date: Mon Jun 17 17:54:54 2013
New Revision: 1493859
URL: http://svn.apache.org/r1493859
Log:
Extended the delivery API to work with all delivery states defined by the transport spec as well as custom delivery states. This addresses PROTON-97 and the C portion of PROTON-75.
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/messenger/store.c
qpid/proton/trunk/proton-c/src/proton.c
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Mon Jun 17 17:54:54 2013
@@ -25,7 +25,7 @@ import org.apache.qpid.proton.engine.Del
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.jni.Proton;
import org.apache.qpid.proton.jni.SWIGTYPE_p_pn_delivery_t;
-import org.apache.qpid.proton.jni.pn_disposition_t;
+import org.apache.qpid.proton.jni.SWIGTYPE_p_pn_disposition_t;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Received;
@@ -33,6 +33,9 @@ import org.apache.qpid.proton.amqp.messa
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import static org.apache.qpid.proton.jni.ProtonConstants.*;
+import java.math.BigInteger;
+
public class JNIDelivery implements Delivery
{
private SWIGTYPE_p_pn_delivery_t _impl;
@@ -121,53 +124,53 @@ public class JNIDelivery implements Deli
//TODO
}
- private static pn_disposition_t convertState(DeliveryState state)
+ private static BigInteger convertState(DeliveryState state)
{
//TODO - disposition properties conversion
if(state instanceof Accepted)
{
- return pn_disposition_t.PN_ACCEPTED;
+ return BigInteger.valueOf(PN_ACCEPTED);
}
else if(state instanceof Rejected)
{
- return pn_disposition_t.PN_REJECTED;
+ return BigInteger.valueOf(PN_REJECTED);
}
else if(state instanceof Modified)
{
- return pn_disposition_t.PN_MODIFIED;
+ return BigInteger.valueOf(PN_MODIFIED);
}
else if(state instanceof Received)
{
- return pn_disposition_t.PN_RECEIVED;
+ return BigInteger.valueOf(PN_RECEIVED);
}
else if(state instanceof Released)
{
- return pn_disposition_t.PN_RELEASED;
+ return BigInteger.valueOf(PN_RELEASED);
}
- return null;
+ return BigInteger.ZERO;
}
- private static DeliveryState convertDisposition(pn_disposition_t disposition)
+ private static DeliveryState convertDisposition(BigInteger disposition)
{
//TODO - disposition properties conversion
- if(pn_disposition_t.PN_ACCEPTED.equals(disposition))
+ if (BigInteger.valueOf(PN_ACCEPTED).equals(disposition))
{
return Accepted.getInstance();
}
- else if(pn_disposition_t.PN_REJECTED.equals(disposition))
+ else if(BigInteger.valueOf(PN_REJECTED).equals(disposition))
{
return new Rejected();
}
- else if(pn_disposition_t.PN_MODIFIED.equals(disposition))
+ else if(BigInteger.valueOf(PN_MODIFIED).equals(disposition))
{
return new Modified();
}
- else if(pn_disposition_t.PN_MODIFIED.equals(disposition))
+ else if(BigInteger.valueOf(PN_RECEIVED).equals(disposition))
{
- return new Modified();
+ return new Received();
}
- else if(pn_disposition_t.PN_RELEASED.equals(disposition))
+ else if(BigInteger.valueOf(PN_RELEASED).equals(disposition))
{
return new Released();
}
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Mon Jun 17 17:54:54 2013
@@ -1807,27 +1807,11 @@ class Endpoint(object):
self.condition = None
def _update_cond(self):
- impl = self._get_cond_impl()
- pn_condition_clear(impl)
- if self.condition:
- pn_condition_set_name(impl, self.condition.name)
- pn_condition_set_description(impl, self.condition.description)
- info = Data(pn_condition_info(impl))
- if self.condition.info:
- info.put_object(self.condition.info)
+ obj2cond(self.condition, self._get_cond_impl())
@property
def remote_condition(self):
- impl = self._get_remote_cond_impl()
- if pn_condition_is_set(impl):
- info_impl = Data(pn_condition_info(impl))
- info_impl.rewind()
- info_impl.next()
- info = info_impl.get_object()
- info_impl.rewind()
- return Condition(pn_condition_get_name(impl),
- pn_condition_get_description(impl),
- info)
+ return cond2obj(self._get_remote_cond_impl())
class Condition:
@@ -1847,6 +1831,36 @@ class Condition:
self.description == o.description and \
self.info == o.info
+def obj2cond(obj, cond):
+ pn_condition_clear(cond)
+ if obj:
+ pn_condition_set_name(cond, str(obj.name))
+ pn_condition_set_description(cond, obj.description)
+ info = Data(pn_condition_info(cond))
+ if obj.info:
+ info.put_object(obj.info)
+
+def cond2obj(cond):
+ if pn_condition_is_set(cond):
+ return Condition(pn_condition_get_name(cond),
+ pn_condition_get_description(cond),
+ dat2obj(pn_condition_info(cond)))
+ else:
+ return None
+
+def dat2obj(dimpl):
+ d = Data(dimpl)
+ d.rewind()
+ d.next()
+ obj = d.get_object()
+ d.rewind()
+ return obj
+
+def obj2dat(obj, dimpl):
+ if obj is not None:
+ d = Data(dimpl)
+ d.put_object(obj)
+
def wrap_connection(conn):
if not conn: return None
ctx = pn_connection_get_context(conn)
@@ -1907,37 +1921,24 @@ class Connection(Endpoint):
def remote_hostname(self):
return pn_connection_remote_hostname(self._conn)
- def _dat2obj(self, dimpl):
- d = Data(dimpl)
- d.rewind()
- d.next()
- obj = d.get_object()
- d.rewind()
- return obj
-
@property
def remote_offered_capabilities(self):
- return self._dat2obj(pn_connection_remote_offered_capabilities(self._conn))
+ return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
@property
def remote_desired_capabilities(self):
- return self._dat2obj(pn_connection_remote_desired_capabilities(self._conn))
+ return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
@property
def remote_properties(self):
- return self._dat2obj(pn_connection_remote_properties(self._conn))
-
- def _obj2dat(self, obj, dimpl):
- if obj is not None:
- d = Data(dimpl)
- d.put_object(obj)
+ return dat2obj(pn_connection_remote_properties(self._conn))
def open(self):
- self._obj2dat(self.offered_capabilities,
- pn_connection_offered_capabilities(self._conn))
- self._obj2dat(self.desired_capabilities,
- pn_connection_desired_capabilities(self._conn))
- self._obj2dat(self.properties, pn_connection_properties(self._conn))
+ obj2dat(self.offered_capabilities,
+ pn_connection_offered_capabilities(self._conn))
+ obj2dat(self.desired_capabilities,
+ pn_connection_desired_capabilities(self._conn))
+ obj2dat(self.properties, pn_connection_properties(self._conn))
pn_connection_open(self._conn)
def close(self):
@@ -2259,12 +2260,97 @@ def wrap_delivery(dlv):
pn_delivery_set_context(dlv, wrapper)
return wrapper
-class Delivery(object):
+class Disposition(object):
+ RECEIVED = PN_RECEIVED
ACCEPTED = PN_ACCEPTED
+ REJECTED = PN_REJECTED
+ RELEASED = PN_RELEASED
+ MODIFIED = PN_MODIFIED
+
+ def __init__(self, impl, local):
+ self._impl = impl
+ self.local = local
+ self._data = None
+ self._condition = None
+ self._annotations = None
+
+ @property
+ def type(self):
+ return pn_disposition_type(self._impl)
+
+ def _get_section_number(self):
+ return pn_disposition_get_section_number(self._impl)
+ def _set_section_number(self, n):
+ pn_disposition_set_section_number(self._impl, n)
+ section_number = property(_get_section_number, _set_section_number)
+
+ def _get_section_offset(self):
+ return pn_disposition_get_section_offset(self._impl)
+ def _set_section_offset(self, n):
+ pn_disposition_set_section_offset(self._impl, n)
+ section_offset = property(_get_section_offset, _set_section_offset)
+
+ def _get_failed(self):
+ return pn_disposition_is_failed(self._impl)
+ def _set_failed(self, b):
+ pn_disposition_set_failed(self._impl, b)
+ failed = property(_get_failed, _set_failed)
+
+ def _get_undeliverable(self):
+ return pn_disposition_is_undeliverable(self._impl)
+ def _set_undeliverable(self, b):
+ pn_disposition_set_undeliverable(self._impl, b)
+ undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+ def _get_data(self):
+ if self.local:
+ return self._data
+ else:
+ return dat2obj(pn_disposition_data(self._impl))
+ def _set_data(self, obj):
+ if self.local:
+ self._data = obj
+ else:
+ raise AttributeError("data attribute is read-only")
+ data = property(_get_data, _set_data)
+
+ def _get_annotations(self):
+ if self.local:
+ return self._annotations
+ else:
+ return dat2obj(pn_disposition_annotations(self._impl))
+ def _set_annotations(self, obj):
+ if self.local:
+ self._annotations = obj
+ else:
+ raise AttributeError("annotations attribute is read-only")
+ annotations = property(_get_annotations, _set_annotations)
+
+ def _get_condition(self):
+ if self.local:
+ return self._condition
+ else:
+ return cond2obj(pn_disposition_condition(self._impl))
+ def _set_condition(self, obj):
+ if self.local:
+ self._condition = obj
+ else:
+ raise AttributeError("condition attribute is read-only")
+ condition = property(_get_condition, _set_condition)
+
+class Delivery(object):
+
+ RECEIVED = Disposition.RECEIVED
+ ACCEPTED = Disposition.ACCEPTED
+ REJECTED = Disposition.REJECTED
+ RELEASED = Disposition.RELEASED
+ MODIFIED = Disposition.MODIFIED
def __init__(self, dlv):
self._dlv = dlv
+ self.local = Disposition(pn_delivery_local(self._dlv), True)
+ self.remote = Disposition(pn_delivery_remote(self._dlv), False)
@property
def tag(self):
@@ -2283,6 +2369,9 @@ class Delivery(object):
return pn_delivery_updated(self._dlv)
def update(self, state):
+ obj2dat(self.local._data, pn_disposition_data(self.local._impl))
+ obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
+ obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
pn_delivery_update(self._dlv, state)
@property
@@ -2316,7 +2405,6 @@ class Delivery(object):
def link(self):
return wrap_link(pn_delivery_link(self._dlv))
-
class TransportException(ProtonException):
pass
@@ -2714,6 +2802,7 @@ __all__ = [
"Connector",
"Data",
"Delivery",
+ "Disposition",
"Described",
"Driver",
"DriverException",
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Jun 17 17:54:54 2013
@@ -65,6 +65,8 @@ typedef enum {
PN_CONNECTION_CLOSE,
PN_NEVER
} pn_expiry_policy_t;
+
+typedef struct pn_disposition_t pn_disposition_t;
typedef struct pn_delivery_t pn_delivery_t;
typedef struct pn_delivery_tag_t {
@@ -91,18 +93,17 @@ typedef int pn_state_t; /**< encodes
#define PN_LOCAL_MASK (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)
#define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)
-/** @enum pn_disposition_t
+/**
* The state/outcome of a message transfer.
*
* @todo document each value
*/
-typedef enum pn_disposition_t {
- PN_RECEIVED=1,
- PN_ACCEPTED=2,
- PN_REJECTED=3,
- PN_RELEASED=4,
- PN_MODIFIED=5
-} pn_disposition_t;
+
+#define PN_RECEIVED (0x0000000000000023)
+#define PN_ACCEPTED (0x0000000000000024)
+#define PN_REJECTED (0x0000000000000025)
+#define PN_RELEASED (0x0000000000000026)
+#define PN_MODIFIED (0x0000000000000027)
typedef int pn_trace_t;
@@ -489,15 +490,17 @@ PN_EXTERN pn_delivery_t *pn_delivery(pn_
PN_EXTERN pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery);
PN_EXTERN pn_link_t *pn_delivery_link(pn_delivery_t *delivery);
// how do we do delivery state?
-PN_EXTERN pn_disposition_t pn_delivery_local_state(pn_delivery_t *delivery);
-PN_EXTERN pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery);
+PN_EXTERN pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery);
+PN_EXTERN uint64_t pn_delivery_local_state(pn_delivery_t *delivery);
+PN_EXTERN pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery);
+PN_EXTERN uint64_t pn_delivery_remote_state(pn_delivery_t *delivery);
PN_EXTERN bool pn_delivery_settled(pn_delivery_t *delivery);
PN_EXTERN size_t pn_delivery_pending(pn_delivery_t *delivery);
PN_EXTERN bool pn_delivery_partial(pn_delivery_t *delivery);
PN_EXTERN bool pn_delivery_writable(pn_delivery_t *delivery);
PN_EXTERN bool pn_delivery_readable(pn_delivery_t *delivery);
PN_EXTERN bool pn_delivery_updated(pn_delivery_t *delivery);
-PN_EXTERN void pn_delivery_update(pn_delivery_t *delivery, pn_disposition_t disposition);
+PN_EXTERN void pn_delivery_update(pn_delivery_t *delivery, uint64_t state);
PN_EXTERN void pn_delivery_clear(pn_delivery_t *delivery);
//int pn_delivery_format(pn_delivery_t *delivery);
PN_EXTERN void pn_delivery_settle(pn_delivery_t *delivery);
@@ -505,6 +508,20 @@ PN_EXTERN void pn_delivery_dump(pn_deliv
PN_EXTERN void *pn_delivery_get_context(pn_delivery_t *delivery);
PN_EXTERN void pn_delivery_set_context(pn_delivery_t *delivery, void *context);
+// disposition
+PN_EXTERN uint64_t pn_disposition_type(pn_disposition_t *disposition);
+PN_EXTERN pn_data_t *pn_disposition_data(pn_disposition_t *disposition);
+PN_EXTERN uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number);
+PN_EXTERN uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset);
+PN_EXTERN bool pn_disposition_is_failed(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed);
+PN_EXTERN bool pn_disposition_is_undeliverable(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable);
+PN_EXTERN pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition);
+
+// conditions
PN_EXTERN pn_condition_t *pn_connection_condition(pn_connection_t *connection);
PN_EXTERN pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection);
@@ -514,6 +531,8 @@ PN_EXTERN pn_condition_t *pn_session_rem
PN_EXTERN pn_condition_t *pn_link_condition(pn_link_t *link);
PN_EXTERN pn_condition_t *pn_link_remote_condition(pn_link_t *link);
+PN_EXTERN pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition);
+
PN_EXTERN bool pn_condition_is_set(pn_condition_t *condition);
PN_EXTERN void pn_condition_clear(pn_condition_t *condition);
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Jun 17 17:54:54 2013
@@ -127,6 +127,7 @@ struct pn_transport_t {
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
pn_data_t *remote_properties;
+ pn_data_t *disp_data;
//#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */
uint32_t local_max_frame;
@@ -240,13 +241,23 @@ struct pn_link_t {
pn_link_state_t state;
};
+struct pn_disposition_t {
+ uint64_t type;
+ pn_data_t *data;
+ pn_data_t *annotations;
+ pn_condition_t condition;
+ uint32_t section_number;
+ uint64_t section_offset;
+ bool failed;
+ bool undeliverable;
+ bool settled;
+};
+
struct pn_delivery_t {
pn_link_t *link;
pn_buffer_t *tag;
- int local_state;
- int remote_state;
- bool local_settled;
- bool remote_settled;
+ pn_disposition_t local;
+ pn_disposition_t remote;
bool updated;
bool settled; // tracks whether we're in the unsettled list or not
pn_delivery_t *unsettled_next;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Jun 17 17:54:54 2013
@@ -188,6 +188,7 @@ void pn_transport_free(pn_transport_t *t
pn_free(transport->remote_offered_capabilities);
pn_free(transport->remote_desired_capabilities);
pn_free(transport->remote_properties);
+ pn_free(transport->disp_data);
pn_error_free(transport->error);
pn_condition_tini(&transport->remote_condition);
pn_free(transport->local_channels);
@@ -472,7 +473,7 @@ void pn_work_update(pn_connection_t *con
{
pn_link_t *link = pn_delivery_link(delivery);
pn_delivery_t *current = pn_link_current(link);
- if (delivery->updated && !delivery->local_settled) {
+ if (delivery->updated && !delivery->local.settled) {
pn_add_work(connection, delivery);
} else if (delivery == current) {
if (link->endpoint.type == SENDER) {
@@ -787,6 +788,7 @@ void pn_transport_init(pn_transport_t *t
transport->remote_offered_capabilities = pn_data(16);
transport->remote_desired_capabilities = pn_data(16);
transport->remote_properties = pn_data(16);
+ transport->disp_data = pn_data(16);
transport->error = pn_error();
pn_condition_init(&transport->remote_condition);
@@ -1162,11 +1164,40 @@ pn_session_t *pn_link_session(pn_link_t
return link->session;
}
+static void pn_disposition_finalize(pn_disposition_t *ds)
+{
+ pn_free(ds->data);
+ pn_free(ds->annotations);
+ pn_condition_tini(&ds->condition);
+}
+
static void pn_delivery_finalize(void *object)
{
pn_delivery_t *delivery = (pn_delivery_t *) object;
pn_buffer_free(delivery->tag);
pn_buffer_free(delivery->bytes);
+ pn_disposition_finalize(&delivery->local);
+ pn_disposition_finalize(&delivery->remote);
+}
+
+static void pn_disposition_init(pn_disposition_t *ds)
+{
+ ds->data = pn_data(16);
+ ds->annotations = pn_data(16);
+ pn_condition_init(&ds->condition);
+}
+
+static void pn_disposition_clear(pn_disposition_t *ds)
+{
+ ds->type = 0;
+ ds->section_number = 0;
+ ds->section_offset = 0;
+ ds->failed = false;
+ ds->undeliverable = false;
+ ds->settled = false;
+ pn_data_clear(ds->data);
+ pn_data_clear(ds->annotations);
+ pn_condition_clear(&ds->condition);
}
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
@@ -1180,16 +1211,16 @@ pn_delivery_t *pn_delivery(pn_link_t *li
if (!delivery) return NULL;
delivery->tag = pn_buffer(16);
delivery->bytes = pn_buffer(64);
+ pn_disposition_init(&delivery->local);
+ pn_disposition_init(&delivery->remote);
} else {
assert(!delivery->tpwork);
}
delivery->link = link;
pn_buffer_clear(delivery->tag);
pn_buffer_append(delivery->tag, tag.bytes, tag.size);
- delivery->local_state = 0;
- delivery->remote_state = 0;
- delivery->local_settled = false;
- delivery->remote_settled = false;
+ pn_disposition_clear(&delivery->local);
+ pn_disposition_clear(&delivery->remote);
delivery->updated = false;
delivery->settled = false;
LL_ADD(link, unsettled, delivery);
@@ -1225,7 +1256,7 @@ int pn_link_unsettled(pn_link_t *link)
pn_delivery_t *pn_unsettled_head(pn_link_t *link)
{
pn_delivery_t *d = link->unsettled_head;
- while (d && d->local_settled) {
+ while (d && d->local.settled) {
d = d->unsettled_next;
}
return d;
@@ -1234,7 +1265,7 @@ pn_delivery_t *pn_unsettled_head(pn_link
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
{
pn_delivery_t *d = delivery->unsettled_next;
- while (d && d->local_settled) {
+ while (d && d->local.settled) {
d = d->unsettled_next;
}
return d;
@@ -1251,11 +1282,11 @@ void pn_delivery_dump(pn_delivery_t *d)
char tag[1024];
pn_bytes_t bytes = pn_buffer_bytes(d->tag);
pn_quote_data(tag, 1024, bytes.start, bytes.size);
- printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
- "remote_settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
+ printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, "
+ "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
"work=%u}",
- tag, d->local_state, d->remote_state, d->local_settled,
- d->remote_settled, d->updated, pn_is_current(d),
+ tag, d->local.type, d->remote.type, d->local.settled,
+ d->remote.settled, d->updated, pn_is_current(d),
pn_delivery_writable(d), pn_delivery_readable(d), d->work);
}
@@ -1271,6 +1302,122 @@ void pn_delivery_set_context(pn_delivery
delivery->context = context;
}
+uint64_t pn_disposition_type(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->type;
+}
+
+pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->data;
+}
+
+uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->section_number;
+}
+
+void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
+{
+ assert(disposition);
+ disposition->section_number = section_number;
+}
+
+uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->section_offset;
+}
+
+void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
+{
+ assert(disposition);
+ disposition->section_offset = section_offset;
+}
+
+bool pn_disposition_is_failed(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->failed;
+}
+
+void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
+{
+ assert(disposition);
+ disposition->failed = failed;
+}
+
+bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->undeliverable;
+}
+
+void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
+{
+ assert(disposition);
+ disposition->undeliverable = undeliverable;
+}
+
+pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return disposition->annotations;
+}
+
+pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
+{
+ assert(disposition);
+ return &disposition->condition;
+}
+
+bool pni_disposition_batchable(pn_disposition_t *disposition)
+{
+ switch (disposition->type) {
+ case PN_ACCEPTED:
+ return true;
+ case PN_RELEASED:
+ return true;
+ default:
+ return false;
+ }
+}
+
+void pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data)
+{
+ pn_condition_t *cond = &disposition->condition;
+ switch (disposition->type) {
+ case PN_RECEIVED:
+ pn_data_put_list(data);
+ pn_data_enter(data);
+ pn_data_put_uint(data, disposition->section_number);
+ pn_data_put_ulong(data, disposition->section_offset);
+ pn_data_exit(data);
+ break;
+ case PN_ACCEPTED:
+ case PN_RELEASED:
+ return;
+ case PN_REJECTED:
+ pn_data_fill(data, "[?DL[sSC]]", pn_condition_is_set(cond), ERROR,
+ pn_condition_get_name(cond),
+ pn_condition_get_description(cond),
+ pn_condition_info(cond));
+ break;
+ case PN_MODIFIED:
+ pn_data_fill(data, "[ooC]",
+ disposition->failed,
+ disposition->undeliverable,
+ disposition->annotations);
+ break;
+ default:
+ pn_data_copy(data, disposition->data);
+ break;
+ }
+}
+
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
@@ -1377,7 +1524,7 @@ void pn_delivery_settle(pn_delivery_t *d
}
link->unsettled_count--;
- delivery->local_settled = true;
+ delivery->local.settled = true;
pn_add_tpwork(delivery);
pn_work_update(delivery->link->session->connection, delivery);
}
@@ -1664,8 +1811,8 @@ int pn_do_transfer(pn_dispatcher_t *disp
link->state.link_credit--;
link->queued++;
- // XXX: need to fill in remote state: delivery->remote_state = ...;
- delivery->remote_settled = settled;
+ // XXX: need to fill in remote state: delivery->remote.state = ...;
+ delivery->remote.settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
}
@@ -1734,36 +1881,38 @@ int pn_do_flow(pn_dispatcher_t *disp)
return 0;
}
+#define SCAN_ERROR_DEFAULT ("D.[D.[sSC]")
+#define SCAN_ERROR_DETACH ("D.[..D.[sSC]")
+#define SCAN_ERROR_DISP ("[D.[sSC]")
+
+static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char *fmt)
+{
+ pn_bytes_t cond;
+ pn_bytes_t desc;
+ pn_condition_clear(condition);
+ int err = pn_data_scan(data, fmt, &cond, &desc, condition->info);
+ if (err) return err;
+ strncat(condition->name, cond.start, cond.size);
+ strncat(condition->description, desc.start, desc.size);
+ pn_data_rewind(condition->info);
+ return 0;
+}
+
int pn_do_disposition(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
bool role;
pn_sequence_t first, last;
- uint64_t code;
- bool last_init, settled, code_init;
- int err = pn_scan_args(disp, "D.[oI?IoD?L[]]", &role, &first, &last_init,
- &last, &settled, &code_init, &code);
+ uint64_t type = 0;
+ bool last_init, settled, type_init;
+ pn_data_clear(transport->disp_data);
+ int err = pn_scan_args(disp, "D.[oI?IoD?LC]", &role, &first, &last_init,
+ &last, &settled, &type_init, &type,
+ transport->disp_data);
if (err) return err;
if (!last_init) last = first;
pn_session_t *ssn = pn_channel_state(transport, disp->channel);
- pn_disposition_t dispo = (pn_disposition_t) 0;
- if (code_init) {
- switch (code)
- {
- case ACCEPTED:
- dispo = PN_ACCEPTED;
- break;
- case REJECTED:
- dispo = PN_REJECTED;
- break;
- default:
- // XXX
- fprintf(stderr, "default %" PRIu64 "\n", code);
- break;
- }
- }
-
pn_delivery_map_t *deliveries;
if (role) {
deliveries = &ssn->state.outgoing;
@@ -1771,11 +1920,53 @@ int pn_do_disposition(pn_dispatcher_t *d
deliveries = &ssn->state.incoming;
}
+ pn_data_rewind(transport->disp_data);
+ bool remote_data = (pn_data_next(transport->disp_data) &&
+ pn_data_get_list(transport->disp_data) > 0);
+
for (pn_sequence_t id = first; id <= last; id++) {
pn_delivery_t *delivery = pn_delivery_map_get(deliveries, id);
+ pn_disposition_t *remote = &delivery->remote;
if (delivery) {
- delivery->remote_state = dispo;
- delivery->remote_settled = settled;
+ if (type_init) remote->type = type;
+ if (remote_data) {
+ switch (type) {
+ case PN_RECEIVED:
+ pn_data_rewind(transport->disp_data);
+ pn_data_next(transport->disp_data);
+ pn_data_enter(transport->disp_data);
+ if (pn_data_next(transport->disp_data))
+ remote->section_number = pn_data_get_uint(transport->disp_data);
+ if (pn_data_next(transport->disp_data))
+ remote->section_offset = pn_data_get_ulong(transport->disp_data);
+ break;
+ case PN_ACCEPTED:
+ break;
+ case PN_REJECTED:
+ err = pn_scan_error(transport->disp_data, &remote->condition, SCAN_ERROR_DISP);
+ if (err) return err;
+ break;
+ case PN_RELEASED:
+ break;
+ case PN_MODIFIED:
+ pn_data_rewind(transport->disp_data);
+ pn_data_next(transport->disp_data);
+ pn_data_enter(transport->disp_data);
+ if (pn_data_next(transport->disp_data))
+ remote->failed = pn_data_get_bool(transport->disp_data);
+ if (pn_data_next(transport->disp_data))
+ remote->undeliverable = pn_data_get_bool(transport->disp_data);
+ pn_data_narrow(transport->disp_data);
+ pn_data_clear(remote->data);
+ pn_data_appendn(remote->annotations, transport->disp_data, 1);
+ pn_data_widen(transport->disp_data);
+ break;
+ default:
+ pn_data_copy(remote->data, transport->disp_data);
+ break;
+ }
+ }
+ remote->settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
}
@@ -1784,20 +1975,6 @@ int pn_do_disposition(pn_dispatcher_t *d
return 0;
}
-static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition, bool detach)
-{
- pn_bytes_t cond;
- pn_bytes_t desc;
- pn_condition_clear(condition);
- int err = pn_scan_args(disp, detach ? "D.[..D.[sSC]" : "D.[D.[sSC]",
- &cond, &desc, condition->info);
- if (err) return err;
- strncat(condition->name, cond.start, cond.size);
- strncat(condition->description, desc.start, desc.size);
- pn_data_rewind(condition->info);
- return 0;
-}
-
int pn_do_detach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1812,7 +1989,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
}
pn_link_t *link = pn_handle_state(ssn, handle);
- err = pn_scan_error(disp, &link->endpoint.remote_condition, true);
+ err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
if (err) return err;
pn_unmap_handle(ssn, link);
@@ -1831,7 +2008,7 @@ int pn_do_end(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
pn_session_t *ssn = pn_channel_state(transport, disp->channel);
- int err = pn_scan_error(disp, &ssn->endpoint.remote_condition, false);
+ int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
if (err) return err;
pn_unmap_channel(transport, ssn);
PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
@@ -1842,7 +2019,7 @@ int pn_do_close(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
pn_connection_t *conn = transport->connection;
- int err = pn_scan_error(disp, &transport->remote_condition, false);
+ int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
if (err) return err;
transport->close_rcvd = true;
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
@@ -2208,29 +2385,25 @@ int pn_post_disp(pn_transport_t *transpo
pn_modified(transport->connection, &link->session->endpoint);
pn_delivery_state_t *state = &delivery->state;
assert(state->init);
- uint64_t code;
- switch(delivery->local_state) {
- case PN_ACCEPTED:
- code = ACCEPTED;
- break;
- case PN_RELEASED:
- code = RELEASED;
- break;
- case PN_REJECTED:
- code = REJECTED;
- break;
- //TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
- default:
- code = 0;
- }
+ bool role = (link->endpoint.type == RECEIVER);
+ uint64_t code = delivery->local.type;
- if (!code && !delivery->local_settled) {
+ if (!code && !delivery->local.settled) {
return 0;
}
+ if (!pni_disposition_batchable(&delivery->local)) {
+ pn_data_clear(transport->disp_data);
+ pni_disposition_encode(&delivery->local, transport->disp_data);
+ return pn_post_frame(transport->disp, ssn->state.local_channel,
+ "DL[oIIo?DLC]", DISPOSITION,
+ role, state->id, state->id, delivery->local.settled,
+ (bool)code, code, transport->disp_data);
+ }
+
if (ssn_state->disp && code == ssn_state->disp_code &&
- delivery->local_settled == ssn_state->disp_settled &&
- ssn_state->disp_type == (link->endpoint.type == RECEIVER)) {
+ delivery->local.settled == ssn_state->disp_settled &&
+ ssn_state->disp_type == role) {
if (state->id == ssn_state->disp_first - 1) {
ssn_state->disp_first = state->id;
return 0;
@@ -2245,9 +2418,9 @@ int pn_post_disp(pn_transport_t *transpo
if (err) return err;
}
- ssn_state->disp_type = (link->endpoint.type == RECEIVER);
+ ssn_state->disp_type = role;
ssn_state->disp_code = code;
- ssn_state->disp_settled = delivery->local_settled;
+ ssn_state->disp_settled = delivery->local.settled;
ssn_state->disp_first = state->id;
ssn_state->disp_last = state->id;
ssn_state->disp = true;
@@ -2280,7 +2453,7 @@ int pn_process_tpwork_sender(pn_transpor
link_state->local_handle,
state->id, &tag,
0, // message-format
- delivery->local_settled,
+ delivery->local.settled,
!delivery->done,
ssn_state->remote_incoming_window);
if (count < 0) return count;
@@ -2300,13 +2473,13 @@ int pn_process_tpwork_sender(pn_transpor
}
pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
- if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
+ if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled
&& state && state->sent && !xfr_posted) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
- if (delivery->local_settled && state && state->sent) {
+ if (delivery->local.settled && state && state->sent) {
pn_full_settle(&ssn_state->outgoing, delivery);
}
@@ -2318,12 +2491,12 @@ int pn_process_tpwork_receiver(pn_transp
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_t *ssn = link->session;
- if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote_settled && delivery->state.init) {
+ if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote.settled && delivery->state.init) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
- if (delivery->local_settled) {
+ if (delivery->local.settled) {
pn_full_settle(&ssn->state.incoming, delivery);
}
@@ -2796,19 +2969,33 @@ pn_link_t *pn_delivery_link(pn_delivery_
return delivery->link;
}
-pn_disposition_t pn_delivery_local_state(pn_delivery_t *delivery)
+pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
+{
+ assert(delivery);
+ return &delivery->local;
+}
+
+uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
{
- return (pn_disposition_t) delivery->local_state;
+ assert(delivery);
+ return delivery->local.type;
}
-pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery)
+pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
{
- return (pn_disposition_t) delivery->remote_state;
+ assert(delivery);
+ return &delivery->remote;
+}
+
+uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
+{
+ assert(delivery);
+ return delivery->remote.type;
}
bool pn_delivery_settled(pn_delivery_t *delivery)
{
- return delivery ? delivery->remote_settled : false;
+ return delivery ? delivery->remote.settled : false;
}
bool pn_delivery_updated(pn_delivery_t *delivery)
@@ -2822,10 +3009,10 @@ void pn_delivery_clear(pn_delivery_t *de
pn_work_update(delivery->link->session->connection, delivery);
}
-void pn_delivery_update(pn_delivery_t *delivery, pn_disposition_t disposition)
+void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
{
if (!delivery) return;
- delivery->local_state = disposition;
+ delivery->local.type = state;
pn_add_tpwork(delivery);
}
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Mon Jun 17 17:54:54 2013
@@ -1138,7 +1138,7 @@ pn_status_t pn_messenger_status(pn_messe
int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
{
pni_store_t *store = pn_tracker_store(messenger, tracker);
- return pni_store_update(store, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
+ return pni_store_update(store, pn_tracker_sequence(tracker), PN_STATUS_UNKNOWN, flags, true, true);
}
// true if all pending output has been sent to peer
@@ -1290,7 +1290,7 @@ int pn_messenger_accept(pn_messenger_t *
}
return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
- (pn_status_t) PN_ACCEPTED, flags, false, false);
+ PN_STATUS_ACCEPTED, flags, false, false);
}
int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
@@ -1301,7 +1301,7 @@ int pn_messenger_reject(pn_messenger_t *
}
return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
- (pn_status_t) PN_REJECTED, flags, false, false);
+ PN_STATUS_REJECTED, flags, false, false);
}
int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Mon Jun 17 17:54:54 2013
@@ -282,7 +282,7 @@ void *pni_entry_get_context(pni_entry_t
return entry->context;
}
-static pn_status_t disp2status(pn_disposition_t disp)
+static pn_status_t disp2status(uint64_t disp)
{
if (!disp) return PN_STATUS_PENDING;
Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Mon Jun 17 17:54:54 2013
@@ -216,7 +216,7 @@ void server_callback(pn_connector_t *cto
}
if (pn_delivery_updated(delivery)) {
- if (!ctx->quiet) printf("disposition for %s: %u\n", tagstr, pn_delivery_remote_state(delivery));
+ if (!ctx->quiet) printf("disposition for %s: %" PRIu64 "\n", tagstr, pn_delivery_remote_state(delivery));
pn_delivery_settle(delivery);
}
@@ -377,7 +377,7 @@ void client_callback(pn_connector_t *cto
}
if (pn_delivery_updated(delivery)) {
- if (!ctx->quiet) printf("disposition for %s: %u\n", tagstr, pn_delivery_remote_state(delivery));
+ if (!ctx->quiet) printf("disposition for %s: %" PRIu64 "\n", tagstr, pn_delivery_remote_state(delivery));
pn_delivery_clear(delivery);
pn_delivery_settle(delivery);
if (!--ctx->send_count) {
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Mon Jun 17 17:54:54 2013
@@ -34,7 +34,9 @@ from org.apache.qpid.proton.message impo
from org.apache.qpid.proton.codec import \
DataFactory, Data as JData
from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status
-from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue
+from org.apache.qpid.proton.amqp.transport import ErrorCondition
+from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, \
+ Rejected, Received, Modified, Released, AmqpValue
from org.apache.qpid.proton.amqp import UnsignedInteger, UnsignedLong, UnsignedByte, UnsignedShort, Symbol, \
Decimal32, Decimal64, Decimal128
from jarray import zeros, array
@@ -147,26 +149,37 @@ class Endpoint(object):
def close(self):
if self.condition is not None:
- if self.condition.impl is None:
- self.condition.impl = self.impl.getCondition()
- self.condition.impl.setCondition(Symbol.getSymbol(self.condition.name))
- self.condition.impl.setDescription(self.condition.description)
- self.condition.impl.setInfo(self.condition.info)
+ self.impl.setCondition(self.condition.impl)
self.impl.close()
-class Condition:
+class Condition(object):
def __init__(self, name=None, description=None, info=None, impl=None):
- self.name = name
- self.description = description
- self.info = info
- if impl is not None:
- self.impl = impl
- self.name = impl.getCondition()
- self.description = impl.getDescription()
- self.info = impl.getInfo()
- else:
- self.impl = None
+ if impl is None:
+ impl = ErrorCondition(Symbol.valueOf(name), description)
+ if info is not None:
+ impl.setInfo(info)
+ self.impl = impl
+
+ def _get_name(self):
+ c = self.impl.getCondition()
+ if c is not None:
+ return c.toString()
+ def _set_name(self, n):
+ self.impl.setCondition(Symbol.valueOf(n))
+ name = property(_get_name, _set_name)
+
+ def _get_description(self):
+ return self.impl.getDescription()
+ def _set_description(self, d):
+ self.impl.setDescription(d)
+ description = property(_get_description, _set_description)
+
+ def _get_info(self):
+ return self.impl.getInfo()
+ def _set_info(self, i):
+ self.impl.setInfo(i)
+ info = property(_get_info, _set_info)
def __repr__(self):
return "Condition(%s)" % ", ".join([repr(x) for x in
@@ -175,10 +188,10 @@ class Condition:
def __eq__(self, o):
if not isinstance(o, Condition): return False
- return (self.impl is not None and o.impl is not None and self.impl.equals(o.impl)) or \
- (self.name == o.name and \
- self.description == o.description and \
- self.info == o.info)
+ return self.impl.equals(o.impl)
+
+ def _2J(self):
+ return self.impl
def wrap_connection(impl):
if impl: return Connection(_impl = impl)
@@ -410,19 +423,175 @@ class Receiver(Link):
else:
raise Exception(n)
+class Disposition(object):
+
+ RECEIVED = 0x23
+ ACCEPTED = 0x24
+ REJECTED = 0x25
+ RELEASED = 0x26
+ MODIFIED = 0x27
+
+ def __init__(self):
+ self.type = 0
+ self._received = None
+ self._accepted = None
+ self._rejected = None
+ self._released = None
+ self._modified = None
+
+ def _get_section_number(self):
+ if self._received:
+ return J2PY(self._received.getSectionNumber())
+ else:
+ return 0
+ def _set_section_number(self, n):
+ if not self._received:
+ self._received = Received()
+ self._received.setSectionNumber(UnsignedInteger(n))
+ section_number = property(_get_section_number, _set_section_number)
+
+ def _get_section_offset(self):
+ if self._received:
+ return J2PY(self._received.getSectionOffset())
+ else:
+ return 0
+ def _set_section_offset(self, n):
+ if not self._received:
+ self._received = Received()
+ self._received.setSectionOffset(UnsignedLong(n))
+ section_offset = property(_get_section_offset, _set_section_offset)
+
+ def _get_failed(self):
+ if self._modified:
+ return self._modified.getDeliveryFailed()
+ else:
+ return False
+ def _set_failed(self, b):
+ if not self._modified:
+ self._modified = Modified()
+ self._modified.setDeliveryFailed(b)
+ failed = property(_get_failed, _set_failed)
+
+ def _get_undeliverable(self):
+ if self._modified:
+ return self._modified.getUndeliverableHere()
+ else:
+ return False
+ def _set_undeliverable(self, b):
+ if not self._modified:
+ self._modified = Modified()
+ self._modified.setUndeliverableHere(b)
+ undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+ def _get_data(self):
+ return None
+ def _set_data(self, obj):
+ raise Skipped()
+ data = property(_get_data, _set_data)
+
+ def _get_annotations(self):
+ if self._modified:
+ return J2PY(self._modified.getMessageAnnotations())
+ else:
+ return None
+ def _set_annotations(self, obj):
+ if not self._modified:
+ self._modified = Modified()
+ self._modified.setMessageAnnotations(PY2J(obj))
+ annotations = property(_get_annotations, _set_annotations)
+
+ def _get_condition(self):
+ if self._rejected:
+ return Condition(impl = self._rejected.getError())
+ else:
+ return None
+ def _set_condition(self, obj):
+ if not self._rejected:
+ self._rejected = Rejected()
+ self._rejected.setError(obj._2J())
+ condition = property(_get_condition, _set_condition)
+
+ def _as_received(self):
+ if self._received is None:
+ self._received = Received()
+ return self._received
+
+ def _as_accepted(self):
+ if self._accepted is None:
+ self._accepted = Accepted.getInstance()
+ return self._accepted
+
+ def _as_rejected(self):
+ if self._rejected is None:
+ self._rejected = Rejected()
+ return self._rejected
+
+ def _as_released(self):
+ if self._released is None:
+ self._released = Released.getInstance()
+ return self._released
+
+ def _as_modified(self):
+ if self._modified is None:
+ self._modified = Modified()
+ return self._modified
+
+ PY2J = {
+ RECEIVED: _as_received,
+ ACCEPTED: _as_accepted,
+ REJECTED: _as_rejected,
+ RELEASED: _as_released,
+ MODIFIED: _as_modified
+ }
+
+ def _2J(self):
+ return self.PY2J[self.type](self)
+
+ def _from_received(self, s):
+ self.type = self.RECEIVED
+ self._received = s
+
+ def _from_accepted(self, s):
+ self.type = self.ACCEPTED
+ self._accepted = s
+
+ def _from_rejected(self, s):
+ self.type = self.REJECTED
+ self._rejected = s
+
+ def _from_released(self, s):
+ self.type = self.RELEASED
+ self._released = s
+
+ def _from_modified(self, s):
+ self.type = self.MODIFIED
+ self._modified = s
+
+ J2PY = {
+ Received: _from_received,
+ Accepted: _from_accepted,
+ Rejected: _from_rejected,
+ Released: _from_released,
+ Modified: _from_modified
+ }
+
+ def _2PY(self, impl):
+ self.J2PY[type(impl)](self, impl)
+
def wrap_delivery(impl):
if impl: return Delivery(impl)
class Delivery(object):
- RECEIVED = 1
- ACCEPTED = 2
- REJECTED = 3
- RELEASED = 4
- MODIFIED = 5
+ RECEIVED = Disposition.RECEIVED
+ ACCEPTED = Disposition.ACCEPTED
+ REJECTED = Disposition.REJECTED
+ RELEASED = Disposition.RELEASED
+ MODIFIED = Disposition.MODIFIED
def __init__(self, impl):
self.impl = impl
+ self.local = Disposition()
@property
def tag(self):
@@ -441,26 +610,22 @@ class Delivery(object):
return self.impl.isUpdated()
def update(self, disp):
- if disp == self.ACCEPTED:
- self.impl.disposition(Accepted.getInstance())
- else:
- raise Exception("xxx: %s" % disp)
+ self.local.type = disp
+ self.impl.disposition(self.local._2J())
+
+ @property
+ def remote(self):
+ d = Disposition()
+ d._2PY(self.impl.getRemoteState())
+ return d
@property
def remote_state(self):
- rd = self.impl.getRemoteState()
- if(rd == Accepted.getInstance()):
- return self.ACCEPTED
- else:
- raise Exception("xxx: %s" % rd)
+ return self.remote.type
@property
def local_state(self):
- ld = self.impl.getLocalState()
- if(ld == Accepted.getInstance()):
- return self.ACCEPTED
- else:
- raise Exception("xxx: %s" % ld)
+ return self.local.type
def settle(self):
self.impl.settle()
@@ -1441,7 +1606,9 @@ conversions_J2PY = {
dict: lambda d: dict([(J2PY(k), J2PY(v)) for k, v in d.items()]),
HashMap: lambda m: dict([(J2PY(e.getKey()), J2PY(e.getValue())) for e in m.entrySet()]),
list: lambda l: [J2PY(x) for x in l],
- Symbol: lambda s: symbol(s.toString())
+ Symbol: lambda s: symbol(s.toString()),
+ UnsignedInteger: lambda n: n.longValue(),
+ UnsignedLong: lambda n: n.longValue()
}
conversions_PY2J = {
@@ -1476,6 +1643,7 @@ __all__ = [
"Connector",
"Data",
"Delivery",
+ "Disposition",
"Described",
"Driver",
"Endpoint",
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Mon Jun 17 17:54:54 2013
@@ -49,20 +49,15 @@ public class DeliveryImpl implements Del
private static final int DELIVERY_STATE_CHANGED = 1;
private static final int ABLE_TO_SEND = 2;
private static final int IO_WORK = 4;
+ private static final int DELIVERY_SETTLED = 8;
/**
* A bit-mask representing the outstanding work on this delivery received from the transport layer
* that has not yet been processed by the application.
- * Note that contrast with {@link #_transportFlags}.
*/
private int _flags = (byte) 0;
- /**
- * A bit-mask representing the outstanding work done by the application on this delivery
- * that has not yet been processed by the transport layer
- */
- private int _transportFlags = (byte) 0;
private TransportDelivery _transportDelivery;
private byte[] _data;
private int _dataSize;
@@ -118,7 +113,7 @@ public class DeliveryImpl implements Del
_deliveryState = state;
if(!_remoteSettled)
{
- setTransportFlag(DELIVERY_STATE_CHANGED);
+ addToTransportWorkList();
}
}
@@ -128,7 +123,7 @@ public class DeliveryImpl implements Del
_link.decrementUnsettled();
if(!_remoteSettled)
{
- setTransportFlag(DELIVERY_STATE_CHANGED);
+ addToTransportWorkList();
}
else
{
@@ -303,16 +298,6 @@ public class DeliveryImpl implements Del
}
- private void setTransportFlag(int flag)
- {
- boolean addWork = (_transportFlags == 0);
- _transportFlags = _transportFlags | flag;
- if(addWork)
- {
- addToTransportWorkList();
- }
- }
-
DeliveryImpl getTransportWorkNext()
{
return _transportWorkNext;
@@ -334,11 +319,6 @@ public class DeliveryImpl implements Del
_transportWorkPrev = transportWorkPrev;
}
- boolean isLocalStateChange()
- {
- return (_transportFlags & DELIVERY_STATE_CHANGED) != 0;
- }
-
TransportDelivery getTransportDelivery()
{
return _transportDelivery;
@@ -476,7 +456,6 @@ public class DeliveryImpl implements Del
.append(", _remoteSettled=").append(_remoteSettled)
.append(", _remoteDeliveryState=").append(_remoteDeliveryState)
.append(", _flags=").append(_flags)
- .append(", _transportFlags=").append(_transportFlags)
.append(", _transportDelivery=").append(_transportDelivery)
.append(", _dataSize=").append(_dataSize)
.append(", _complete=").append(_complete)
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Mon Jun 17 17:54:54 2013
@@ -394,7 +394,7 @@ public class TransportImpl extends Endpo
DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
while(delivery != null && buffer.remaining() >= _maxFrameSize )
{
- if((delivery.getLink() instanceof SenderImpl) && delivery.isLocalStateChange() && delivery.getTransportDelivery() != null)
+ if((delivery.getLink() instanceof SenderImpl) && delivery.getTransportDelivery() != null)
{
TransportDelivery transportDelivery = delivery.getTransportDelivery();
Disposition disposition = new Disposition();
@@ -509,7 +509,7 @@ public class TransportImpl extends Endpo
DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
while(delivery != null && buffer.remaining() >= _maxFrameSize)
{
- if((delivery.getLink() instanceof ReceiverImpl) && delivery.isLocalStateChange())
+ if((delivery.getLink() instanceof ReceiverImpl))
{
TransportDelivery transportDelivery = delivery.getTransportDelivery();
Disposition disposition = new Disposition();
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Mon Jun 17 17:54:54 2013
@@ -1510,5 +1510,161 @@ class ServerTest(Test):
self.server.stop()
+class NoValue:
+ def __init__(self):
+ pass
+ def apply(self, dlv):
+ pass
+
+ def check(self, dlv):
+ assert dlv.data == None
+ assert dlv.section_number == 0
+ assert dlv.section_offset == 0
+ assert dlv.condition == None
+ assert dlv.failed == False
+ assert dlv.undeliverable == False
+ assert dlv.annotations == None
+
+class RejectValue:
+ def __init__(self, condition):
+ self.condition = condition
+
+ def apply(self, dlv):
+ dlv.condition = self.condition
+
+ def check(self, dlv):
+ assert dlv.data == None, dlv.data
+ assert dlv.section_number == 0
+ assert dlv.section_offset == 0
+ assert dlv.condition == self.condition, (dlv.condition, self.condition)
+ assert dlv.failed == False
+ assert dlv.undeliverable == False
+ assert dlv.annotations == None
+
+class ReceivedValue:
+ def __init__(self, section_number, section_offset):
+ self.section_number = section_number
+ self.section_offset = section_offset
+
+ def apply(self, dlv):
+ dlv.section_number = self.section_number
+ dlv.section_offset = self.section_offset
+
+ def check(self, dlv):
+ assert dlv.data == None, dlv.data
+ assert dlv.section_number == self.section_number, (dlv.section_number, self.section_number)
+ assert dlv.section_offset == self.section_offset
+ assert dlv.condition == None
+ assert dlv.failed == False
+ assert dlv.undeliverable == False
+ assert dlv.annotations == None
+
+class ModifiedValue:
+ def __init__(self, failed, undeliverable, annotations):
+ self.failed = failed
+ self.undeliverable = undeliverable
+ self.annotations = annotations
+
+ def apply(self, dlv):
+ dlv.failed = self.failed
+ dlv.undeliverable = self.undeliverable
+ dlv.annotations = self.annotations
+
+ def check(self, dlv):
+ assert dlv.data == None, dlv.data
+ assert dlv.section_number == 0
+ assert dlv.section_offset == 0
+ assert dlv.condition == None
+ assert dlv.failed == self.failed
+ assert dlv.undeliverable == self.undeliverable
+ assert dlv.annotations == self.annotations, (dlv.annotations, self.annotations)
+
+class CustomValue:
+ def __init__(self, data):
+ self.data = data
+
+ def apply(self, dlv):
+ dlv.data = self.data
+
+ def check(self, dlv):
+ assert dlv.data == self.data, (dlv.data, self.data)
+ assert dlv.section_number == 0
+ assert dlv.section_offset == 0
+ assert dlv.condition == None
+ assert dlv.failed == False
+ assert dlv.undeliverable == False
+ assert dlv.annotations == None
+
+class DeliveryTest(Test):
+
+ def teardown(self):
+ self.cleanup()
+
+ def testDisposition(self, count=1, tag="tag%i", type=Delivery.ACCEPTED, value=NoValue()):
+ snd, rcv = self.link("test-link")
+ snd.open()
+ rcv.open()
+
+ snd_deliveries = []
+ for i in range(count):
+ d = snd.delivery(tag % i)
+ snd_deliveries.append(d)
+ snd.advance()
+
+ rcv.flow(count)
+ self.pump()
+
+ rcv_deliveries = []
+ for i in range(count):
+ d = rcv.current
+ assert d.tag == (tag % i)
+ rcv_deliveries.append(d)
+ rcv.advance()
+
+ for d in rcv_deliveries:
+ value.apply(d.local)
+ d.update(type)
+
+ self.pump()
+
+ for d in snd_deliveries:
+ assert d.remote_state == type
+ assert d.remote.type == type
+ value.check(d.remote)
+ value.apply(d.local)
+ d.update(type)
+
+ self.pump()
+
+ for d in rcv_deliveries:
+ assert d.remote_state == type
+ assert d.remote.type == type
+ value.check(d.remote)
+
+ for d in snd_deliveries:
+ d.settle()
+
+ self.pump()
+
+ for d in rcv_deliveries:
+ assert d.settled, d.settled
+ d.settle()
+
+ def testReceived(self):
+ self.testDisposition(type=Disposition.RECEIVED, value=ReceivedValue(1, 2))
+
+ def testRejected(self):
+ self.testDisposition(type=Disposition.REJECTED, value=RejectValue(Condition(symbol("foo"))))
+
+ def testReleased(self):
+ self.testDisposition(type=Disposition.RELEASED)
+
+ def testModified(self):
+ self.testDisposition(type=Disposition.MODIFIED,
+ value=ModifiedValue(failed=True, undeliverable=True,
+ annotations={"key": "value"}))
+
+ def testCustom(self):
+ self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org