You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/06/14 19:46:55 UTC
[1/2] qpid-proton git commit: PROTON-1679: [c++] terminus properties
are not echoed to returning open
Repository: qpid-proton
Updated Branches:
refs/heads/master 27f9aec21 -> 21eb6d8be
PROTON-1679: [c++] terminus properties are not echoed to returning open
On receiving a link-open, a server should (by default) echo back the same set
of source/target properties that it received. The on_..._open() callback can modify
those properties as desired.
This is a behaviour of the python client that got dropped in the port to C++.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9364588c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9364588c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9364588c
Branch: refs/heads/master
Commit: 9364588c43b32f87482a1aff5e5ea371af815710
Parents: 27f9aec
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 14 14:31:27 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 14 15:27:40 2018 -0400
----------------------------------------------------------------------
cpp/src/connection_driver_test.cpp | 10 ++++------
cpp/src/messaging_adapter.cpp | 5 +++++
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9364588c/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/connection_driver_test.cpp b/cpp/src/connection_driver_test.cpp
index 9c385ee..5a4bfcb 100644
--- a/cpp/src/connection_driver_test.cpp
+++ b/cpp/src/connection_driver_test.cpp
@@ -358,15 +358,13 @@ void test_link_options() {
proton::sender ax = quick_pop(ha.senders);
ASSERT_EQUAL("_x", ax.name());
- // TODO PROTON-1679 - the following assertion should pass.
- // ASSERT_EQUAL("x", ax.target().address());
- // ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ax.target().capabilities());
+ ASSERT_EQUAL("x", ax.target().address());
+ ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ax.target().capabilities());
proton::receiver ay = quick_pop(ha.receivers);
ASSERT_EQUAL("_y", ay.name());
- // TODO PROTON-1679 - the following assertion should pass.
- // ASSERT_EQUAL("y", ay.source().address());
- // ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ay.source().capabilities());
+ ASSERT_EQUAL("y", ay.source().address());
+ ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ay.source().capabilities());
proton::receiver bx = quick_pop(hb.receivers);
ASSERT_EQUAL("x", bx.target().address());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9364588c/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index 7895aa7..e01f29f 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -258,6 +258,11 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) {
void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
+ if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link
+ // Copy source and target from remote end.
+ pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
+ pn_terminus_copy(pn_link_target(lnk), pn_link_remote_target(lnk));
+ }
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_open(r);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1863: [cpp] need support for
anonymous termini
Posted by ac...@apache.org.
PROTON-1863: [cpp] need support for anonymous termini
Added anonymous() to source_options and target_options; it sets the address to NULL.
Also fixed the dynamic() option to set the address to NULL.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/21eb6d8b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/21eb6d8b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/21eb6d8b
Branch: refs/heads/master
Commit: 21eb6d8beee5e7e42aaf51d34c566bf1e8764901
Parents: 9364588
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 14 14:45:07 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 14 15:27:46 2018 -0400
----------------------------------------------------------------------
cpp/include/proton/source_options.hpp | 5 +
cpp/include/proton/target_options.hpp | 7 +-
cpp/include/proton/terminus.hpp | 3 +
cpp/src/connection_driver_test.cpp | 148 +++++++++++++++++++++++------
cpp/src/node_options.cpp | 18 ++--
cpp/src/terminus.cpp | 4 +
6 files changed, 149 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/source_options.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/source_options.hpp b/cpp/include/proton/source_options.hpp
index dd9d9d0..fe1c34a 100644
--- a/cpp/include/proton/source_options.hpp
+++ b/cpp/include/proton/source_options.hpp
@@ -63,6 +63,11 @@ class source_options {
/// ignored.
PN_CPP_EXTERN source_options& dynamic(bool);
+ /// Request an anonymous node on the remote peer.
+ /// The default is false. Any specified source address() is
+ /// ignored if true.
+ PN_CPP_EXTERN source_options& anonymous(bool);
+
/// Control whether messages are browsed or consumed. The
/// default is source::MOVE, meaning consumed.
PN_CPP_EXTERN source_options& distribution_mode(enum source::distribution_mode);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/target_options.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp
index f9b8895..834a185 100644
--- a/cpp/include/proton/target_options.hpp
+++ b/cpp/include/proton/target_options.hpp
@@ -60,9 +60,14 @@ class target_options {
/// Request that a node be dynamically created by the remote peer.
/// The default is false. Any specified target address() is
- /// ignored.
+ /// ignored if true.
PN_CPP_EXTERN target_options& dynamic(bool);
+ /// Request an anonymous node on the remote peer.
+ /// The default is false. Any specified target address() is
+ /// ignored if true.
+ PN_CPP_EXTERN target_options& anonymous(bool);
+
/// Control the persistence of the target node. The default is
/// target::NONDURABLE, meaning non-persistent.
PN_CPP_EXTERN target_options& durability_mode(enum target::durability_mode);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/terminus.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/terminus.hpp b/cpp/include/proton/terminus.hpp
index d0f755c..5b9c684 100644
--- a/cpp/include/proton/terminus.hpp
+++ b/cpp/include/proton/terminus.hpp
@@ -93,6 +93,9 @@ class terminus {
/// True if the remote node is created dynamically.
PN_CPP_EXTERN bool dynamic() const;
+ /// True if the remote node is an anonymous relay, i.e. the terminus address is NULL.
+ PN_CPP_EXTERN bool anonymous() const;
+
/// Obtain a reference to the AMQP dynamic node properties for the
/// terminus. See also lifetime_policy.
PN_CPP_EXTERN value node_properties() const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/connection_driver_test.cpp b/cpp/src/connection_driver_test.cpp
index 5a4bfcb..5be857e 100644
--- a/cpp/src/connection_driver_test.cpp
+++ b/cpp/src/connection_driver_test.cpp
@@ -169,6 +169,8 @@ struct record_handler : public messaging_handler {
std::deque<std::string> unhandled_errors, transport_errors, connection_errors;
std::deque<proton::message> messages;
+ size_t link_count() const { return senders.size() + receivers.size(); }
+
void on_receiver_open(receiver &l) PN_CPP_OVERRIDE {
messaging_handler::on_receiver_open(l);
receivers.push_back(l);
@@ -331,55 +333,143 @@ void test_spin_interrupt() {
} catch (const test::error&) {}
}
-void test_link_options() {
- // Propagation of link and terminus properties
+#define ASSERT_ADDR(ADDR, TERMINUS) do { \
+ ASSERT_EQUAL((ADDR), (TERMINUS).address()); \
+ if ((ADDR) == std::string()) ASSERT((TERMINUS).anonymous()); \
+ else ASSERT(!(TERMINUS).anonymous()); \
+ } while(0);
+
+#define ASSERT_LINK(SRC, TGT, LINK) do { \
+ ASSERT_ADDR((SRC), (LINK).source()); \
+ ASSERT_ADDR((TGT), (LINK).target()); \
+ } while(0);
+
+void test_link_address() {
+ record_handler ha, hb;
+ driver_pair d(ha, hb);
+
+ // FIXME aconway 2018-06-14: also fixes PROTON-1679?
+
+ // Using open(address, opts)
+ d.a.connection().open_sender("tx", sender_options().name("_x").source(source_options().address("sx")));
+ d.a.connection().open_receiver("sy", receiver_options().name("_y").target(target_options().address("ty")));
+ while (ha.link_count()+hb.link_count() < 4) d.process();
+
+ proton::sender ax = quick_pop(ha.senders);
+ ASSERT_EQUAL("_x", ax.name());
+ ASSERT_LINK("sx", "tx", ax);
+ proton::receiver bx = quick_pop(hb.receivers);
+ ASSERT_EQUAL("_x", bx.name());
+ ASSERT_LINK("sx", "tx", bx);
+
+ proton::receiver ay = quick_pop(ha.receivers);
+ ASSERT_EQUAL("_y", ay.name());
+ ASSERT_LINK("sy", "ty", ay);
+ proton::sender by = quick_pop(hb.senders);
+ ASSERT_EQUAL("_y", by.name());
+ ASSERT_LINK("sy", "ty", by);
+
+ // Override address parameter in opts
+ d.a.connection().open_sender("x", sender_options().target(target_options().address("X")));
+ d.a.connection().open_receiver("y", receiver_options().source(source_options().address("Y")));
+ while (ha.link_count()+hb.link_count() < 4) d.process();
+
+ ax = quick_pop(ha.senders);
+ ASSERT_LINK("", "X", ax);
+ bx = quick_pop(hb.receivers);
+ ASSERT_LINK("", "X", bx);
+
+ ay = quick_pop(ha.receivers);
+ ASSERT_LINK("Y", "", ay);
+ by = quick_pop(hb.senders);
+ ASSERT_LINK("Y", "", by);
+}
+
+void test_link_anonymous_dynamic() {
record_handler ha, hb;
driver_pair d(ha, hb);
+ // Anonymous link should have NULL address
+ d.a.connection().open_sender("x", sender_options().target(target_options().anonymous(true)));
+ d.a.connection().open_receiver("y", receiver_options().source(source_options().anonymous(true)));
+ while (ha.link_count()+hb.link_count() < 4) d.process();
+
+ proton::sender ax = quick_pop(ha.senders);
+ ASSERT_LINK("", "", ax);
+ proton::receiver bx = quick_pop(hb.receivers);
+ ASSERT_LINK("", "", bx);
+
+ proton::receiver ay = quick_pop(ha.receivers);
+ ASSERT_LINK("", "", ay);
+ proton::sender by = quick_pop(hb.senders);
+ ASSERT_LINK("", "", by);
+
+ // Dynamic link should have NULL address and dynamic flag
+ d.a.connection().open_sender("x", sender_options().target(target_options().dynamic(true)));
+ d.a.connection().open_receiver("y", receiver_options().source(source_options().dynamic(true)));
+ while (ha.link_count()+hb.link_count() < 4) d.process();
+
+ ax = quick_pop(ha.senders);
+ ASSERT(ax.target().dynamic());
+ ASSERT_LINK("", "", ax);
+ bx = quick_pop(hb.receivers);
+ ASSERT(bx.target().dynamic());
+ ASSERT_LINK("", "", bx);
+
+ ay = quick_pop(ha.receivers);
+ ASSERT(ay.source().dynamic());
+ ASSERT_LINK("", "", ay);
+ by = quick_pop(hb.senders);
+ ASSERT(by.source().dynamic());
+ ASSERT_LINK("", "", by);
+
+ // Empty string as a link address is allowed and not considered anonymous.
+ d.a.connection().open_sender("", sender_options());
+ d.a.connection().open_receiver("", receiver_options());
+ while (ha.link_count()+hb.link_count() < 4) d.process();
+
+ ax = quick_pop(ha.senders);
+ ASSERT(ax.target().address().empty());
+ ASSERT(!ax.target().anonymous());
+
+ ay = quick_pop(ha.receivers);
+ ASSERT(ay.source().address().empty());
+ ASSERT(!ay.source().anonymous());
+}
+
+void test_link_capability_filter() {
+ record_handler ha, hb;
+ driver_pair d(ha, hb);
+
+ // Capabilities and filters
std::vector<proton::symbol> caps;
caps.push_back("foo");
caps.push_back("bar");
- source::filter_map f;
- f.put("xx", "xxx");
- ASSERT_EQUAL(1U, f.size());
- d.a.connection().open_sender(
- "x", sender_options().name("_x").target(target_options().capabilities(caps)));
-
- f.clear();
- f.put("yy", "yyy");
- ASSERT_EQUAL(1U, f.size());
- d.a.connection().open_receiver(
- "y", receiver_options().name("_y").source(source_options().filters(f).capabilities(caps)));
+ d.a.connection().open_sender("x", sender_options().target(target_options().capabilities(caps)));
- while (ha.senders.size()+ha.receivers.size() < 2 ||
- hb.senders.size()+hb.receivers.size() < 2)
- d.process();
+ source::filter_map f;
+ f.put("1", "11");
+ f.put("2", "22");
+ d.a.connection().open_receiver("y", receiver_options().source(source_options().filters(f).capabilities(caps)));
+ while (ha.link_count()+hb.link_count() < 4) d.process();
proton::sender ax = quick_pop(ha.senders);
- ASSERT_EQUAL("_x", ax.name());
- ASSERT_EQUAL("x", ax.target().address());
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ax.target().capabilities());
proton::receiver ay = quick_pop(ha.receivers);
- ASSERT_EQUAL("_y", ay.name());
- ASSERT_EQUAL("y", ay.source().address());
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ay.source().capabilities());
proton::receiver bx = quick_pop(hb.receivers);
- ASSERT_EQUAL("x", bx.target().address());
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", bx.target().capabilities());
- ASSERT_EQUAL("_x", bx.name());
- ASSERT_EQUAL("", bx.source().address());
ASSERT_EQUAL(many<proton::symbol>(), bx.source().capabilities());
proton::sender by = quick_pop(hb.senders);
- ASSERT_EQUAL("y", by.source().address());
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", by.source().capabilities());
- ASSERT_EQUAL("_y", by.name());
f = by.source().filters();
- ASSERT_EQUAL(1U, f.size());
- ASSERT_EQUAL(value("yyy"), f.get("yy"));
+ ASSERT_EQUAL(2U, f.size());
+ ASSERT_EQUAL(value("11"), f.get("1"));
+ ASSERT_EQUAL(value("22"), f.get("2"));
}
void test_message() {
@@ -457,7 +547,9 @@ int main(int argc, char** argv) {
RUN_ARGV_TEST(failed, test_driver_disconnected());
RUN_ARGV_TEST(failed, test_no_container());
RUN_ARGV_TEST(failed, test_spin_interrupt());
- RUN_ARGV_TEST(failed, test_link_options());
+ RUN_ARGV_TEST(failed, test_link_address());
+ RUN_ARGV_TEST(failed, test_link_anonymous_dynamic());
+ RUN_ARGV_TEST(failed, test_link_capability_filter());
RUN_ARGV_TEST(failed, test_message());
RUN_ARGV_TEST(failed, test_message_timeout_succeed());
RUN_ARGV_TEST(failed, test_message_timeout_fail());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/node_options.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp
index 2156509..3b6d197 100644
--- a/cpp/src/node_options.cpp
+++ b/cpp/src/node_options.cpp
@@ -63,13 +63,13 @@ namespace {
// Options common to sources and targets
-void node_address(terminus &t, option<std::string> &addr, option<bool> &dynamic) {
+void node_address(terminus &t, option<std::string> &addr, option<bool> &dynamic, option<bool> &anonymous) {
if (dynamic.set && dynamic.value) {
pn_terminus_set_dynamic(unwrap(t), true);
- // Ignore any addr value for dynamic.
- return;
- }
- if (addr.set) {
+ pn_terminus_set_address(unwrap(t), NULL);
+ } else if (anonymous.set && anonymous.value) {
+ pn_terminus_set_address(unwrap(t), NULL);
+ } else if (addr.set) {
pn_terminus_set_address(unwrap(t), addr.value.c_str());
}
}
@@ -90,6 +90,7 @@ class source_options::impl {
public:
option<std::string> address;
option<bool> dynamic;
+ option<bool> anonymous;
option<enum source::durability_mode> durability_mode;
option<duration> timeout;
option<enum source::expiry_policy> expiry_policy;
@@ -98,7 +99,7 @@ class source_options::impl {
option<std::vector<symbol> > capabilities;
void apply(source& s) {
- node_address(s, address, dynamic);
+ node_address(s, address, dynamic, anonymous);
node_durability(s, durability_mode);
node_expiry(s, expiry_policy, timeout);
if (distribution_mode.set)
@@ -126,6 +127,7 @@ source_options& source_options::operator=(const source_options& x) {
source_options& source_options::address(const std::string &addr) { impl_->address = addr; return *this; }
source_options& source_options::dynamic(bool b) { impl_->dynamic = b; return *this; }
+source_options& source_options::anonymous(bool b) { impl_->anonymous = b; return *this; }
source_options& source_options::durability_mode(enum source::durability_mode m) { impl_->durability_mode = m; return *this; }
source_options& source_options::timeout(duration d) { impl_->timeout = d; return *this; }
source_options& source_options::expiry_policy(enum source::expiry_policy m) { impl_->expiry_policy = m; return *this; }
@@ -141,13 +143,14 @@ class target_options::impl {
public:
option<std::string> address;
option<bool> dynamic;
+ option<bool> anonymous;
option<enum target::durability_mode> durability_mode;
option<duration> timeout;
option<enum target::expiry_policy> expiry_policy;
option<std::vector<symbol> > capabilities;
void apply(target& t) {
- node_address(t, address, dynamic);
+ node_address(t, address, dynamic, anonymous);
node_durability(t, durability_mode);
node_expiry(t, expiry_policy, timeout);
if (capabilities.set) {
@@ -169,6 +172,7 @@ target_options& target_options::operator=(const target_options& x) {
target_options& target_options::address(const std::string &addr) { impl_->address = addr; return *this; }
target_options& target_options::dynamic(bool b) { impl_->dynamic = b; return *this; }
+target_options& target_options::anonymous(bool b) { impl_->anonymous = b; return *this; }
target_options& target_options::durability_mode(enum target::durability_mode m) { impl_->durability_mode = m; return *this; }
target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; }
target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/terminus.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp
index 413c909..f04694f 100644
--- a/cpp/src/terminus.cpp
+++ b/cpp/src/terminus.cpp
@@ -48,6 +48,10 @@ bool terminus::dynamic() const {
return pn_terminus_is_dynamic(object_);
}
+bool terminus::anonymous() const {
+ return pn_terminus_get_address(object_) == NULL;
+}
+
value terminus::node_properties() const {
return value(pn_terminus_properties(object_));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org