You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/08/04 19:21:44 UTC

[qpid-proton] branch master updated: PROTON-335: Add access to link attach properties (C and Python) pn_link_properties() pn_link_remote_properties()

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 550ddad  PROTON-335: Add access to link attach properties (C and Python)   pn_link_properties()   pn_link_remote_properties()
550ddad is described below

commit 550ddad4ac12b2f973aa8f1a951f5e57317712a1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Sun Jun 7 22:10:11 2020 -0400

    PROTON-335: Add access to link attach properties (C and Python)
      pn_link_properties()
      pn_link_remote_properties()
    
    This closes #260.
---
 c/include/proton/link.h             | 29 +++++++++++++++++++++
 c/src/core/engine-internal.h        |  2 ++
 c/src/core/engine.c                 | 19 ++++++++++++++
 c/src/core/transport.c              | 25 +++++++++++++++----
 c/tests/engine_test.cpp             | 50 +++++++++++++++++++++++++++++++++++++
 python/proton/_endpoints.py         | 43 ++++++++++++++++++++++++++++++-
 python/tests/proton_tests/engine.py | 11 ++++++++
 7 files changed, 173 insertions(+), 6 deletions(-)

diff --git a/c/include/proton/link.h b/c/include/proton/link.h
index c7e3f5b..0143976 100644
--- a/c/include/proton/link.h
+++ b/c/include/proton/link.h
@@ -681,6 +681,35 @@ PN_EXTERN void pn_link_set_max_message_size(pn_link_t *link, uint64_t size);
 PN_EXTERN uint64_t pn_link_remote_max_message_size(pn_link_t *link);
 
 /**
+ * Access/modify the AMQP properties data for a link object.
+ *
+ * This operation will return a pointer to a ::pn_data_t object that is valid
+ * until the link object is freed. Any data contained by the ::pn_data_t object
+ * will be sent as the AMQP properties for the link object when the link is
+ * opened by calling ::pn_link_open. Note that this MUST take the form of a
+ * symbol keyed map to be valid.
+ *
+ * The ::pn_data_t pointer returned is valid until the link object is freed.
+ *
+ * @param[in] link the link object
+ * @return a pointer to a pn_data_t representing the link properties
+ */
+PN_EXTERN pn_data_t *pn_link_properties(pn_link_t *link);
+
+/**
+ * Access the AMQP link properties supplied by the remote link endpoint.
+ *
+ * This operation will return a pointer to a ::pn_data_t object that
+ * is valid until the link object is freed. This data object
+ * will be empty until the remote link is opened as indicated by
+ * the ::PN_REMOTE_ACTIVE flag.
+ *
+ * @param[in] link the link object
+ * @return the remote link properties
+ */
+PN_EXTERN pn_data_t *pn_link_remote_properties(pn_link_t *link);
+
+/**
  * @}
  */
 
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index ba6c051..9dbc919 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -296,6 +296,8 @@ struct pn_link_t {
   pn_delivery_t *unsettled_tail;
   pn_delivery_t *current;
   pn_record_t *context;
+  pn_data_t *properties;
+  pn_data_t *remote_properties;
   size_t unsettled_count;
   uint64_t max_message_size;
   uint64_t remote_max_message_size;
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 19dcc63..02062f6 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1125,6 +1125,8 @@ static void pn_link_finalize(void *object)
   if (endpoint->referenced) {
     pn_decref(link->session);
   }
+  pn_free(link->properties);
+  pn_free(link->remote_properties);
 }
 
 #define pn_link_refcount pn_object_refcount
@@ -1168,6 +1170,8 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
   link->remote_snd_settle_mode = PN_SND_MIXED;
   link->remote_rcv_settle_mode = PN_RCV_FIRST;
   link->detached = false;
+  link->properties = 0;
+  link->remote_properties = 0;
 
   // begin transport state
   link->state.local_handle = -1;
@@ -1972,6 +1976,21 @@ uint64_t pn_link_remote_max_message_size(pn_link_t *link)
   return link->remote_max_message_size;
 }
 
+pn_data_t *pn_link_properties(pn_link_t *link)
+{
+  assert(link);
+  if (!link->properties)
+      link->properties = pn_data(0);
+  return link->properties;
+}
+
+pn_data_t *pn_link_remote_properties(pn_link_t *link)
+{
+  assert(link);
+  return link->remote_properties;
+}
+
+
 pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
 {
   assert(delivery);
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index dde697d..c2f1c05 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1335,14 +1335,19 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   bool snd_settle, rcv_settle;
   uint8_t snd_settle_mode, rcv_settle_mode;
   uint64_t max_msgsz;
-  int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL]", &name, &handle,
+  bool has_props;
+  pn_data_t *rem_props = pn_data(0);
+  int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]", &name, &handle,
                          &is_sender,
                          &snd_settle, &snd_settle_mode,
                          &rcv_settle, &rcv_settle_mode,
                          &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
                          &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
-                         &idc, &max_msgsz);
-  if (err) return err;
+                         &idc, &max_msgsz, &has_props, rem_props);
+  if (err) {
+      pn_free(rem_props);
+      return err;
+  }
   char strbuf[128];      // avoid malloc for most link names
   char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL;
   char *strname = strheap ? strheap : strbuf;
@@ -1353,12 +1358,14 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   if (!ssn) {
       pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
       if (strheap) free(strheap);
+      pn_free(rem_props);
       return PN_EOS;
   }
   pn_link_t *link = pni_find_link(ssn, name, is_sender);
   if (link && (int32_t)link->state.remote_handle >= 0) {
     pn_do_error(transport, "amqp:invalid-field", "link name already attached: %s", strname);
     if (strheap) free(strheap);
+    pn_free(rem_props);
     return PN_EOS;
   }
   if (!link) {                  /* Make a new link for the attach */
@@ -1373,6 +1380,12 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
     free(strheap);
   }
 
+  if (has_props) {
+    link->remote_properties = rem_props;
+  } else {
+    pn_free(rem_props);
+  }
+
   pni_map_remote_handle(link, handle);
   PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
   pn_terminus_t *rsrc = &link->remote_source;
@@ -2078,7 +2091,7 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp
         if (err) return err;
       } else {
         int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel,
-                                "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnIL]", ATTACH,
+                                "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH,
                                 pn_string_get(link->name),
                                 state->local_handle,
                                 endpoint->type == RECEIVER,
@@ -2106,7 +2119,9 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp
                                 link->target.properties,
                                 link->target.capabilities,
 
-                                0, link->max_message_size);
+                                0,
+                                link->max_message_size,
+                                link->properties);
         if (err) return err;
       }
     }
diff --git a/c/tests/engine_test.cpp b/c/tests/engine_test.cpp
index fa6e23f..f8b174e 100644
--- a/c/tests/engine_test.cpp
+++ b/c/tests/engine_test.cpp
@@ -23,6 +23,8 @@
 
 #include <proton/engine.h>
 
+using namespace pn_test;
+
 // push data from one transport to another
 static int xfer(pn_transport_t *src, pn_transport_t *dest) {
   ssize_t out = pn_transport_pending(src);
@@ -316,3 +318,51 @@ TEST_CASE("engine_link_name_prefix)") {
   pn_transport_free(t2);
   pn_connection_free(c2);
 }
+
+
+TEST_CASE("link_properties)") {
+  pn_connection_t *c1 = pn_connection();
+  pn_transport_t *t1 = pn_transport();
+  pn_transport_bind(t1, c1);
+
+  pn_connection_t *c2 = pn_connection();
+  pn_transport_t *t2 = pn_transport();
+  pn_transport_set_server(t2);
+  pn_transport_bind(t2, c2);
+
+  pn_connection_open(c1);
+  pn_connection_open(c2);
+
+  pn_session_t *s1 = pn_session(c1);
+  pn_session_open(s1);
+
+  pn_link_t *rx = pn_receiver(s1, "props");
+  pn_data_t *props = pn_link_properties(rx);
+  REQUIRE(props != NULL);
+
+  pn_data_clear(props);
+  pn_data_fill(props, "{S[iii]SI}", "foo", 1, 987, 3, "bar", 965);
+  pn_link_open(rx);
+
+  while (pump(t1, t2)) {
+    process_endpoints(c1);
+    process_endpoints(c2);
+  }
+
+  // session and link should be up, c2 should have a sender link:
+  REQUIRE(pn_link_state(rx) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  REQUIRE(pn_link_remote_properties(rx) == NULL);
+
+  pn_link_t *tx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+
+  REQUIRE(pn_link_remote_properties(tx) != NULL);
+  CHECK("{\"foo\"=[1, 987, 3], \"bar\"=965}" == pn_test::inspect(pn_link_remote_properties(tx)));
+
+  pn_transport_unbind(t1);
+  pn_transport_free(t1);
+  pn_connection_free(c1);
+
+  pn_transport_unbind(t2);
+  pn_transport_free(t2);
+  pn_connection_free(c2);
+}
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index f518137..50cf677 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -53,7 +53,8 @@ from cproton import PN_CONFIGURATION, PN_COORDINATOR, PN_DELIVERIES, PN_DIST_MOD
     pn_terminus_get_durability, pn_terminus_get_expiry_policy, pn_terminus_get_timeout, pn_terminus_get_type, \
     pn_terminus_is_dynamic, pn_terminus_outcomes, pn_terminus_properties, pn_terminus_set_address, \
     pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, \
-    pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head
+    pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head, \
+    pn_link_properties, pn_link_remote_properties
 
 from ._common import unicode2utf8, utf82unicode
 from ._condition import cond2obj, obj2cond
@@ -771,6 +772,10 @@ class Link(Wrapper, Endpoint):
     def __init__(self, impl):
         Wrapper.__init__(self, impl, pn_link_attachments)
 
+    def _init(self):
+        Endpoint._init(self)
+        self.properties = None
+
     def _get_attachments(self):
         return pn_link_attachments(self._impl)
 
@@ -796,6 +801,7 @@ class Link(Wrapper, Endpoint):
         sent to the peer. A link is fully active once both peers have
         attached it.
         """
+        obj2dat(self.properties, pn_link_properties(self._impl))
         pn_link_open(self._impl)
 
     def close(self):
@@ -1200,6 +1206,41 @@ class Link(Wrapper, Endpoint):
         """
         pn_link_free(self._impl)
 
+    @property
+    def remote_properties(self):
+        """
+        The properties specified by the remote peer for this link.
+
+        This operation will return a :class:`Data` object that
+        is valid until the link object is freed. This :class:`Data`
+        object will be empty until the remote link is opened as
+        indicated by the :const:`REMOTE_ACTIVE` flag.
+
+        :type: :class:`Data`
+        """
+        return dat2obj(pn_link_remote_properties(self._impl))
+
+
+    def _get_properties(self):
+        return self._properties_dict
+
+    def _set_properties(self, properties_dict):
+        if isinstance(properties_dict, dict):
+            self._properties_dict = PropertyDict(properties_dict, raise_on_error=False)
+        else:
+            self._properties_dict = properties_dict
+
+    properties = property(_get_properties, _set_properties, doc="""
+    Link properties as a dictionary of key/values. The AMQP 1.0
+    specification restricts this dictionary to have keys that are only
+    :class:`symbol` types. It is possible to use the special ``dict``
+    subclass :class:`PropertyDict` which will by default enforce this
+    restrictions on construction. In addition, if strings type are used,
+    this will silently convert them into symbols.
+
+    :type: ``dict`` containing :class:`symbol`` keys.
+    """)
+
 
 class Sender(Link):
     """
diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py
index 48bee6b..1b4c02b 100644
--- a/python/tests/proton_tests/engine.py
+++ b/python/tests/proton_tests/engine.py
@@ -765,6 +765,17 @@ class LinkTest(Test):
     self.pump()
     assert self.rcv.remote_max_message_size == 13579
 
+  def test_properties(self):
+      sender_props = {symbol('key1'): 'value1',
+                      symbol('key2'): 'value2'}
+      self.snd.properties = sender_props
+      self.snd.open()
+      self.rcv.open()
+      self.pump()
+
+      assert self.rcv.remote_properties == sender_props, (self.rcv.remote_properties, sender_props)
+      assert self.snd.remote_properties == None, (self.snd.remote_properties, None)
+
   def test_cleanup(self):
     snd, rcv = self.link("test-link")
     snd.open()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org