You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/10/29 20:27:58 UTC

svn commit: r1635266 - in /qpid/proton/trunk: proton-c/src/protocol.py proton-c/src/transport/transport.c proton-c/src/transport/transport.c.orig proton-j/src/main/resources/cengine.py tests/python/proton_tests/engine.py

Author: rhs
Date: Wed Oct 29 19:27:57 2014
New Revision: 1635266

URL: http://svn.apache.org/r1635266
Log:
PROTON-723: based on Gordon's patch, added support for the coordinator target

Added:
    qpid/proton/trunk/proton-c/src/transport/transport.c.orig
      - copied, changed from r1634966, qpid/proton/trunk/proton-c/src/transport/transport.c
Modified:
    qpid/proton/trunk/proton-c/src/protocol.py
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/src/protocol.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/protocol.py?rev=1635266&r1=1635265&r2=1635266&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/protocol.py (original)
+++ qpid/proton/trunk/proton-c/src/protocol.py Wed Oct 29 19:27:57 2014
@@ -20,6 +20,7 @@ import mllib, os, sys
 
 doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml"))
 mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml"))
+tdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transactions.xml"))
 sdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "security.xml"))
 
 def eq(attr, value):
@@ -27,6 +28,7 @@ def eq(attr, value):
 
 TYPES = doc.query["amqp/section/type", eq("@class", "composite")] + \
     mdoc.query["amqp/section/type", eq("@class", "composite")] + \
+    tdoc.query["amqp/section/type", eq("@class", "composite")] + \
     sdoc.query["amqp/section/type", eq("@class", "composite")] + \
     mdoc.query["amqp/section/type", eq("@provides", "section")]
 RESTRICTIONS = {}

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1635266&r1=1635265&r2=1635266&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Wed Oct 29 19:27:57 2014
@@ -695,7 +695,15 @@ int pn_do_attach(pn_dispatcher_t *disp)
     pn_terminus_set_timeout(rtgt, tgt_timeout);
     pn_terminus_set_dynamic(rtgt, tgt_dynamic);
   } else {
-    pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
+    uint64_t code = 0;
+    pn_data_clear(link->remote_target.capabilities);
+    err = pn_scan_args(disp, "D.[.....D..DL[C]...]", &code,
+                       link->remote_target.capabilities);
+    if (code == COORDINATOR) {
+      pn_terminus_set_type(rtgt, PN_COORDINATOR);
+    } else {
+      pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
+    }
   }
 
   if (snd_settle)
@@ -1323,34 +1331,58 @@ int pn_process_link_setup(pn_transport_t
     {
       pni_map_local_handle(link);
       const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
-      int err = pn_post_frame(transport->disp, ssn_state->local_channel,
-                              "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
-                              pn_string_get(link->name),
-                              state->local_handle,
-                              endpoint->type == RECEIVER,
-                              link->snd_settle_mode,
-                              link->rcv_settle_mode,
-                              (bool) link->source.type, SOURCE,
-                              pn_string_get(link->source.address),
-                              link->source.durability,
-                              expiry_symbol(link->source.expiry_policy),
-                              link->source.timeout,
-                              link->source.dynamic,
-                              link->source.properties,
-                              (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode),
-                              link->source.filter,
-                              link->source.outcomes,
-                              link->source.capabilities,
-                              (bool) link->target.type, TARGET,
-                              pn_string_get(link->target.address),
-                              link->target.durability,
-                              expiry_symbol(link->target.expiry_policy),
-                              link->target.timeout,
-                              link->target.dynamic,
-                              link->target.properties,
-                              link->target.capabilities,
-                              0);
-      if (err) return err;
+      if (link->target.type == PN_COORDINATOR) {
+        int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+                                "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH,
+                                pn_string_get(link->name),
+                                state->local_handle,
+                                endpoint->type == RECEIVER,
+                                link->snd_settle_mode,
+                                link->rcv_settle_mode,
+                                (bool) link->source.type, SOURCE,
+                                pn_string_get(link->source.address),
+                                link->source.durability,
+                                expiry_symbol(link->source.expiry_policy),
+                                link->source.timeout,
+                                link->source.dynamic,
+                                link->source.properties,
+                                (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode),
+                                link->source.filter,
+                                link->source.outcomes,
+                                link->source.capabilities,
+                                COORDINATOR, link->target.capabilities,
+                                0);
+        if (err) return err;
+      } else {
+        int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+                                "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
+                                pn_string_get(link->name),
+                                state->local_handle,
+                                endpoint->type == RECEIVER,
+                                link->snd_settle_mode,
+                                link->rcv_settle_mode,
+                                (bool) link->source.type, SOURCE,
+                                pn_string_get(link->source.address),
+                                link->source.durability,
+                                expiry_symbol(link->source.expiry_policy),
+                                link->source.timeout,
+                                link->source.dynamic,
+                                link->source.properties,
+                                (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode),
+                                link->source.filter,
+                                link->source.outcomes,
+                                link->source.capabilities,
+                                (bool) link->target.type, TARGET,
+                                pn_string_get(link->target.address),
+                                link->target.durability,
+                                expiry_symbol(link->target.expiry_policy),
+                                link->target.timeout,
+                                link->target.dynamic,
+                                link->target.properties,
+                                link->target.capabilities,
+                                0);
+        if (err) return err;
+      }
     }
   }
 

Copied: qpid/proton/trunk/proton-c/src/transport/transport.c.orig (from r1634966, qpid/proton/trunk/proton-c/src/transport/transport.c)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c.orig?p2=qpid/proton/trunk/proton-c/src/transport/transport.c.orig&p1=qpid/proton/trunk/proton-c/src/transport/transport.c&r1=1634966&r2=1635266&rev=1635266&view=diff
==============================================================================
    (empty)

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1635266&r1=1635265&r2=1635266&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Wed Oct 29 19:27:57 2014
@@ -419,21 +419,22 @@ class pn_terminus:
   def decode(self, impl):
     if impl is not None:
       self.type = TERMINUS_TYPES_J2P[impl.__class__]
-      self.address = impl.getAddress()
-      self.durability = DURABILITY_J2P[impl.getDurable()]
-      self.expiry_policy = EXPIRY_POLICY_J2P[impl.getExpiryPolicy()]
-      self.timeout = impl.getTimeout().longValue()
-      self.dynamic = impl.getDynamic()
-      obj2dat(impl.getDynamicNodeProperties(), self.properties)
-      array2dat(impl.getCapabilities(), PN_SYMBOL, self.capabilities)
-      if self.type == PN_SOURCE:
-        self.distribution_mode = DISTRIBUTION_MODE_J2P[impl.getDistributionMode()]
-        array2dat(impl.getOutcomes(), PN_SYMBOL, self.outcomes)
-        obj2dat(impl.getFilter(), self.filter)
+      if self.type in (PN_SOURCE, PN_TARGET):
+        self.address = impl.getAddress()
+        self.durability = DURABILITY_J2P[impl.getDurable()]
+        self.expiry_policy = EXPIRY_POLICY_J2P[impl.getExpiryPolicy()]
+        self.timeout = impl.getTimeout().longValue()
+        self.dynamic = impl.getDynamic()
+        obj2dat(impl.getDynamicNodeProperties(), self.properties)
+        array2dat(impl.getCapabilities(), PN_SYMBOL, self.capabilities)
+        if self.type == PN_SOURCE:
+          self.distribution_mode = DISTRIBUTION_MODE_J2P[impl.getDistributionMode()]
+          array2dat(impl.getOutcomes(), PN_SYMBOL, self.outcomes)
+          obj2dat(impl.getFilter(), self.filter)
 
   def encode(self):
     impl = TERMINUS_TYPES_P2J[self.type]()
-    if impl is not None:
+    if self.type in (PN_SOURCE, PN_TARGET):
       impl.setAddress(self.address)
       impl.setDurable(DURABILITY_P2J[self.durability])
       impl.setExpiryPolicy(EXPIRY_POLICY_P2J[self.expiry_policy])

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1635266&r1=1635265&r2=1635266&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Wed Oct 29 19:27:57 2014
@@ -547,6 +547,9 @@ class LinkTest(Test):
   def test_target(self):
     self._test_source_target(None, TerminusConfig(address="target"))
 
+  def test_coordinator(self):
+    self._test_source_target(None, TerminusConfig(type=Terminus.COORDINATOR))
+
   def test_source_target_full(self):
     self._test_source_target(TerminusConfig(address="source",
                                             timeout=3,
@@ -619,8 +622,8 @@ class LinkTest(Test):
 
 class TerminusConfig:
 
-  def __init__(self, address=None, timeout=None, durability=None, filter=None,
-               capabilities=None, dynamic=False, dist_mode=None):
+  def __init__(self, type=None, address=None, timeout=None, durability=None,
+               filter=None, capabilities=None, dynamic=False, dist_mode=None):
     self.address = address
     self.timeout = timeout
     self.durability = durability
@@ -628,8 +631,11 @@ class TerminusConfig:
     self.capabilities = capabilities
     self.dynamic = dynamic
     self.dist_mode = dist_mode
+    self.type = type
 
   def __call__(self, terminus):
+    if self.type is not None:
+      terminus.type = self.type
     if self.address is not None:
       terminus.address = self.address
     if self.timeout is not None:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org