You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/23 22:06:27 UTC

[qpid-proton] branch main updated: PROTON-2308: Add support for setting Dynamic Node Properties

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 596696a  PROTON-2308: Add support for setting Dynamic Node Properties
596696a is described below

commit 596696aede355d1e53832a41fbebb59d0b71d5f1
Author: Rakhi Kumari <ra...@gmail.com>
AuthorDate: Wed Dec 8 16:54:01 2021 +0530

    PROTON-2308: Add support for setting Dynamic Node Properties
    
    [Test fixed up by commiter] Closes #346
---
 cpp/include/proton/source_options.hpp |   4 ++
 cpp/include/proton/target_options.hpp |   4 ++
 cpp/include/proton/terminus.hpp       |   5 ++
 cpp/src/delivery_test.cpp             |  12 ++--
 cpp/src/link_test.cpp                 | 118 +++++++++++++++++++++++++++++++++-
 cpp/src/node_options.cpp              |  23 +++++++
 cpp/src/terminus.cpp                  |  12 +++-
 7 files changed, 168 insertions(+), 10 deletions(-)

diff --git a/cpp/include/proton/source_options.hpp b/cpp/include/proton/source_options.hpp
index fe1c34a..b221d72 100644
--- a/cpp/include/proton/source_options.hpp
+++ b/cpp/include/proton/source_options.hpp
@@ -27,6 +27,7 @@
 #include "./duration.hpp"
 #include "./source.hpp"
 
+#include <map>
 #include <string>
 
 /// @file
@@ -92,6 +93,9 @@ class source_options {
     /// Extension capabilities that are supported/requested
     PN_CPP_EXTERN source_options& capabilities(const std::vector<symbol>&);
 
+    /// Set the dynamic node properties.
+    PN_CPP_EXTERN source_options& dynamic_properties(const std::map<symbol, value>&);
+
   private:
     void apply(source&) const;
 
diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp
index 834a185..1e24051 100644
--- a/cpp/include/proton/target_options.hpp
+++ b/cpp/include/proton/target_options.hpp
@@ -27,6 +27,7 @@
 #include "./duration.hpp"
 #include "./target.hpp"
 
+#include <map>
 #include <string>
 
 /// @file
@@ -83,6 +84,9 @@ class target_options {
     /// Extension capabilities that are supported/requested
     PN_CPP_EXTERN target_options& capabilities(const std::vector<symbol>&);
 
+    /// Set the dynamic node properties.
+    PN_CPP_EXTERN target_options& dynamic_properties(const std::map<symbol, value>&);
+
   private:
     void apply(target&) const;
 
diff --git a/cpp/include/proton/terminus.hpp b/cpp/include/proton/terminus.hpp
index 5b9c684..ca10d2c 100644
--- a/cpp/include/proton/terminus.hpp
+++ b/cpp/include/proton/terminus.hpp
@@ -27,6 +27,7 @@
 
 #include <proton/terminus.h>
 
+#include <map>
 #include <string>
 #include <vector>
 
@@ -103,6 +104,10 @@ class terminus {
     /// Extension capabilities that are supported/requested
     PN_CPP_EXTERN std::vector<symbol> capabilities() const;
 
+    /// Obtain the AMQP dynamic node properties for the
+    /// terminus as a standard map.
+    PN_CPP_EXTERN std::map<symbol, value> dynamic_properties() const;
+
   protected:
     pn_terminus_t *pn_object() const { return object_; }
   private:
diff --git a/cpp/src/delivery_test.cpp b/cpp/src/delivery_test.cpp
index af6441c..9765223 100644
--- a/cpp/src/delivery_test.cpp
+++ b/cpp/src/delivery_test.cpp
@@ -56,7 +56,7 @@ int tracker_settle_counter;
 proton::binary test_tag("TESTTAG");
 } // namespace
 
-class test_recv : public proton::messaging_handler {
+class test_server : public proton::messaging_handler {
   private:
     class listener_ready_handler : public proton::listen_handler {
         void on_open(proton::listener &l) override {
@@ -74,7 +74,7 @@ class test_recv : public proton::messaging_handler {
     listener_ready_handler listen_handler;
 
   public:
-    test_recv(const std::string &s) : url(s) {}
+    test_server (const std::string &s) : url(s) {}
 
     void on_container_start(proton::container &c) override {
         listener = c.listen(url, listen_handler);
@@ -88,13 +88,13 @@ class test_recv : public proton::messaging_handler {
     }
 };
 
-class test_send : public proton::messaging_handler {
+class test_client : public proton::messaging_handler {
   private:
     std::string url;
     proton::sender sender;
 
   public:
-    test_send(const std::string &s) : url(s) {}
+    test_client (const std::string &s) : url(s) {}
 
     void on_container_start(proton::container &c) override {
         proton::connection_options co;
@@ -125,7 +125,7 @@ int test_delivery_tag() {
     tracker_settle_counter = 0;
 
     std::string recv_address("127.0.0.1:0/test");
-    test_recv recv(recv_address);
+    test_server recv(recv_address);
     proton::container c(recv);
     std::thread thread_recv([&c]() -> void { c.run(); });
 
@@ -135,7 +135,7 @@ int test_delivery_tag() {
 
     std::string send_address =
         "127.0.0.1:" + std::to_string(listener_port) + "/test";
-    test_send send(send_address);
+    test_client send(send_address);
     proton::container(send).run();
     thread_recv.join();
 
diff --git a/cpp/src/link_test.cpp b/cpp/src/link_test.cpp
index 18188c6..cd9d931 100644
--- a/cpp/src/link_test.cpp
+++ b/cpp/src/link_test.cpp
@@ -20,12 +20,125 @@
 
 #include "test_bits.hpp"
 
-#include <proton/sender_options.hpp>
-#include <proton/receiver_options.hpp>
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
 #include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/target_options.hpp>
+#include <proton/types.hpp>
 
 #include <iostream>
 
+#include <condition_variable>
+#include <map>
+#include <mutex>
+#include <thread>
+
+typedef std::map<proton::symbol, proton::value> property_map;
+
+namespace {
+std::mutex m;
+std::condition_variable cv;
+bool listener_ready = false;
+int listener_port;
+const std::string DYNAMIC_ADDRESS = "test_dynamic_address";
+} // namespace
+
+class test_server : public proton::messaging_handler {
+  private:
+    class listener_ready_handler : public proton::listen_handler {
+        void on_open(proton::listener& l) override {
+            {
+                std::lock_guard<std::mutex> lk(m);
+                listener_port = l.port();
+                listener_ready = true;
+            }
+            cv.notify_one();
+        }
+    };
+
+    std::string url;
+    proton::listener listener;
+    listener_ready_handler listen_handler;
+
+  public:
+    test_server (const std::string& s) : url(s) {}
+
+    void on_container_start(proton::container& c) override {
+        listener = c.listen(url, listen_handler);
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        ASSERT(s.source().dynamic());
+        ASSERT(s.source().address().empty());
+        property_map p = {{proton::symbol("supported-dist-modes"), proton::symbol("copy")}};
+        ASSERT_EQUAL(s.source().dynamic_properties(), p);
+
+        proton::source_options opts;
+        opts.address(DYNAMIC_ADDRESS)
+            // This fails due to a bug in the C++ bindings - PROTON-2480
+            // .dynamic(true)
+            .dynamic_properties(
+          property_map{{proton::symbol("supported-dist-modes"), proton::symbol("move")}}
+        );
+        s.open(proton::sender_options().source(opts));
+        listener.stop();
+    }
+};
+
+class test_client : public proton::messaging_handler {
+  private:
+    std::string url;
+
+  public:
+    test_client (const std::string& s) : url(s) {}
+
+    void on_container_start(proton::container& c) override {
+        proton::source_options opts;
+        opts.dynamic(true)
+            .dynamic_properties(
+              property_map{{proton::symbol{"supported-dist-modes"}, proton::symbol{"copy"}}}
+        );
+        c.open_receiver(url, proton::receiver_options().source(opts));
+    }
+
+    void on_receiver_open(proton::receiver& r) override {
+        // This fails due to a bug in the c++ bindings - PROTON-2480
+        // ASSERT(r.source().dynamic());
+        ASSERT_EQUAL(DYNAMIC_ADDRESS, r.source().address());
+        property_map m({{proton::symbol("supported-dist-modes"), proton::symbol("move")}});
+        ASSERT_EQUAL(m, r.source().dynamic_properties());
+
+        r.connection().close();
+    }
+};
+
+int test_dynamic_node_properties() {
+
+    std::string recv_address("127.0.0.1:0");
+    test_server server(recv_address);
+    proton::container c(server);
+    std::thread thread_recv([&c]() -> void { c.run(); });
+
+    // wait until listener is ready
+    std::unique_lock<std::mutex> lk(m);
+    cv.wait(lk, [] { return listener_ready; });
+
+    std::string send_address = "127.0.0.1:" + std::to_string(listener_port);
+    test_client client(send_address);
+    proton::container(client).run();
+    thread_recv.join();
+
+    return 0;
+}
+
 int test_link_name()
 {
     proton::container c;
@@ -48,5 +161,6 @@ int test_link_name()
 int main(int argc, char** argv) {
     int failed = 0;
     RUN_ARGV_TEST(failed, test_link_name());
+    RUN_ARGV_TEST(failed, test_dynamic_node_properties());
     return failed;
 }
diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp
index 3b6d197..0e4d8a7 100644
--- a/cpp/src/node_options.cpp
+++ b/cpp/src/node_options.cpp
@@ -20,14 +20,17 @@
  */
 
 #include "proton/codec/vector.hpp"
+#include "proton/map.hpp"
 #include "proton/source.hpp"
 #include "proton/source_options.hpp"
 #include "proton/target.hpp"
 #include "proton/target_options.hpp"
+#include "proton/types.hpp"
 
 #include "proton_bits.hpp"
 
 #include <limits>
+#include <map>
 
 namespace proton {
 
@@ -97,6 +100,7 @@ class source_options::impl {
     option<enum source::distribution_mode> distribution_mode;
     option<source::filter_map> filters;
     option<std::vector<symbol> > capabilities;
+    option<std::map<symbol, value>> dynamic_properties;
 
     void apply(source& s) {
         node_address(s, address, dynamic, anonymous);
@@ -111,6 +115,11 @@ class source_options::impl {
         if (capabilities.set) {
             value(pn_terminus_capabilities(unwrap(s))) = capabilities.value;
         }
+        if (dynamic_properties.set) {
+            map<symbol, value> source_map;
+            get(dynamic_properties.value, source_map);
+            value(pn_terminus_properties(unwrap(s))) = source_map;
+        }
     }
 };
 
@@ -134,6 +143,10 @@ source_options& source_options::expiry_policy(enum source::expiry_policy m) { im
 source_options& source_options::distribution_mode(enum source::distribution_mode m) { impl_->distribution_mode = m; return *this; }
 source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; }
 source_options& source_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; }
+source_options& source_options::dynamic_properties(const std::map<symbol, value>& c) {
+    impl_->dynamic_properties = c;
+    return *this;
+}
 
 void source_options::apply(source& s) const { impl_->apply(s); }
 
@@ -148,6 +161,7 @@ class target_options::impl {
     option<duration> timeout;
     option<enum target::expiry_policy> expiry_policy;
     option<std::vector<symbol> > capabilities;
+    option<std::map<symbol, value>> dynamic_properties;
 
     void apply(target& t) {
         node_address(t, address, dynamic, anonymous);
@@ -156,6 +170,11 @@ class target_options::impl {
         if (capabilities.set) {
             value(pn_terminus_capabilities(unwrap(t))) = capabilities.value;
         }
+        if (dynamic_properties.set) {
+            map<symbol, value> target_map;
+            get(dynamic_properties.value, target_map);
+            value(pn_terminus_properties(unwrap(t))) = target_map;
+        }
     }
 };
 
@@ -177,6 +196,10 @@ target_options& target_options::durability_mode(enum target::durability_mode m)
 target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; }
 target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; }
 target_options& target_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; }
+target_options& target_options::dynamic_properties(const std::map<symbol, value>& c) {
+    impl_->dynamic_properties = c;
+    return *this;
+}
 
 void target_options::apply(target& s) const { impl_->apply(s); }
 
diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp
index f04694f..5393bbe 100644
--- a/cpp/src/terminus.cpp
+++ b/cpp/src/terminus.cpp
@@ -19,10 +19,10 @@
  *
  */
 
-#include "proton/duration.hpp"
 #include "proton/terminus.hpp"
+#include "proton/duration.hpp"
+#include "proton/types.hpp"
 #include "proton/value.hpp"
-#include "proton/codec/vector.hpp"
 
 #include "proton_bits.hpp"
 
@@ -61,4 +61,12 @@ std::vector<symbol> terminus::capabilities() const {
     return caps.empty() ? std::vector<symbol>() : caps.get<std::vector<symbol> >();
 }
 
+std::map<symbol, value> terminus::dynamic_properties() const {
+    value props(pn_terminus_properties(object_));
+    std::map<symbol, value> properties;
+    if (!props.empty()) {
+        get(props, properties);
+    }
+    return properties;
+}
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org