You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/07/04 16:05:59 UTC

svn commit: r1499758 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/ proton-j/proton-api/src/main/resources/ tests/python/proton_tests/

Author: kgiusti
Date: Thu Jul  4 14:05:59 2013
New Revision: 1499758

URL: http://svn.apache.org/r1499758
Log:
PROTON-139: provide access to distribution mode in source terminus

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul  4 14:05:59 2013
@@ -2159,6 +2159,10 @@ class Terminus(object):
   CONFIGURATION = PN_CONFIGURATION
   DELIVERIES = PN_DELIVERIES
 
+  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
+  DIST_MODE_COPY = PN_DIST_MODE_COPY
+  DIST_MODE_MOVE = PN_DIST_MODE_MOVE
+
   def __init__(self, impl):
     self._impl = impl
 
@@ -2205,6 +2209,12 @@ class Terminus(object):
     self._check(pn_terminus_set_dynamic(self._impl, dynamic))
   dynamic = property(_is_dynamic, _set_dynamic)
 
+  def _get_distribution_mode(self):
+    return pn_terminus_get_distribution_mode(self._impl)
+  def _set_distribution_mode(self, mode):
+    self._check(pn_terminus_set_distribution_mode(self._impl, mode))
+  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
   @property
   def properties(self):
     return Data(pn_terminus_properties(self._impl))

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Jul  4 14:05:59 2013
@@ -65,6 +65,11 @@ typedef enum {
   PN_CONNECTION_CLOSE,
   PN_NEVER
 } pn_expiry_policy_t;
+typedef enum {
+  PN_DIST_MODE_UNSPECIFIED,
+  PN_DIST_MODE_COPY,
+  PN_DIST_MODE_MOVE
+} pn_distribution_mode_t;
 
 typedef struct pn_disposition_t pn_disposition_t;
 typedef struct pn_delivery_t pn_delivery_t;
@@ -483,6 +488,8 @@ PN_EXTERN pn_data_t *pn_terminus_propert
 PN_EXTERN pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus);
 PN_EXTERN pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus);
 PN_EXTERN pn_data_t *pn_terminus_filter(pn_terminus_t *terminus);
+PN_EXTERN pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus);
+PN_EXTERN int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m);
 PN_EXTERN int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src);
 
 // delivery

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Jul  4 14:05:59 2013
@@ -212,6 +212,7 @@ struct pn_terminus_t {
   pn_expiry_policy_t expiry_policy;
   pn_seconds_t timeout;
   bool dynamic;
+  pn_distribution_mode_t distribution_mode;
   pn_data_t *properties;
   pn_data_t *capabilities;
   pn_data_t *outcomes;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Jul  4 14:05:59 2013
@@ -898,6 +898,7 @@ void pn_terminus_init(pn_terminus_t *ter
   terminus->expiry_policy = PN_SESSION_CLOSE;
   terminus->timeout = 0;
   terminus->dynamic = false;
+  terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
   terminus->properties = pn_data(16);
   terminus->capabilities = pn_data(16);
   terminus->outcomes = pn_data(16);
@@ -1082,6 +1083,18 @@ pn_data_t *pn_terminus_filter(pn_terminu
   return terminus ? terminus->filter : NULL;
 }
 
+pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
+{
+  return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED;
+}
+
+int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->distribution_mode = m;
+  return 0;
+}
+
 int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
 {
   if (!terminus || !src) {
@@ -1095,6 +1108,7 @@ int pn_terminus_copy(pn_terminus_t *term
   terminus->expiry_policy = src->expiry_policy;
   terminus->timeout = src->timeout;
   terminus->dynamic = src->dynamic;
+  terminus->distribution_mode = src->distribution_mode;
   err = pn_data_copy(terminus->properties, src->properties);
   if (err) return err;
   err = pn_data_copy(terminus->capabilities, src->capabilities);
@@ -1669,6 +1683,32 @@ static pn_expiry_policy_t symbol2policy(
   return PN_SESSION_CLOSE;
 }
 
+static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol)
+{
+  if (!symbol.start)
+    return PN_DIST_MODE_UNSPECIFIED;
+
+  if (!strncmp(symbol.start, "move", symbol.size))
+    return PN_DIST_MODE_MOVE;
+  if (!strncmp(symbol.start, "copy", symbol.size))
+    return PN_DIST_MODE_COPY;
+
+  return PN_DIST_MODE_UNSPECIFIED;
+}
+
+static const char *dist_mode2symbol(const pn_distribution_mode_t mode)
+{
+  switch (mode)
+  {
+  case PN_DIST_MODE_COPY:
+    return "copy";
+  case PN_DIST_MODE_MOVE:
+    return "move";
+  default:
+    return NULL;
+  }
+}
+
 int pn_do_attach(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1681,9 +1721,10 @@ int pn_do_attach(pn_dispatcher_t *disp)
   pn_seconds_t src_timeout, tgt_timeout;
   bool src_dynamic, tgt_dynamic;
   pn_sequence_t idc;
-  int err = pn_scan_args(disp, "D.[SIo..D.[SIsIo]D.[SIsIo]..I]", &name, &handle,
+  pn_bytes_t dist_mode;
+  int err = pn_scan_args(disp, "D.[SIo..D.[SIsIo.s]D.[SIsIo]..I]", &name, &handle,
                          &is_sender,
-                         &source, &src_dr, &src_exp, &src_timeout, &src_dynamic,
+                         &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
                          &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
                          &idc);
   if (err) return err;
@@ -1717,6 +1758,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
     pn_terminus_set_expiry_policy(rsrc, symbol2policy(src_exp));
     pn_terminus_set_timeout(rsrc, src_timeout);
     pn_terminus_set_dynamic(rsrc, src_dynamic);
+    pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
   } else {
     pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
   }
@@ -2292,8 +2334,9 @@ int pn_process_link_setup(pn_transport_t
     {
       state->local_handle = allocate_alias(ssn_state->local_handles);
       pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+      const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
       int err = pn_post_frame(transport->disp, ssn_state->local_channel,
-                              "DL[SIonn?DL[SIsIoCnCnCC]?DL[SIsIoCC]nnI]", ATTACH,
+                              "DL[SIonn?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
                               pn_string_get(link->name),
                               state->local_handle,
                               endpoint->type == RECEIVER,
@@ -2304,6 +2347,7 @@ int pn_process_link_setup(pn_transport_t
                               link->source.timeout,
                               link->source.dynamic,
                               link->source.properties,
+                              (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode),
                               link->source.filter,
                               link->source.outcomes,
                               link->source.capabilities,

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Jul  4 14:05:59 2013
@@ -365,6 +365,10 @@ class Terminus(object):
 
   UNSPECIFIED = None
 
+  DIST_MODE_UNSPECIFIED = None
+  DIST_MODE_COPY = "copy"
+  DIST_MODE_MOVE = "move"
+
   def __init__(self, impl):
     self.impl = impl
     self.type = None
@@ -396,10 +400,28 @@ class Terminus(object):
     self.impl.setDynamic(dynamic)
   dynamic = property(_is_dynamic, _set_dynamic)
 
+  def _get_distribution_mode(self):
+    if isinstance(self.impl, Source):
+      sym = self.impl.getDistributionMode()
+      if sym is None:
+        return self.DIST_MODE_UNSPECIFIED
+      else:
+        return sym.toString()
+    else:
+      return self.DIST_MODE_UNSPECIFIED
+  def _set_distribution_mode(self, mode):
+    if isinstance(self.impl, Source):
+      if mode in [None, "copy", "move"]:
+        self.impl.setDistributionMode(Symbol.valueOf(mode))
+      else:
+        self.impl.setDistributionMode(None)
+  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
   def copy(self, src):
     self.address = src.address
     self.timeout = src.timeout
     self.dynamic = src.dynamic
+    self.distribution_mode = src.distribution_mode
 
 class Sender(Link):
 

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul  4 14:05:59 2013
@@ -473,11 +473,18 @@ class LinkTest(Test):
   def test_source_target_full(self):
     self._test_source_target(TerminusConfig(address="source",
                                             timeout=3,
+                                            dist_mode=Terminus.DIST_MODE_MOVE,
                                             filter=[("int", 1), ("symbol", "two"), ("string", "three")],
                                             capabilities=["one", "two", "three"]),
                              TerminusConfig(address="source",
                                             timeout=7,
                                             capabilities=[]))
+  def test_distribution_mode(self):
+    self._test_source_target(TerminusConfig(address="source",
+                                            dist_mode=Terminus.DIST_MODE_COPY),
+                             TerminusConfig(address="target"))
+    assert self.rcv.remote_source.distribution_mode == Terminus.DIST_MODE_COPY
+    assert self.rcv.remote_target.distribution_mode == Terminus.DIST_MODE_UNSPECIFIED
 
   def test_dynamic_link(self):
     self._test_source_target(TerminusConfig(address=None, dynamic=True), None)
@@ -507,13 +514,14 @@ class LinkTest(Test):
 class TerminusConfig:
 
   def __init__(self, address=None, timeout=None, durability=None, filter=None,
-               capabilities=None, dynamic=False):
+               capabilities=None, dynamic=False, dist_mode=None):
     self.address = address
     self.timeout = timeout
     self.durability = durability
     self.filter = filter
     self.capabilities = capabilities
     self.dynamic = dynamic
+    self.dist_mode = dist_mode
 
   def __call__(self, terminus):
     if self.address is not None:
@@ -535,6 +543,8 @@ class TerminusConfig:
         setter(v)
     if self.dynamic:
       terminus.dynamic = True
+    if self.dist_mode is not None:
+      terminus.distribution_mode = self.dist_mode
 
 class TransferTest(Test):
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org