You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/07/04 16:05:59 UTC
svn commit: r1499758 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/engine/
proton-j/proton-api/src/main/resources/ tests/python/proton_tests/
Author: kgiusti
Date: Thu Jul 4 14:05:59 2013
New Revision: 1499758
URL: http://svn.apache.org/r1499758
Log:
PROTON-139: provide access to distribution mode in source terminus
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 4 14:05:59 2013
@@ -2159,6 +2159,10 @@ class Terminus(object):
CONFIGURATION = PN_CONFIGURATION
DELIVERIES = PN_DELIVERIES
+ DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
+ DIST_MODE_COPY = PN_DIST_MODE_COPY
+ DIST_MODE_MOVE = PN_DIST_MODE_MOVE
+
def __init__(self, impl):
self._impl = impl
@@ -2205,6 +2209,12 @@ class Terminus(object):
self._check(pn_terminus_set_dynamic(self._impl, dynamic))
dynamic = property(_is_dynamic, _set_dynamic)
+ def _get_distribution_mode(self):
+ return pn_terminus_get_distribution_mode(self._impl)
+ def _set_distribution_mode(self, mode):
+ self._check(pn_terminus_set_distribution_mode(self._impl, mode))
+ distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
@property
def properties(self):
return Data(pn_terminus_properties(self._impl))
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Jul 4 14:05:59 2013
@@ -65,6 +65,11 @@ typedef enum {
PN_CONNECTION_CLOSE,
PN_NEVER
} pn_expiry_policy_t;
+typedef enum {
+ PN_DIST_MODE_UNSPECIFIED,
+ PN_DIST_MODE_COPY,
+ PN_DIST_MODE_MOVE
+} pn_distribution_mode_t;
typedef struct pn_disposition_t pn_disposition_t;
typedef struct pn_delivery_t pn_delivery_t;
@@ -483,6 +488,8 @@ PN_EXTERN pn_data_t *pn_terminus_propert
PN_EXTERN pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus);
PN_EXTERN pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus);
PN_EXTERN pn_data_t *pn_terminus_filter(pn_terminus_t *terminus);
+PN_EXTERN pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus);
+PN_EXTERN int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m);
PN_EXTERN int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src);
// delivery
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Jul 4 14:05:59 2013
@@ -212,6 +212,7 @@ struct pn_terminus_t {
pn_expiry_policy_t expiry_policy;
pn_seconds_t timeout;
bool dynamic;
+ pn_distribution_mode_t distribution_mode;
pn_data_t *properties;
pn_data_t *capabilities;
pn_data_t *outcomes;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Jul 4 14:05:59 2013
@@ -898,6 +898,7 @@ void pn_terminus_init(pn_terminus_t *ter
terminus->expiry_policy = PN_SESSION_CLOSE;
terminus->timeout = 0;
terminus->dynamic = false;
+ terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
terminus->properties = pn_data(16);
terminus->capabilities = pn_data(16);
terminus->outcomes = pn_data(16);
@@ -1082,6 +1083,18 @@ pn_data_t *pn_terminus_filter(pn_terminu
return terminus ? terminus->filter : NULL;
}
+pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
+{
+ return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED;
+}
+
+int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
+{
+ if (!terminus) return PN_ARG_ERR;
+ terminus->distribution_mode = m;
+ return 0;
+}
+
int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
{
if (!terminus || !src) {
@@ -1095,6 +1108,7 @@ int pn_terminus_copy(pn_terminus_t *term
terminus->expiry_policy = src->expiry_policy;
terminus->timeout = src->timeout;
terminus->dynamic = src->dynamic;
+ terminus->distribution_mode = src->distribution_mode;
err = pn_data_copy(terminus->properties, src->properties);
if (err) return err;
err = pn_data_copy(terminus->capabilities, src->capabilities);
@@ -1669,6 +1683,32 @@ static pn_expiry_policy_t symbol2policy(
return PN_SESSION_CLOSE;
}
+/* Map a distribution-mode symbol to its enum; absent/unknown -> UNSPECIFIED. */
+static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol)
+{
+ if (!symbol.start) return PN_DIST_MODE_UNSPECIFIED;
+ /* Require an exact 4-byte match: strncmp bounded only by symbol.size would
+ * treat an empty symbol or a prefix such as "m" or "co" as a match. */
+ if (symbol.size == 4 && !strncmp(symbol.start, "move", 4))
+ return PN_DIST_MODE_MOVE;
+ if (symbol.size == 4 && !strncmp(symbol.start, "copy", 4))
+ return PN_DIST_MODE_COPY;
+ return PN_DIST_MODE_UNSPECIFIED;
+}
+
+static const char *dist_mode2symbol(const pn_distribution_mode_t mode)
+{
+ switch (mode)
+ {
+ case PN_DIST_MODE_COPY:
+ return "copy";
+ case PN_DIST_MODE_MOVE:
+ return "move";
+ default:
+ return NULL;
+ }
+}
+
int pn_do_attach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = (pn_transport_t *) disp->context;
@@ -1681,9 +1721,10 @@ int pn_do_attach(pn_dispatcher_t *disp)
pn_seconds_t src_timeout, tgt_timeout;
bool src_dynamic, tgt_dynamic;
pn_sequence_t idc;
- int err = pn_scan_args(disp, "D.[SIo..D.[SIsIo]D.[SIsIo]..I]", &name, &handle,
+ pn_bytes_t dist_mode;
+ int err = pn_scan_args(disp, "D.[SIo..D.[SIsIo.s]D.[SIsIo]..I]", &name, &handle,
&is_sender,
- &source, &src_dr, &src_exp, &src_timeout, &src_dynamic,
+ &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
&target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
&idc);
if (err) return err;
@@ -1717,6 +1758,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
pn_terminus_set_expiry_policy(rsrc, symbol2policy(src_exp));
pn_terminus_set_timeout(rsrc, src_timeout);
pn_terminus_set_dynamic(rsrc, src_dynamic);
+ pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
} else {
pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
}
@@ -2292,8 +2334,9 @@ int pn_process_link_setup(pn_transport_t
{
state->local_handle = allocate_alias(ssn_state->local_handles);
pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+ const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
int err = pn_post_frame(transport->disp, ssn_state->local_channel,
- "DL[SIonn?DL[SIsIoCnCnCC]?DL[SIsIoCC]nnI]", ATTACH,
+ "DL[SIonn?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
pn_string_get(link->name),
state->local_handle,
endpoint->type == RECEIVER,
@@ -2304,6 +2347,7 @@ int pn_process_link_setup(pn_transport_t
link->source.timeout,
link->source.dynamic,
link->source.properties,
+ (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode),
link->source.filter,
link->source.outcomes,
link->source.capabilities,
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Jul 4 14:05:59 2013
@@ -365,6 +365,10 @@ class Terminus(object):
UNSPECIFIED = None
+ DIST_MODE_UNSPECIFIED = None
+ DIST_MODE_COPY = "copy"
+ DIST_MODE_MOVE = "move"
+
def __init__(self, impl):
self.impl = impl
self.type = None
@@ -396,10 +400,28 @@ class Terminus(object):
self.impl.setDynamic(dynamic)
dynamic = property(_is_dynamic, _set_dynamic)
+ def _get_distribution_mode(self):
+ if isinstance(self.impl, Source):
+ sym = self.impl.getDistributionMode()
+ if sym is None:
+ return self.DIST_MODE_UNSPECIFIED
+ else:
+ return sym.toString()
+ else:
+ return self.DIST_MODE_UNSPECIFIED
+ def _set_distribution_mode(self, mode):
+ if isinstance(self.impl, Source):
+ if mode in [None, "copy", "move"]:
+ self.impl.setDistributionMode(Symbol.valueOf(mode))
+ else:
+ self.impl.setDistributionMode(None)
+ distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
def copy(self, src):
self.address = src.address
self.timeout = src.timeout
self.dynamic = src.dynamic
+ self.distribution_mode = src.distribution_mode
class Sender(Link):
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1499758&r1=1499757&r2=1499758&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 4 14:05:59 2013
@@ -473,11 +473,18 @@ class LinkTest(Test):
def test_source_target_full(self):
self._test_source_target(TerminusConfig(address="source",
timeout=3,
+ dist_mode=Terminus.DIST_MODE_MOVE,
filter=[("int", 1), ("symbol", "two"), ("string", "three")],
capabilities=["one", "two", "three"]),
TerminusConfig(address="source",
timeout=7,
capabilities=[]))
+ def test_distribution_mode(self):
+ self._test_source_target(TerminusConfig(address="source",
+ dist_mode=Terminus.DIST_MODE_COPY),
+ TerminusConfig(address="target"))
+ assert self.rcv.remote_source.distribution_mode == Terminus.DIST_MODE_COPY
+ assert self.rcv.remote_target.distribution_mode == Terminus.DIST_MODE_UNSPECIFIED
def test_dynamic_link(self):
self._test_source_target(TerminusConfig(address=None, dynamic=True), None)
@@ -507,13 +514,14 @@ class LinkTest(Test):
class TerminusConfig:
def __init__(self, address=None, timeout=None, durability=None, filter=None,
- capabilities=None, dynamic=False):
+ capabilities=None, dynamic=False, dist_mode=None):
self.address = address
self.timeout = timeout
self.durability = durability
self.filter = filter
self.capabilities = capabilities
self.dynamic = dynamic
+ self.dist_mode = dist_mode
def __call__(self, terminus):
if self.address is not None:
@@ -535,6 +543,8 @@ class TerminusConfig:
setter(v)
if self.dynamic:
terminus.dynamic = True
+ if self.dist_mode is not None:
+ terminus.distribution_mode = self.dist_mode
class TransferTest(Test):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org