You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/23 22:06:27 UTC
[qpid-proton] branch main updated: PROTON-2308: Add support for setting Dynamic Node Properties
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 596696a PROTON-2308: Add support for setting Dynamic Node Properties
596696a is described below
commit 596696aede355d1e53832a41fbebb59d0b71d5f1
Author: Rakhi Kumari <ra...@gmail.com>
AuthorDate: Wed Dec 8 16:54:01 2021 +0530
PROTON-2308: Add support for setting Dynamic Node Properties
[Test fixed up by committer] Closes #346
---
cpp/include/proton/source_options.hpp | 4 ++
cpp/include/proton/target_options.hpp | 4 ++
cpp/include/proton/terminus.hpp | 5 ++
cpp/src/delivery_test.cpp | 12 ++--
cpp/src/link_test.cpp | 118 +++++++++++++++++++++++++++++++++-
cpp/src/node_options.cpp | 23 +++++++
cpp/src/terminus.cpp | 12 +++-
7 files changed, 168 insertions(+), 10 deletions(-)
diff --git a/cpp/include/proton/source_options.hpp b/cpp/include/proton/source_options.hpp
index fe1c34a..b221d72 100644
--- a/cpp/include/proton/source_options.hpp
+++ b/cpp/include/proton/source_options.hpp
@@ -27,6 +27,7 @@
#include "./duration.hpp"
#include "./source.hpp"
+#include <map>
#include <string>
/// @file
@@ -92,6 +93,9 @@ class source_options {
/// Extension capabilities that are supported/requested
PN_CPP_EXTERN source_options& capabilities(const std::vector<symbol>&);
+ /// Set the dynamic node properties.
+ PN_CPP_EXTERN source_options& dynamic_properties(const std::map<symbol, value>&);
+
private:
void apply(source&) const;
diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp
index 834a185..1e24051 100644
--- a/cpp/include/proton/target_options.hpp
+++ b/cpp/include/proton/target_options.hpp
@@ -27,6 +27,7 @@
#include "./duration.hpp"
#include "./target.hpp"
+#include <map>
#include <string>
/// @file
@@ -83,6 +84,9 @@ class target_options {
/// Extension capabilities that are supported/requested
PN_CPP_EXTERN target_options& capabilities(const std::vector<symbol>&);
+ /// Set the dynamic node properties.
+ PN_CPP_EXTERN target_options& dynamic_properties(const std::map<symbol, value>&);
+
private:
void apply(target&) const;
diff --git a/cpp/include/proton/terminus.hpp b/cpp/include/proton/terminus.hpp
index 5b9c684..ca10d2c 100644
--- a/cpp/include/proton/terminus.hpp
+++ b/cpp/include/proton/terminus.hpp
@@ -27,6 +27,7 @@
#include <proton/terminus.h>
+#include <map>
#include <string>
#include <vector>
@@ -103,6 +104,10 @@ class terminus {
/// Extension capabilities that are supported/requested
PN_CPP_EXTERN std::vector<symbol> capabilities() const;
+ /// Obtain the AMQP dynamic node properties for the
+ /// terminus as a standard map.
+ PN_CPP_EXTERN std::map<symbol, value> dynamic_properties() const;
+
protected:
pn_terminus_t *pn_object() const { return object_; }
private:
diff --git a/cpp/src/delivery_test.cpp b/cpp/src/delivery_test.cpp
index af6441c..9765223 100644
--- a/cpp/src/delivery_test.cpp
+++ b/cpp/src/delivery_test.cpp
@@ -56,7 +56,7 @@ int tracker_settle_counter;
proton::binary test_tag("TESTTAG");
} // namespace
-class test_recv : public proton::messaging_handler {
+class test_server : public proton::messaging_handler {
private:
class listener_ready_handler : public proton::listen_handler {
void on_open(proton::listener &l) override {
@@ -74,7 +74,7 @@ class test_recv : public proton::messaging_handler {
listener_ready_handler listen_handler;
public:
- test_recv(const std::string &s) : url(s) {}
+ test_server (const std::string &s) : url(s) {}
void on_container_start(proton::container &c) override {
listener = c.listen(url, listen_handler);
@@ -88,13 +88,13 @@ class test_recv : public proton::messaging_handler {
}
};
-class test_send : public proton::messaging_handler {
+class test_client : public proton::messaging_handler {
private:
std::string url;
proton::sender sender;
public:
- test_send(const std::string &s) : url(s) {}
+ test_client (const std::string &s) : url(s) {}
void on_container_start(proton::container &c) override {
proton::connection_options co;
@@ -125,7 +125,7 @@ int test_delivery_tag() {
tracker_settle_counter = 0;
std::string recv_address("127.0.0.1:0/test");
- test_recv recv(recv_address);
+ test_server recv(recv_address);
proton::container c(recv);
std::thread thread_recv([&c]() -> void { c.run(); });
@@ -135,7 +135,7 @@ int test_delivery_tag() {
std::string send_address =
"127.0.0.1:" + std::to_string(listener_port) + "/test";
- test_send send(send_address);
+ test_client send(send_address);
proton::container(send).run();
thread_recv.join();
diff --git a/cpp/src/link_test.cpp b/cpp/src/link_test.cpp
index 18188c6..cd9d931 100644
--- a/cpp/src/link_test.cpp
+++ b/cpp/src/link_test.cpp
@@ -20,12 +20,125 @@
#include "test_bits.hpp"
-#include <proton/sender_options.hpp>
-#include <proton/receiver_options.hpp>
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/target_options.hpp>
+#include <proton/types.hpp>
#include <iostream>
+#include <condition_variable>
+#include <map>
+#include <mutex>
+#include <thread>
+
+typedef std::map<proton::symbol, proton::value> property_map;
+
+namespace {
+std::mutex m;
+std::condition_variable cv;
+bool listener_ready = false;
+int listener_port;
+const std::string DYNAMIC_ADDRESS = "test_dynamic_address";
+} // namespace
+
+class test_server : public proton::messaging_handler {
+ private:
+ class listener_ready_handler : public proton::listen_handler {
+ void on_open(proton::listener& l) override {
+ {
+ std::lock_guard<std::mutex> lk(m);
+ listener_port = l.port();
+ listener_ready = true;
+ }
+ cv.notify_one();
+ }
+ };
+
+ std::string url;
+ proton::listener listener;
+ listener_ready_handler listen_handler;
+
+ public:
+ test_server (const std::string& s) : url(s) {}
+
+ void on_container_start(proton::container& c) override {
+ listener = c.listen(url, listen_handler);
+ }
+
+ void on_sender_open(proton::sender& s) override {
+ ASSERT(s.source().dynamic());
+ ASSERT(s.source().address().empty());
+ property_map p = {{proton::symbol("supported-dist-modes"), proton::symbol("copy")}};
+ ASSERT_EQUAL(s.source().dynamic_properties(), p);
+
+ proton::source_options opts;
+ opts.address(DYNAMIC_ADDRESS)
+ // This fails due to a bug in the C++ bindings - PROTON-2480
+ // .dynamic(true)
+ .dynamic_properties(
+ property_map{{proton::symbol("supported-dist-modes"), proton::symbol("move")}}
+ );
+ s.open(proton::sender_options().source(opts));
+ listener.stop();
+ }
+};
+
+class test_client : public proton::messaging_handler {
+ private:
+ std::string url;
+
+ public:
+ test_client (const std::string& s) : url(s) {}
+
+ void on_container_start(proton::container& c) override {
+ proton::source_options opts;
+ opts.dynamic(true)
+ .dynamic_properties(
+ property_map{{proton::symbol{"supported-dist-modes"}, proton::symbol{"copy"}}}
+ );
+ c.open_receiver(url, proton::receiver_options().source(opts));
+ }
+
+ void on_receiver_open(proton::receiver& r) override {
+ // This fails due to a bug in the c++ bindings - PROTON-2480
+ // ASSERT(r.source().dynamic());
+ ASSERT_EQUAL(DYNAMIC_ADDRESS, r.source().address());
+ property_map m({{proton::symbol("supported-dist-modes"), proton::symbol("move")}});
+ ASSERT_EQUAL(m, r.source().dynamic_properties());
+
+ r.connection().close();
+ }
+};
+
+int test_dynamic_node_properties() {
+
+ std::string recv_address("127.0.0.1:0");
+ test_server server(recv_address);
+ proton::container c(server);
+ std::thread thread_recv([&c]() -> void { c.run(); });
+
+ // wait until listener is ready
+ std::unique_lock<std::mutex> lk(m);
+ cv.wait(lk, [] { return listener_ready; });
+
+ std::string send_address = "127.0.0.1:" + std::to_string(listener_port);
+ test_client client(send_address);
+ proton::container(client).run();
+ thread_recv.join();
+
+ return 0;
+}
+
int test_link_name()
{
proton::container c;
@@ -48,5 +161,6 @@ int test_link_name()
int main(int argc, char** argv) {
int failed = 0;
RUN_ARGV_TEST(failed, test_link_name());
+ RUN_ARGV_TEST(failed, test_dynamic_node_properties());
return failed;
}
diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp
index 3b6d197..0e4d8a7 100644
--- a/cpp/src/node_options.cpp
+++ b/cpp/src/node_options.cpp
@@ -20,14 +20,17 @@
*/
#include "proton/codec/vector.hpp"
+#include "proton/map.hpp"
#include "proton/source.hpp"
#include "proton/source_options.hpp"
#include "proton/target.hpp"
#include "proton/target_options.hpp"
+#include "proton/types.hpp"
#include "proton_bits.hpp"
#include <limits>
+#include <map>
namespace proton {
@@ -97,6 +100,7 @@ class source_options::impl {
option<enum source::distribution_mode> distribution_mode;
option<source::filter_map> filters;
option<std::vector<symbol> > capabilities;
+ option<std::map<symbol, value>> dynamic_properties;
void apply(source& s) {
node_address(s, address, dynamic, anonymous);
@@ -111,6 +115,11 @@ class source_options::impl {
if (capabilities.set) {
value(pn_terminus_capabilities(unwrap(s))) = capabilities.value;
}
+ if (dynamic_properties.set) {
+ map<symbol, value> source_map;
+ get(dynamic_properties.value, source_map);
+ value(pn_terminus_properties(unwrap(s))) = source_map;
+ }
}
};
@@ -134,6 +143,10 @@ source_options& source_options::expiry_policy(enum source::expiry_policy m) { im
source_options& source_options::distribution_mode(enum source::distribution_mode m) { impl_->distribution_mode = m; return *this; }
source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; }
source_options& source_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; }
+source_options& source_options::dynamic_properties(const std::map<symbol, value>& c) {
+ impl_->dynamic_properties = c;
+ return *this;
+}
void source_options::apply(source& s) const { impl_->apply(s); }
@@ -148,6 +161,7 @@ class target_options::impl {
option<duration> timeout;
option<enum target::expiry_policy> expiry_policy;
option<std::vector<symbol> > capabilities;
+ option<std::map<symbol, value>> dynamic_properties;
void apply(target& t) {
node_address(t, address, dynamic, anonymous);
@@ -156,6 +170,11 @@ class target_options::impl {
if (capabilities.set) {
value(pn_terminus_capabilities(unwrap(t))) = capabilities.value;
}
+ if (dynamic_properties.set) {
+ map<symbol, value> target_map;
+ get(dynamic_properties.value, target_map);
+ value(pn_terminus_properties(unwrap(t))) = target_map;
+ }
}
};
@@ -177,6 +196,10 @@ target_options& target_options::durability_mode(enum target::durability_mode m)
target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; }
target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; }
target_options& target_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; }
+target_options& target_options::dynamic_properties(const std::map<symbol, value>& c) {
+ impl_->dynamic_properties = c;
+ return *this;
+}
void target_options::apply(target& s) const { impl_->apply(s); }
diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp
index f04694f..5393bbe 100644
--- a/cpp/src/terminus.cpp
+++ b/cpp/src/terminus.cpp
@@ -19,10 +19,10 @@
*
*/
-#include "proton/duration.hpp"
#include "proton/terminus.hpp"
+#include "proton/duration.hpp"
+#include "proton/types.hpp"
#include "proton/value.hpp"
-#include "proton/codec/vector.hpp"
#include "proton_bits.hpp"
@@ -61,4 +61,12 @@ std::vector<symbol> terminus::capabilities() const {
return caps.empty() ? std::vector<symbol>() : caps.get<std::vector<symbol> >();
}
+std::map<symbol, value> terminus::dynamic_properties() const {
+ value props(pn_terminus_properties(object_));
+ std::map<symbol, value> properties;
+ if (!props.empty()) {
+ get(props, properties);
+ }
+ return properties;
+}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org