You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/06/17 19:54:54 UTC

svn commit: r1493859 - in /qpid/proton/trunk: proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-c/src/engine/ proton-c/src/messenger/ proton-j/proton-api/src/...

Author: rhs
Date: Mon Jun 17 17:54:54 2013
New Revision: 1493859

URL: http://svn.apache.org/r1493859
Log:
extended the delivery API to work with all delivery states defined by the transport spec as well as custom delivery states. This addresses PROTON-97 and the C portion of PROTON-75

Modified:
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/messenger/store.c
    qpid/proton/trunk/proton-c/src/proton.c
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Mon Jun 17 17:54:54 2013
@@ -25,7 +25,7 @@ import org.apache.qpid.proton.engine.Del
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.jni.Proton;
 import org.apache.qpid.proton.jni.SWIGTYPE_p_pn_delivery_t;
-import org.apache.qpid.proton.jni.pn_disposition_t;
+import org.apache.qpid.proton.jni.SWIGTYPE_p_pn_disposition_t;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Received;
@@ -33,6 +33,9 @@ import org.apache.qpid.proton.amqp.messa
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 
+import static org.apache.qpid.proton.jni.ProtonConstants.*;
+import java.math.BigInteger;
+
 public class JNIDelivery implements Delivery
 {
     private SWIGTYPE_p_pn_delivery_t _impl;
@@ -121,53 +124,53 @@ public class JNIDelivery implements Deli
         //TODO
     }
 
-    private static pn_disposition_t convertState(DeliveryState state)
+    private static BigInteger convertState(DeliveryState state)
     {
         //TODO - disposition properties conversion
         if(state instanceof Accepted)
         {
-            return pn_disposition_t.PN_ACCEPTED;
+            return BigInteger.valueOf(PN_ACCEPTED);
         }
         else if(state instanceof Rejected)
         {
-            return pn_disposition_t.PN_REJECTED;
+            return BigInteger.valueOf(PN_REJECTED);
         }
         else if(state instanceof Modified)
         {
-            return pn_disposition_t.PN_MODIFIED;
+            return BigInteger.valueOf(PN_MODIFIED);
         }
         else if(state instanceof Received)
         {
-            return pn_disposition_t.PN_RECEIVED;
+            return BigInteger.valueOf(PN_RECEIVED);
         }
         else if(state instanceof Released)
         {
-            return pn_disposition_t.PN_RELEASED;
+            return BigInteger.valueOf(PN_RELEASED);
         }
 
-        return null;
+        return BigInteger.ZERO;
     }
 
-    private static DeliveryState convertDisposition(pn_disposition_t disposition)
+    private static DeliveryState convertDisposition(BigInteger disposition)
     {
         //TODO - disposition properties conversion
-        if(pn_disposition_t.PN_ACCEPTED.equals(disposition))
+        if (BigInteger.valueOf(PN_ACCEPTED).equals(disposition))
         {
             return Accepted.getInstance();
         }
-        else if(pn_disposition_t.PN_REJECTED.equals(disposition))
+        else if(BigInteger.valueOf(PN_REJECTED).equals(disposition))
         {
             return new Rejected();
         }
-        else if(pn_disposition_t.PN_MODIFIED.equals(disposition))
+        else if(BigInteger.valueOf(PN_MODIFIED).equals(disposition))
         {
             return new Modified();
         }
-        else if(pn_disposition_t.PN_MODIFIED.equals(disposition))
+        else if(BigInteger.valueOf(PN_RECEIVED).equals(disposition))
         {
-            return new Modified();
+            return new Received();
         }
-        else if(pn_disposition_t.PN_RELEASED.equals(disposition))
+        else if(BigInteger.valueOf(PN_RELEASED).equals(disposition))
         {
             return new Released();
         }

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Mon Jun 17 17:54:54 2013
@@ -1807,27 +1807,11 @@ class Endpoint(object):
     self.condition = None
 
   def _update_cond(self):
-    impl = self._get_cond_impl()
-    pn_condition_clear(impl)
-    if self.condition:
-      pn_condition_set_name(impl, self.condition.name)
-      pn_condition_set_description(impl, self.condition.description)
-      info = Data(pn_condition_info(impl))
-      if self.condition.info:
-        info.put_object(self.condition.info)
+    obj2cond(self.condition, self._get_cond_impl())
 
   @property
   def remote_condition(self):
-    impl = self._get_remote_cond_impl()
-    if pn_condition_is_set(impl):
-      info_impl = Data(pn_condition_info(impl))
-      info_impl.rewind()
-      info_impl.next()
-      info = info_impl.get_object()
-      info_impl.rewind()
-      return Condition(pn_condition_get_name(impl),
-                       pn_condition_get_description(impl),
-                       info)
+    return cond2obj(self._get_remote_cond_impl())
 
 class Condition:
 
@@ -1847,6 +1831,36 @@ class Condition:
         self.description == o.description and \
         self.info == o.info
 
+def obj2cond(obj, cond):
+  pn_condition_clear(cond)
+  if obj:
+    pn_condition_set_name(cond, str(obj.name))
+    pn_condition_set_description(cond, obj.description)
+    info = Data(pn_condition_info(cond))
+    if obj.info:
+      info.put_object(obj.info)
+
+def cond2obj(cond):
+  if pn_condition_is_set(cond):
+    return Condition(pn_condition_get_name(cond),
+                     pn_condition_get_description(cond),
+                     dat2obj(pn_condition_info(cond)))
+  else:
+    return None
+
+def dat2obj(dimpl):
+  d = Data(dimpl)
+  d.rewind()
+  d.next()
+  obj = d.get_object()
+  d.rewind()
+  return obj
+
+def obj2dat(obj, dimpl):
+  if obj is not None:
+    d = Data(dimpl)
+    d.put_object(obj)
+
 def wrap_connection(conn):
   if not conn: return None
   ctx = pn_connection_get_context(conn)
@@ -1907,37 +1921,24 @@ class Connection(Endpoint):
   def remote_hostname(self):
     return pn_connection_remote_hostname(self._conn)
 
-  def _dat2obj(self, dimpl):
-    d = Data(dimpl)
-    d.rewind()
-    d.next()
-    obj = d.get_object()
-    d.rewind()
-    return obj
-
   @property
   def remote_offered_capabilities(self):
-    return self._dat2obj(pn_connection_remote_offered_capabilities(self._conn))
+    return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
 
   @property
   def remote_desired_capabilities(self):
-    return self._dat2obj(pn_connection_remote_desired_capabilities(self._conn))
+    return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
 
   @property
   def remote_properties(self):
-    return self._dat2obj(pn_connection_remote_properties(self._conn))
-
-  def _obj2dat(self, obj, dimpl):
-    if obj is not None:
-      d = Data(dimpl)
-      d.put_object(obj)
+    return dat2obj(pn_connection_remote_properties(self._conn))
 
   def open(self):
-    self._obj2dat(self.offered_capabilities,
-                  pn_connection_offered_capabilities(self._conn))
-    self._obj2dat(self.desired_capabilities,
-                  pn_connection_desired_capabilities(self._conn))
-    self._obj2dat(self.properties, pn_connection_properties(self._conn))
+    obj2dat(self.offered_capabilities,
+            pn_connection_offered_capabilities(self._conn))
+    obj2dat(self.desired_capabilities,
+            pn_connection_desired_capabilities(self._conn))
+    obj2dat(self.properties, pn_connection_properties(self._conn))
     pn_connection_open(self._conn)
 
   def close(self):
@@ -2259,12 +2260,97 @@ def wrap_delivery(dlv):
   pn_delivery_set_context(dlv, wrapper)
   return wrapper
 
-class Delivery(object):
+class Disposition(object):
 
+  RECEIVED = PN_RECEIVED
   ACCEPTED = PN_ACCEPTED
+  REJECTED = PN_REJECTED
+  RELEASED = PN_RELEASED
+  MODIFIED = PN_MODIFIED
+
+  def __init__(self, impl, local):
+    self._impl = impl
+    self.local = local
+    self._data = None
+    self._condition = None
+    self._annotations = None
+
+  @property
+  def type(self):
+    return pn_disposition_type(self._impl)
+
+  def _get_section_number(self):
+    return pn_disposition_get_section_number(self._impl)
+  def _set_section_number(self, n):
+    pn_disposition_set_section_number(self._impl, n)
+  section_number = property(_get_section_number, _set_section_number)
+
+  def _get_section_offset(self):
+    return pn_disposition_get_section_offset(self._impl)
+  def _set_section_offset(self, n):
+    pn_disposition_set_section_offset(self._impl, n)
+  section_offset = property(_get_section_offset, _set_section_offset)
+
+  def _get_failed(self):
+    return pn_disposition_is_failed(self._impl)
+  def _set_failed(self, b):
+    pn_disposition_set_failed(self._impl, b)
+  failed = property(_get_failed, _set_failed)
+
+  def _get_undeliverable(self):
+    return pn_disposition_is_undeliverable(self._impl)
+  def _set_undeliverable(self, b):
+    pn_disposition_set_undeliverable(self._impl, b)
+  undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+  def _get_data(self):
+    if self.local:
+      return self._data
+    else:
+      return dat2obj(pn_disposition_data(self._impl))
+  def _set_data(self, obj):
+    if self.local:
+      self._data = obj
+    else:
+      raise AttributeError("data attribute is read-only")
+  data = property(_get_data, _set_data)
+
+  def _get_annotations(self):
+    if self.local:
+      return self._annotations
+    else:
+      return dat2obj(pn_disposition_annotations(self._impl))
+  def _set_annotations(self, obj):
+    if self.local:
+      self._annotations = obj
+    else:
+      raise AttributeError("annotations attribute is read-only")
+  annotations = property(_get_annotations, _set_annotations)
+
+  def _get_condition(self):
+    if self.local:
+      return self._condition
+    else:
+      return cond2obj(pn_disposition_condition(self._impl))
+  def _set_condition(self, obj):
+    if self.local:
+      self._condition = obj
+    else:
+      raise AttributeError("condition attribute is read-only")
+  condition = property(_get_condition, _set_condition)
+
+class Delivery(object):
+
+  RECEIVED = Disposition.RECEIVED
+  ACCEPTED = Disposition.ACCEPTED
+  REJECTED = Disposition.REJECTED
+  RELEASED = Disposition.RELEASED
+  MODIFIED = Disposition.MODIFIED
 
   def __init__(self, dlv):
     self._dlv = dlv
+    self.local = Disposition(pn_delivery_local(self._dlv), True)
+    self.remote = Disposition(pn_delivery_remote(self._dlv), False)
 
   @property
   def tag(self):
@@ -2283,6 +2369,9 @@ class Delivery(object):
     return pn_delivery_updated(self._dlv)
 
   def update(self, state):
+    obj2dat(self.local._data, pn_disposition_data(self.local._impl))
+    obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
+    obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
     pn_delivery_update(self._dlv, state)
 
   @property
@@ -2316,7 +2405,6 @@ class Delivery(object):
   def link(self):
     return wrap_link(pn_delivery_link(self._dlv))
 
-
 class TransportException(ProtonException):
   pass
 
@@ -2714,6 +2802,7 @@ __all__ = [
            "Connector",
            "Data",
            "Delivery",
+           "Disposition",
            "Described",
            "Driver",
            "DriverException",

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Jun 17 17:54:54 2013
@@ -65,6 +65,8 @@ typedef enum {
   PN_CONNECTION_CLOSE,
   PN_NEVER
 } pn_expiry_policy_t;
+
+typedef struct pn_disposition_t pn_disposition_t;
 typedef struct pn_delivery_t pn_delivery_t;
 
 typedef struct pn_delivery_tag_t {
@@ -91,18 +93,17 @@ typedef int pn_state_t;     /**< encodes
 #define PN_LOCAL_MASK (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)
 #define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)
 
-/** @enum pn_disposition_t
+/**
  * The state/outcome of a message transfer.
  *
  * @todo document each value
  */
-typedef enum pn_disposition_t {
-  PN_RECEIVED=1,
-  PN_ACCEPTED=2,
-  PN_REJECTED=3,
-  PN_RELEASED=4,
-  PN_MODIFIED=5
-} pn_disposition_t;
+
+#define PN_RECEIVED (0x0000000000000023)
+#define PN_ACCEPTED (0x0000000000000024)
+#define PN_REJECTED (0x0000000000000025)
+#define PN_RELEASED (0x0000000000000026)
+#define PN_MODIFIED (0x0000000000000027)
 
 typedef int pn_trace_t;
 
@@ -489,15 +490,17 @@ PN_EXTERN pn_delivery_t *pn_delivery(pn_
 PN_EXTERN pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery);
 PN_EXTERN pn_link_t *pn_delivery_link(pn_delivery_t *delivery);
 // how do we do delivery state?
-PN_EXTERN pn_disposition_t pn_delivery_local_state(pn_delivery_t *delivery);
-PN_EXTERN pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery);
+PN_EXTERN pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery);
+PN_EXTERN uint64_t pn_delivery_local_state(pn_delivery_t *delivery);
+PN_EXTERN pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery);
+PN_EXTERN uint64_t pn_delivery_remote_state(pn_delivery_t *delivery);
 PN_EXTERN bool pn_delivery_settled(pn_delivery_t *delivery);
 PN_EXTERN size_t pn_delivery_pending(pn_delivery_t *delivery);
 PN_EXTERN bool pn_delivery_partial(pn_delivery_t *delivery);
 PN_EXTERN bool pn_delivery_writable(pn_delivery_t *delivery);
 PN_EXTERN bool pn_delivery_readable(pn_delivery_t *delivery);
 PN_EXTERN bool pn_delivery_updated(pn_delivery_t *delivery);
-PN_EXTERN void pn_delivery_update(pn_delivery_t *delivery, pn_disposition_t disposition);
+PN_EXTERN void pn_delivery_update(pn_delivery_t *delivery, uint64_t state);
 PN_EXTERN void pn_delivery_clear(pn_delivery_t *delivery);
 //int pn_delivery_format(pn_delivery_t *delivery);
 PN_EXTERN void pn_delivery_settle(pn_delivery_t *delivery);
@@ -505,6 +508,20 @@ PN_EXTERN void pn_delivery_dump(pn_deliv
 PN_EXTERN void *pn_delivery_get_context(pn_delivery_t *delivery);
 PN_EXTERN void pn_delivery_set_context(pn_delivery_t *delivery, void *context);
 
+// disposition
+PN_EXTERN uint64_t pn_disposition_type(pn_disposition_t *disposition);
+PN_EXTERN pn_data_t *pn_disposition_data(pn_disposition_t *disposition);
+PN_EXTERN uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number);
+PN_EXTERN uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset);
+PN_EXTERN bool pn_disposition_is_failed(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed);
+PN_EXTERN bool pn_disposition_is_undeliverable(pn_disposition_t *disposition);
+PN_EXTERN void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable);
+PN_EXTERN pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition);
+
+// conditions
 PN_EXTERN pn_condition_t *pn_connection_condition(pn_connection_t *connection);
 PN_EXTERN pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection);
 
@@ -514,6 +531,8 @@ PN_EXTERN pn_condition_t *pn_session_rem
 PN_EXTERN pn_condition_t *pn_link_condition(pn_link_t *link);
 PN_EXTERN pn_condition_t *pn_link_remote_condition(pn_link_t *link);
 
+PN_EXTERN pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition);
+
 PN_EXTERN bool pn_condition_is_set(pn_condition_t *condition);
 PN_EXTERN void pn_condition_clear(pn_condition_t *condition);
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Jun 17 17:54:54 2013
@@ -127,6 +127,7 @@ struct pn_transport_t {
   pn_data_t *remote_offered_capabilities;
   pn_data_t *remote_desired_capabilities;
   pn_data_t *remote_properties;
+  pn_data_t *disp_data;
   //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
 #define PN_DEFAULT_MAX_FRAME_SIZE (0)  /* for now, allow unlimited size */
   uint32_t   local_max_frame;
@@ -240,13 +241,23 @@ struct pn_link_t {
   pn_link_state_t state;
 };
 
+struct pn_disposition_t {
+  uint64_t type;
+  pn_data_t *data;
+  pn_data_t *annotations;
+  pn_condition_t condition;
+  uint32_t section_number;
+  uint64_t section_offset;
+  bool failed;
+  bool undeliverable;
+  bool settled;
+};
+
 struct pn_delivery_t {
   pn_link_t *link;
   pn_buffer_t *tag;
-  int local_state;
-  int remote_state;
-  bool local_settled;
-  bool remote_settled;
+  pn_disposition_t local;
+  pn_disposition_t remote;
   bool updated;
   bool settled; // tracks whether we're in the unsettled list or not
   pn_delivery_t *unsettled_next;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Jun 17 17:54:54 2013
@@ -188,6 +188,7 @@ void pn_transport_free(pn_transport_t *t
   pn_free(transport->remote_offered_capabilities);
   pn_free(transport->remote_desired_capabilities);
   pn_free(transport->remote_properties);
+  pn_free(transport->disp_data);
   pn_error_free(transport->error);
   pn_condition_tini(&transport->remote_condition);
   pn_free(transport->local_channels);
@@ -472,7 +473,7 @@ void pn_work_update(pn_connection_t *con
 {
   pn_link_t *link = pn_delivery_link(delivery);
   pn_delivery_t *current = pn_link_current(link);
-  if (delivery->updated && !delivery->local_settled) {
+  if (delivery->updated && !delivery->local.settled) {
     pn_add_work(connection, delivery);
   } else if (delivery == current) {
     if (link->endpoint.type == SENDER) {
@@ -787,6 +788,7 @@ void pn_transport_init(pn_transport_t *t
   transport->remote_offered_capabilities = pn_data(16);
   transport->remote_desired_capabilities = pn_data(16);
   transport->remote_properties = pn_data(16);
+  transport->disp_data = pn_data(16);
   transport->error = pn_error();
   pn_condition_init(&transport->remote_condition);
 
@@ -1162,11 +1164,40 @@ pn_session_t *pn_link_session(pn_link_t 
   return link->session;
 }
 
+static void pn_disposition_finalize(pn_disposition_t *ds)
+{
+  pn_free(ds->data);
+  pn_free(ds->annotations);
+  pn_condition_tini(&ds->condition);
+}
+
 static void pn_delivery_finalize(void *object)
 {
   pn_delivery_t *delivery = (pn_delivery_t *) object;
   pn_buffer_free(delivery->tag);
   pn_buffer_free(delivery->bytes);
+  pn_disposition_finalize(&delivery->local);
+  pn_disposition_finalize(&delivery->remote);
+}
+
+static void pn_disposition_init(pn_disposition_t *ds)
+{
+  ds->data = pn_data(16);
+  ds->annotations = pn_data(16);
+  pn_condition_init(&ds->condition);
+}
+
+static void pn_disposition_clear(pn_disposition_t *ds)
+{
+  ds->type = 0;
+  ds->section_number = 0;
+  ds->section_offset = 0;
+  ds->failed = false;
+  ds->undeliverable = false;
+  ds->settled = false;
+  pn_data_clear(ds->data);
+  pn_data_clear(ds->annotations);
+  pn_condition_clear(&ds->condition);
 }
 
 pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
@@ -1180,16 +1211,16 @@ pn_delivery_t *pn_delivery(pn_link_t *li
     if (!delivery) return NULL;
     delivery->tag = pn_buffer(16);
     delivery->bytes = pn_buffer(64);
+    pn_disposition_init(&delivery->local);
+    pn_disposition_init(&delivery->remote);
   } else {
     assert(!delivery->tpwork);
   }
   delivery->link = link;
   pn_buffer_clear(delivery->tag);
   pn_buffer_append(delivery->tag, tag.bytes, tag.size);
-  delivery->local_state = 0;
-  delivery->remote_state = 0;
-  delivery->local_settled = false;
-  delivery->remote_settled = false;
+  pn_disposition_clear(&delivery->local);
+  pn_disposition_clear(&delivery->remote);
   delivery->updated = false;
   delivery->settled = false;
   LL_ADD(link, unsettled, delivery);
@@ -1225,7 +1256,7 @@ int pn_link_unsettled(pn_link_t *link)
 pn_delivery_t *pn_unsettled_head(pn_link_t *link)
 {
   pn_delivery_t *d = link->unsettled_head;
-  while (d && d->local_settled) {
+  while (d && d->local.settled) {
     d = d->unsettled_next;
   }
   return d;
@@ -1234,7 +1265,7 @@ pn_delivery_t *pn_unsettled_head(pn_link
 pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
 {
   pn_delivery_t *d = delivery->unsettled_next;
-  while (d && d->local_settled) {
+  while (d && d->local.settled) {
     d = d->unsettled_next;
   }
   return d;
@@ -1251,11 +1282,11 @@ void pn_delivery_dump(pn_delivery_t *d)
   char tag[1024];
   pn_bytes_t bytes = pn_buffer_bytes(d->tag);
   pn_quote_data(tag, 1024, bytes.start, bytes.size);
-  printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
-         "remote_settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
+  printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, "
+         "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
          "work=%u}",
-         tag, d->local_state, d->remote_state, d->local_settled,
-         d->remote_settled, d->updated, pn_is_current(d),
+         tag, d->local.type, d->remote.type, d->local.settled,
+         d->remote.settled, d->updated, pn_is_current(d),
          pn_delivery_writable(d), pn_delivery_readable(d), d->work);
 }
 
@@ -1271,6 +1302,122 @@ void pn_delivery_set_context(pn_delivery
   delivery->context = context;
 }
 
+uint64_t pn_disposition_type(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->type;
+}
+
+pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->data;
+}
+
+uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->section_number;
+}
+
+void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
+{
+  assert(disposition);
+  disposition->section_number = section_number;
+}
+
+uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->section_offset;
+}
+
+void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
+{
+  assert(disposition);
+  disposition->section_offset = section_offset;
+}
+
+bool pn_disposition_is_failed(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->failed;
+}
+
+void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
+{
+  assert(disposition);
+  disposition->failed = failed;
+}
+
+bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->undeliverable;
+}
+
+void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
+{
+  assert(disposition);
+  disposition->undeliverable = undeliverable;
+}
+
+pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->annotations;
+}
+
+pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return &disposition->condition;
+}
+
+bool pni_disposition_batchable(pn_disposition_t *disposition)
+{
+  switch (disposition->type) {
+  case PN_ACCEPTED:
+    return true;
+  case PN_RELEASED:
+    return true;
+  default:
+    return false;
+  }
+}
+
+void pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data)
+{
+  pn_condition_t *cond = &disposition->condition;
+  switch (disposition->type) {
+  case PN_RECEIVED:
+    pn_data_put_list(data);
+    pn_data_enter(data);
+    pn_data_put_uint(data, disposition->section_number);
+    pn_data_put_ulong(data, disposition->section_offset);
+    pn_data_exit(data);
+    break;
+  case PN_ACCEPTED:
+  case PN_RELEASED:
+    return;
+  case PN_REJECTED:
+    pn_data_fill(data, "[?DL[sSC]]", pn_condition_is_set(cond), ERROR,
+                 pn_condition_get_name(cond),
+                 pn_condition_get_description(cond),
+                 pn_condition_info(cond));
+    break;
+  case PN_MODIFIED:
+    pn_data_fill(data, "[ooC]",
+                 disposition->failed,
+                 disposition->undeliverable,
+                 disposition->annotations);
+    break;
+  default:
+    pn_data_copy(data, disposition->data);
+    break;
+  }
+}
+
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
   if (delivery) {
@@ -1377,7 +1524,7 @@ void pn_delivery_settle(pn_delivery_t *d
   }
 
   link->unsettled_count--;
-  delivery->local_settled = true;
+  delivery->local.settled = true;
   pn_add_tpwork(delivery);
   pn_work_update(delivery->link->session->connection, delivery);
 }
@@ -1664,8 +1811,8 @@ int pn_do_transfer(pn_dispatcher_t *disp
     link->state.link_credit--;
     link->queued++;
 
-    // XXX: need to fill in remote state: delivery->remote_state = ...;
-    delivery->remote_settled = settled;
+    // XXX: need to fill in remote state: delivery->remote.state = ...;
+    delivery->remote.settled = settled;
     delivery->updated = true;
     pn_work_update(transport->connection, delivery);
   }
@@ -1734,36 +1881,38 @@ int pn_do_flow(pn_dispatcher_t *disp)
   return 0;
 }
 
+#define SCAN_ERROR_DEFAULT ("D.[D.[sSC]")
+#define SCAN_ERROR_DETACH ("D.[..D.[sSC]")
+#define SCAN_ERROR_DISP ("[D.[sSC]")
+
+static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char *fmt)
+{
+  pn_bytes_t cond;
+  pn_bytes_t desc;
+  pn_condition_clear(condition);
+  int err = pn_data_scan(data, fmt, &cond, &desc, condition->info);
+  if (err) return err;
+  strncat(condition->name, cond.start, cond.size);
+  strncat(condition->description, desc.start, desc.size);
+  pn_data_rewind(condition->info);
+  return 0;
+}
+
 int pn_do_disposition(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
   bool role;
   pn_sequence_t first, last;
-  uint64_t code;
-  bool last_init, settled, code_init;
-  int err = pn_scan_args(disp, "D.[oI?IoD?L[]]", &role, &first, &last_init,
-                         &last, &settled, &code_init, &code);
+  uint64_t type = 0;
+  bool last_init, settled, type_init;
+  pn_data_clear(transport->disp_data);
+  int err = pn_scan_args(disp, "D.[oI?IoD?LC]", &role, &first, &last_init,
+                         &last, &settled, &type_init, &type,
+                         transport->disp_data);
   if (err) return err;
   if (!last_init) last = first;
 
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
-  pn_disposition_t dispo = (pn_disposition_t) 0;
-  if (code_init) {
-    switch (code)
-    {
-    case ACCEPTED:
-      dispo = PN_ACCEPTED;
-      break;
-    case REJECTED:
-      dispo = PN_REJECTED;
-      break;
-    default:
-      // XXX
-      fprintf(stderr, "default %" PRIu64 "\n", code);
-      break;
-    }
-  }
-
   pn_delivery_map_t *deliveries;
   if (role) {
     deliveries = &ssn->state.outgoing;
@@ -1771,11 +1920,53 @@ int pn_do_disposition(pn_dispatcher_t *d
     deliveries = &ssn->state.incoming;
   }
 
+  pn_data_rewind(transport->disp_data);
+  bool remote_data = (pn_data_next(transport->disp_data) &&
+                      pn_data_get_list(transport->disp_data) > 0);
+
   for (pn_sequence_t id = first; id <= last; id++) {
     pn_delivery_t *delivery = pn_delivery_map_get(deliveries, id);
+    pn_disposition_t *remote = &delivery->remote;
     if (delivery) {
-      delivery->remote_state = dispo;
-      delivery->remote_settled = settled;
+      if (type_init) remote->type = type;
+      if (remote_data) {
+        switch (type) {
+        case PN_RECEIVED:
+          pn_data_rewind(transport->disp_data);
+          pn_data_next(transport->disp_data);
+          pn_data_enter(transport->disp_data);
+          if (pn_data_next(transport->disp_data))
+            remote->section_number = pn_data_get_uint(transport->disp_data);
+          if (pn_data_next(transport->disp_data))
+            remote->section_offset = pn_data_get_ulong(transport->disp_data);
+          break;
+        case PN_ACCEPTED:
+          break;
+        case PN_REJECTED:
+          err = pn_scan_error(transport->disp_data, &remote->condition, SCAN_ERROR_DISP);
+          if (err) return err;
+          break;
+        case PN_RELEASED:
+          break;
+        case PN_MODIFIED:
+          pn_data_rewind(transport->disp_data);
+          pn_data_next(transport->disp_data);
+          pn_data_enter(transport->disp_data);
+          if (pn_data_next(transport->disp_data))
+            remote->failed = pn_data_get_bool(transport->disp_data);
+          if (pn_data_next(transport->disp_data))
+            remote->undeliverable = pn_data_get_bool(transport->disp_data);
+          pn_data_narrow(transport->disp_data);
+          pn_data_clear(remote->data);
+          pn_data_appendn(remote->annotations, transport->disp_data, 1);
+          pn_data_widen(transport->disp_data);
+          break;
+        default:
+          pn_data_copy(remote->data, transport->disp_data);
+          break;
+        }
+      }
+      remote->settled = settled;
       delivery->updated = true;
       pn_work_update(transport->connection, delivery);
     }
@@ -1784,20 +1975,6 @@ int pn_do_disposition(pn_dispatcher_t *d
   return 0;
 }
 
-static int pn_scan_error(pn_dispatcher_t *disp, pn_condition_t *condition, bool detach)
-{
-  pn_bytes_t cond;
-  pn_bytes_t desc;
-  pn_condition_clear(condition);
-  int err = pn_scan_args(disp, detach ? "D.[..D.[sSC]" : "D.[D.[sSC]",
-                         &cond, &desc, condition->info);
-  if (err) return err;
-  strncat(condition->name, cond.start, cond.size);
-  strncat(condition->description, desc.start, desc.size);
-  pn_data_rewind(condition->info);
-  return 0;
-}
-
 int pn_do_detach(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1812,7 +1989,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
   }
   pn_link_t *link = pn_handle_state(ssn, handle);
 
-  err = pn_scan_error(disp, &link->endpoint.remote_condition, true);
+  err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
   if (err) return err;
 
   pn_unmap_handle(ssn, link);
@@ -1831,7 +2008,7 @@ int pn_do_end(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
-  int err = pn_scan_error(disp, &ssn->endpoint.remote_condition, false);
+  int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   pn_unmap_channel(transport, ssn);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
@@ -1842,7 +2019,7 @@ int pn_do_close(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
   pn_connection_t *conn = transport->connection;
-  int err = pn_scan_error(disp, &transport->remote_condition, false);
+  int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
@@ -2208,29 +2385,25 @@ int pn_post_disp(pn_transport_t *transpo
   pn_modified(transport->connection, &link->session->endpoint);
   pn_delivery_state_t *state = &delivery->state;
   assert(state->init);
-  uint64_t code;
-  switch(delivery->local_state) {
-  case PN_ACCEPTED:
-    code = ACCEPTED;
-    break;
-  case PN_RELEASED:
-    code = RELEASED;
-    break;
-  case PN_REJECTED:
-    code = REJECTED;
-    break;
-    //TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
-  default:
-    code = 0;
-  }
+  bool role = (link->endpoint.type == RECEIVER);
+  uint64_t code = delivery->local.type;
 
-  if (!code && !delivery->local_settled) {
+  if (!code && !delivery->local.settled) {
     return 0;
   }
 
+  if (!pni_disposition_batchable(&delivery->local)) {
+    pn_data_clear(transport->disp_data);
+    pni_disposition_encode(&delivery->local, transport->disp_data);
+    return pn_post_frame(transport->disp, ssn->state.local_channel,
+                         "DL[oIIo?DLC]", DISPOSITION,
+                         role, state->id, state->id, delivery->local.settled,
+                         (bool)code, code, transport->disp_data);
+  }
+
   if (ssn_state->disp && code == ssn_state->disp_code &&
-      delivery->local_settled == ssn_state->disp_settled &&
-      ssn_state->disp_type == (link->endpoint.type == RECEIVER)) {
+      delivery->local.settled == ssn_state->disp_settled &&
+      ssn_state->disp_type == role) {
     if (state->id == ssn_state->disp_first - 1) {
       ssn_state->disp_first = state->id;
       return 0;
@@ -2245,9 +2418,9 @@ int pn_post_disp(pn_transport_t *transpo
     if (err) return err;
   }
 
-  ssn_state->disp_type = (link->endpoint.type == RECEIVER);
+  ssn_state->disp_type = role;
   ssn_state->disp_code = code;
-  ssn_state->disp_settled = delivery->local_settled;
+  ssn_state->disp_settled = delivery->local.settled;
   ssn_state->disp_first = state->id;
   ssn_state->disp_last = state->id;
   ssn_state->disp = true;
@@ -2280,7 +2453,7 @@ int pn_process_tpwork_sender(pn_transpor
                                          link_state->local_handle,
                                          state->id, &tag,
                                          0, // message-format
-                                         delivery->local_settled,
+                                         delivery->local.settled,
                                          !delivery->done,
                                          ssn_state->remote_incoming_window);
       if (count < 0) return count;
@@ -2300,13 +2473,13 @@ int pn_process_tpwork_sender(pn_transpor
   }
 
   pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
-  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
+  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled
       && state && state->sent && !xfr_posted) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }
 
-  if (delivery->local_settled && state && state->sent) {
+  if (delivery->local.settled && state && state->sent) {
     pn_full_settle(&ssn_state->outgoing, delivery);
   }
 
@@ -2318,12 +2491,12 @@ int pn_process_tpwork_receiver(pn_transp
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_t *ssn = link->session;
-  if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote_settled && delivery->state.init) {
+  if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote.settled && delivery->state.init) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }
 
-  if (delivery->local_settled) {
+  if (delivery->local.settled) {
     pn_full_settle(&ssn->state.incoming, delivery);
   }
 
@@ -2796,19 +2969,33 @@ pn_link_t *pn_delivery_link(pn_delivery_
   return delivery->link;
 }
 
-pn_disposition_t pn_delivery_local_state(pn_delivery_t *delivery)
+pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return &delivery->local;
+}
+
+uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
 {
-  return (pn_disposition_t) delivery->local_state;
+  assert(delivery);
+  return delivery->local.type;
 }
 
-pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery)
+pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
 {
-  return (pn_disposition_t) delivery->remote_state;
+  assert(delivery);
+  return &delivery->remote;
+}
+
+uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return delivery->remote.type;
 }
 
 bool pn_delivery_settled(pn_delivery_t *delivery)
 {
-  return delivery ? delivery->remote_settled : false;
+  return delivery ? delivery->remote.settled : false;
 }
 
 bool pn_delivery_updated(pn_delivery_t *delivery)
@@ -2822,10 +3009,10 @@ void pn_delivery_clear(pn_delivery_t *de
   pn_work_update(delivery->link->session->connection, delivery);
 }
 
-void pn_delivery_update(pn_delivery_t *delivery, pn_disposition_t disposition)
+void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
 {
   if (!delivery) return;
-  delivery->local_state = disposition;
+  delivery->local.type = state;
   pn_add_tpwork(delivery);
 }
 

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Mon Jun 17 17:54:54 2013
@@ -1138,7 +1138,7 @@ pn_status_t pn_messenger_status(pn_messe
 int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
 {
   pni_store_t *store = pn_tracker_store(messenger, tracker);
-  return pni_store_update(store, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
+  return pni_store_update(store, pn_tracker_sequence(tracker), PN_STATUS_UNKNOWN, flags, true, true);
 }
 
 // true if all pending output has been sent to peer
@@ -1290,7 +1290,7 @@ int pn_messenger_accept(pn_messenger_t *
   }
 
   return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
-                          (pn_status_t) PN_ACCEPTED, flags, false, false);
+                          PN_STATUS_ACCEPTED, flags, false, false);
 }
 
 int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
@@ -1301,7 +1301,7 @@ int pn_messenger_reject(pn_messenger_t *
   }
 
   return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
-                          (pn_status_t) PN_REJECTED, flags, false, false);
+                          PN_STATUS_REJECTED, flags, false, false);
 }
 
 int pn_messenger_queued(pn_messenger_t *messenger, bool sender)

Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Mon Jun 17 17:54:54 2013
@@ -282,7 +282,7 @@ void *pni_entry_get_context(pni_entry_t 
   return entry->context;
 }
 
-static pn_status_t disp2status(pn_disposition_t disp)
+static pn_status_t disp2status(uint64_t disp)
 {
   if (!disp) return PN_STATUS_PENDING;
 

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Mon Jun 17 17:54:54 2013
@@ -216,7 +216,7 @@ void server_callback(pn_connector_t *cto
     }
 
     if (pn_delivery_updated(delivery)) {
-      if (!ctx->quiet) printf("disposition for %s: %u\n", tagstr, pn_delivery_remote_state(delivery));
+      if (!ctx->quiet) printf("disposition for %s: %" PRIu64 "\n", tagstr, pn_delivery_remote_state(delivery));
       pn_delivery_settle(delivery);
     }
 
@@ -377,7 +377,7 @@ void client_callback(pn_connector_t *cto
     }
 
     if (pn_delivery_updated(delivery)) {
-      if (!ctx->quiet) printf("disposition for %s: %u\n", tagstr, pn_delivery_remote_state(delivery));
+      if (!ctx->quiet) printf("disposition for %s: %" PRIu64 "\n", tagstr, pn_delivery_remote_state(delivery));
       pn_delivery_clear(delivery);
       pn_delivery_settle(delivery);
       if (!--ctx->send_count) {

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Mon Jun 17 17:54:54 2013
@@ -34,7 +34,9 @@ from org.apache.qpid.proton.message impo
 from org.apache.qpid.proton.codec import \
     DataFactory, Data as JData
 from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status
-from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue
+from org.apache.qpid.proton.amqp.transport import ErrorCondition
+from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, \
+    Rejected, Received, Modified, Released, AmqpValue
 from org.apache.qpid.proton.amqp import UnsignedInteger, UnsignedLong, UnsignedByte, UnsignedShort, Symbol, \
     Decimal32, Decimal64, Decimal128
 from jarray import zeros, array
@@ -147,26 +149,37 @@ class Endpoint(object):
 
   def close(self):
     if self.condition is not None:
-        if self.condition.impl is None:
-          self.condition.impl = self.impl.getCondition()
-        self.condition.impl.setCondition(Symbol.getSymbol(self.condition.name))
-        self.condition.impl.setDescription(self.condition.description)
-        self.condition.impl.setInfo(self.condition.info)
+      self.impl.setCondition(self.condition.impl)
     self.impl.close()
 
-class Condition:
+class Condition(object):
 
   def __init__(self, name=None, description=None, info=None, impl=None):
-    self.name = name
-    self.description = description
-    self.info = info
-    if impl is not None:
-      self.impl = impl
-      self.name = impl.getCondition()
-      self.description = impl.getDescription()
-      self.info = impl.getInfo()
-    else:
-      self.impl = None
+    if impl is None:
+      impl = ErrorCondition(Symbol.valueOf(name), description)
+      if info is not None:
+        impl.setInfo(info)
+    self.impl = impl
+
+  def _get_name(self):
+    c = self.impl.getCondition()
+    if c is not None:
+      return c.toString()
+  def _set_name(self, n):
+    self.impl.setCondition(Symbol.valueOf(n))
+  name = property(_get_name, _set_name)
+
+  def _get_description(self):
+    return self.impl.getDescription()
+  def _set_description(self, d):
+    self.impl.setDescription(d)
+  description = property(_get_description, _set_description)
+
+  def _get_info(self):
+    return self.impl.getInfo()
+  def _set_info(self, i):
+    self.impl.setInfo(i)
+  info = property(_get_info, _get_description)
 
   def __repr__(self):
     return "Condition(%s)" % ", ".join([repr(x) for x in
@@ -175,10 +188,10 @@ class Condition:
 
   def __eq__(self, o):
     if not isinstance(o, Condition): return False
-    return (self.impl is not None and o.impl is not None and self.impl.equals(o.impl)) or \
-        (self.name == o.name and \
-        self.description == o.description and \
-        self.info == o.info)
+    return self.impl.equals(o.impl)
+
+  def _2J(self):
+    return self.impl
 
 def wrap_connection(impl):
   if impl: return Connection(_impl = impl)
@@ -410,19 +423,175 @@ class Receiver(Link):
     else:
       raise Exception(n)
 
+class Disposition(object):
+
+  RECEIVED = 0x23
+  ACCEPTED = 0x24
+  REJECTED = 0x25
+  RELEASED = 0x26
+  MODIFIED = 0x27
+
+  def __init__(self):
+    self.type = 0
+    self._received = None
+    self._accepted = None
+    self._rejected = None
+    self._released = None
+    self._modified = None
+
+  def _get_section_number(self):
+    if self._received:
+      return J2PY(self._received.getSectionNumber())
+    else:
+      return 0
+  def _set_section_number(self, n):
+    if not self._received:
+      self._received = Received()
+    self._received.setSectionNumber(UnsignedInteger(n))
+  section_number = property(_get_section_number, _set_section_number)
+
+  def _get_section_offset(self):
+    if self._received:
+      return J2PY(self._received.getSectionOffset())
+    else:
+      return 0
+  def _set_section_offset(self, n):
+    if not self._received:
+      self._received = Received()
+    self._received.setSectionOffset(UnsignedLong(n))
+  section_offset = property(_get_section_offset, _set_section_offset)
+
+  def _get_failed(self):
+    if self._modified:
+      return self._modified.getDeliveryFailed()
+    else:
+      return False
+  def _set_failed(self, b):
+    if not self._modified:
+      self._modified = Modified()
+    self._modified.setDeliveryFailed(b)
+  failed = property(_get_failed, _set_failed)
+
+  def _get_undeliverable(self):
+    if self._modified:
+      return self._modified.getUndeliverableHere()
+    else:
+      return False
+  def _set_undeliverable(self, b):
+    if not self._modified:
+      self._modified = Modified()
+    self._modified.setUndeliverableHere(b)
+  undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+  def _get_data(self):
+    return None
+  def _set_data(self, obj):
+    raise Skipped()
+  data = property(_get_data, _set_data)
+
+  def _get_annotations(self):
+    if self._modified:
+      return J2PY(self._modified.getMessageAnnotations())
+    else:
+      return None
+  def _set_annotations(self, obj):
+    if not self._modified:
+      self._modified = Modified()
+    self._modified.setMessageAnnotations(PY2J(obj))
+  annotations = property(_get_annotations, _set_annotations)
+
+  def _get_condition(self):
+    if self._rejected:
+      return Condition(impl = self._rejected.getError())
+    else:
+      return None
+  def _set_condition(self, obj):
+    if not self._rejected:
+      self._rejected = Rejected()
+    self._rejected.setError(obj._2J())
+  condition = property(_get_condition, _set_condition)
+
+  def _as_received(self):
+    if self._received is None:
+      self._received = Received()
+    return self._received
+
+  def _as_accepted(self):
+    if self._accepted is None:
+      self._accepted = Accepted.getInstance()
+    return self._accepted
+
+  def _as_rejected(self):
+    if self._rejected is None:
+      self._rejected = Rejected()
+    return self._rejected
+
+  def _as_released(self):
+    if self._released is None:
+      self._released = Released.getInstance()
+    return self._released
+
+  def _as_modified(self):
+    if self._modified is None:
+      self._modified = Modified()
+    return self._modified
+
+  PY2J = {
+    RECEIVED: _as_received,
+    ACCEPTED: _as_accepted,
+    REJECTED: _as_rejected,
+    RELEASED: _as_released,
+    MODIFIED: _as_modified
+  }
+
+  def _2J(self):
+    return self.PY2J[self.type](self)
+
+  def _from_received(self, s):
+    self.type = self.RECEIVED
+    self._received = s
+
+  def _from_accepted(self, s):
+    self.type = self.ACCEPTED
+    self._accepted = s
+
+  def _from_rejected(self, s):
+    self.type = self.REJECTED
+    self._rejected = s
+
+  def _from_released(self, s):
+    self.type = self.RELEASED
+    self._released = s
+
+  def _from_modified(self, s):
+    self.type = self.MODIFIED
+    self._modified = s
+
+  J2PY = {
+    Received: _from_received,
+    Accepted: _from_accepted,
+    Rejected: _from_rejected,
+    Released: _from_released,
+    Modified: _from_modified
+    }
+
+  def _2PY(self, impl):
+    self.J2PY[type(impl)](self, impl)
+
 def wrap_delivery(impl):
   if impl: return Delivery(impl)
 
 class Delivery(object):
 
-  RECEIVED = 1
-  ACCEPTED = 2
-  REJECTED = 3
-  RELEASED = 4
-  MODIFIED = 5
+  RECEIVED = Disposition.RECEIVED
+  ACCEPTED = Disposition.ACCEPTED
+  REJECTED = Disposition.REJECTED
+  RELEASED = Disposition.RELEASED
+  MODIFIED = Disposition.MODIFIED
 
   def __init__(self, impl):
     self.impl = impl
+    self.local = Disposition()
 
   @property
   def tag(self):
@@ -441,26 +610,22 @@ class Delivery(object):
     return self.impl.isUpdated()
 
   def update(self, disp):
-    if disp == self.ACCEPTED:
-      self.impl.disposition(Accepted.getInstance())
-    else:
-      raise Exception("xxx: %s" % disp)
+    self.local.type = disp
+    self.impl.disposition(self.local._2J())
+
+  @property
+  def remote(self):
+    d = Disposition()
+    d._2PY(self.impl.getRemoteState())
+    return d
 
   @property
   def remote_state(self):
-    rd = self.impl.getRemoteState()
-    if(rd == Accepted.getInstance()):
-      return self.ACCEPTED
-    else:
-      raise Exception("xxx: %s" % rd)
+    return self.remote.type
 
   @property
   def local_state(self):
-    ld = self.impl.getLocalState()
-    if(ld == Accepted.getInstance()):
-      return self.ACCEPTED
-    else:
-      raise Exception("xxx: %s" % ld)
+    return self.local.type
 
   def settle(self):
     self.impl.settle()
@@ -1441,7 +1606,9 @@ conversions_J2PY = {
   dict: lambda d: dict([(J2PY(k), J2PY(v)) for k, v in d.items()]),
   HashMap: lambda m: dict([(J2PY(e.getKey()), J2PY(e.getValue())) for e in m.entrySet()]),
   list: lambda l: [J2PY(x) for x in l],
-  Symbol: lambda s: symbol(s.toString())
+  Symbol: lambda s: symbol(s.toString()),
+  UnsignedInteger: lambda n: n.longValue(),
+  UnsignedLong: lambda n: n.longValue()
   }
 
 conversions_PY2J = {
@@ -1476,6 +1643,7 @@ __all__ = [
            "Connector",
            "Data",
            "Delivery",
+           "Disposition",
            "Described",
            "Driver",
            "Endpoint",

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Mon Jun 17 17:54:54 2013
@@ -49,20 +49,15 @@ public class DeliveryImpl implements Del
     private static final int DELIVERY_STATE_CHANGED = 1;
     private static final int ABLE_TO_SEND = 2;
     private static final int IO_WORK = 4;
+    private static final int DELIVERY_SETTLED = 8;
 
 
     /**
      * A bit-mask representing the outstanding work on this delivery received from the transport layer
      * that has not yet been processed by the application.
-     * Note that contrast with {@link #_transportFlags}.
      */
     private int _flags = (byte) 0;
 
-    /**
-     * A bit-mask representing the outstanding work done by the application on this delivery
-     * that has not yet been processed by the transport layer
-     */
-    private int _transportFlags = (byte) 0;
     private TransportDelivery _transportDelivery;
     private byte[] _data;
     private int _dataSize;
@@ -118,7 +113,7 @@ public class DeliveryImpl implements Del
         _deliveryState = state;
         if(!_remoteSettled)
         {
-            setTransportFlag(DELIVERY_STATE_CHANGED);
+            addToTransportWorkList();
         }
     }
 
@@ -128,7 +123,7 @@ public class DeliveryImpl implements Del
         _link.decrementUnsettled();
         if(!_remoteSettled)
         {
-            setTransportFlag(DELIVERY_STATE_CHANGED);
+            addToTransportWorkList();
         }
         else
         {
@@ -303,16 +298,6 @@ public class DeliveryImpl implements Del
     }
 
 
-    private void setTransportFlag(int flag)
-    {
-        boolean addWork = (_transportFlags == 0);
-        _transportFlags = _transportFlags | flag;
-        if(addWork)
-        {
-            addToTransportWorkList();
-        }
-    }
-
     DeliveryImpl getTransportWorkNext()
     {
         return _transportWorkNext;
@@ -334,11 +319,6 @@ public class DeliveryImpl implements Del
         _transportWorkPrev = transportWorkPrev;
     }
 
-    boolean isLocalStateChange()
-    {
-        return (_transportFlags & DELIVERY_STATE_CHANGED) != 0;
-    }
-
     TransportDelivery getTransportDelivery()
     {
         return _transportDelivery;
@@ -476,7 +456,6 @@ public class DeliveryImpl implements Del
             .append(", _remoteSettled=").append(_remoteSettled)
             .append(", _remoteDeliveryState=").append(_remoteDeliveryState)
             .append(", _flags=").append(_flags)
-            .append(", _transportFlags=").append(_transportFlags)
             .append(", _transportDelivery=").append(_transportDelivery)
             .append(", _dataSize=").append(_dataSize)
             .append(", _complete=").append(_complete)

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Mon Jun 17 17:54:54 2013
@@ -394,7 +394,7 @@ public class TransportImpl extends Endpo
             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
             while(delivery != null && buffer.remaining() >= _maxFrameSize )
             {
-                if((delivery.getLink() instanceof SenderImpl) && delivery.isLocalStateChange() && delivery.getTransportDelivery() != null)
+                if((delivery.getLink() instanceof SenderImpl) && delivery.getTransportDelivery() != null)
                 {
                     TransportDelivery transportDelivery = delivery.getTransportDelivery();
                     Disposition disposition = new Disposition();
@@ -509,7 +509,7 @@ public class TransportImpl extends Endpo
             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
             while(delivery != null && buffer.remaining() >= _maxFrameSize)
             {
-                if((delivery.getLink() instanceof ReceiverImpl) && delivery.isLocalStateChange())
+                if((delivery.getLink() instanceof ReceiverImpl))
                 {
                     TransportDelivery transportDelivery = delivery.getTransportDelivery();
                     Disposition disposition = new Disposition();

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1493859&r1=1493858&r2=1493859&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Mon Jun 17 17:54:54 2013
@@ -1510,5 +1510,161 @@ class ServerTest(Test):
 
     self.server.stop()
 
+class NoValue:
 
+  def __init__(self):
+    pass
 
+  def apply(self, dlv):
+    pass
+
+  def check(self, dlv):
+    assert dlv.data == None
+    assert dlv.section_number == 0
+    assert dlv.section_offset == 0
+    assert dlv.condition == None
+    assert dlv.failed == False
+    assert dlv.undeliverable == False
+    assert dlv.annotations == None
+
+class RejectValue:
+  def __init__(self, condition):
+    self.condition = condition
+
+  def apply(self, dlv):
+    dlv.condition = self.condition
+
+  def check(self, dlv):
+    assert dlv.data == None, dlv.data
+    assert dlv.section_number == 0
+    assert dlv.section_offset == 0
+    assert dlv.condition == self.condition, (dlv.condition, self.condition)
+    assert dlv.failed == False
+    assert dlv.undeliverable == False
+    assert dlv.annotations == None
+
+class ReceivedValue:
+  def __init__(self, section_number, section_offset):
+    self.section_number = section_number
+    self.section_offset = section_offset
+
+  def apply(self, dlv):
+    dlv.section_number = self.section_number
+    dlv.section_offset = self.section_offset
+
+  def check(self, dlv):
+    assert dlv.data == None, dlv.data
+    assert dlv.section_number == self.section_number, (dlv.section_number, self.section_number)
+    assert dlv.section_offset == self.section_offset
+    assert dlv.condition == None
+    assert dlv.failed == False
+    assert dlv.undeliverable == False
+    assert dlv.annotations == None
+
+class ModifiedValue:
+  def __init__(self, failed, undeliverable, annotations):
+    self.failed = failed
+    self.undeliverable = undeliverable
+    self.annotations = annotations
+
+  def apply(self, dlv):
+    dlv.failed = self.failed
+    dlv.undeliverable = self.undeliverable
+    dlv.annotations = self.annotations
+
+  def check(self, dlv):
+    assert dlv.data == None, dlv.data
+    assert dlv.section_number == 0
+    assert dlv.section_offset == 0
+    assert dlv.condition == None
+    assert dlv.failed == self.failed
+    assert dlv.undeliverable == self.undeliverable
+    assert dlv.annotations == self.annotations, (dlv.annotations, self.annotations)
+
+class CustomValue:
+  def __init__(self, data):
+    self.data = data
+
+  def apply(self, dlv):
+    dlv.data = self.data
+
+  def check(self, dlv):
+    assert dlv.data == self.data, (dlv.data, self.data)
+    assert dlv.section_number == 0
+    assert dlv.section_offset == 0
+    assert dlv.condition == None
+    assert dlv.failed == False
+    assert dlv.undeliverable == False
+    assert dlv.annotations == None
+
+class DeliveryTest(Test):
+
+  def teardown(self):
+    self.cleanup()
+
+  def testDisposition(self, count=1, tag="tag%i", type=Delivery.ACCEPTED, value=NoValue()):
+    snd, rcv = self.link("test-link")
+    snd.open()
+    rcv.open()
+
+    snd_deliveries = []
+    for i in range(count):
+      d = snd.delivery(tag % i)
+      snd_deliveries.append(d)
+      snd.advance()
+
+    rcv.flow(count)
+    self.pump()
+
+    rcv_deliveries = []
+    for i in range(count):
+      d = rcv.current
+      assert d.tag == (tag % i)
+      rcv_deliveries.append(d)
+      rcv.advance()
+
+    for d in rcv_deliveries:
+      value.apply(d.local)
+      d.update(type)
+
+    self.pump()
+
+    for d in snd_deliveries:
+      assert d.remote_state == type
+      assert d.remote.type == type
+      value.check(d.remote)
+      value.apply(d.local)
+      d.update(type)
+
+    self.pump()
+
+    for d in rcv_deliveries:
+      assert d.remote_state == type
+      assert d.remote.type == type
+      value.check(d.remote)
+
+    for d in snd_deliveries:
+      d.settle()
+
+    self.pump()
+
+    for d in rcv_deliveries:
+      assert d.settled, d.settled
+      d.settle()
+
+  def testReceived(self):
+    self.testDisposition(type=Disposition.RECEIVED, value=ReceivedValue(1, 2))
+
+  def testRejected(self):
+    self.testDisposition(type=Disposition.REJECTED, value=RejectValue(Condition(symbol("foo"))))
+
+  def testReleased(self):
+    self.testDisposition(type=Disposition.RELEASED)
+
+  def testModified(self):
+    self.testDisposition(type=Disposition.MODIFIED,
+                         value=ModifiedValue(failed=True, undeliverable=True,
+                                             annotations={"key": "value"}))
+
+  def testCustom(self):
+    self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org