You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2016/04/29 04:40:01 UTC
qpid-proton git commit: PROTON-1183: source::filters moved from receiver
Repository: qpid-proton
Updated Branches:
refs/heads/master a39d6ac5e -> 6e6cb0499
PROTON-1183: source::filters moved from receiver
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6e6cb049
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6e6cb049
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6e6cb049
Branch: refs/heads/master
Commit: 6e6cb049901ea9f319adaddb7e91f77f740f6f78
Parents: a39d6ac
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu Apr 28 19:34:44 2016 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu Apr 28 19:34:44 2016 -0700
----------------------------------------------------------------------
examples/cpp/selected_recv.cpp | 29 ++++++++++++++++++--
.../cpp/include/proton/receiver_options.hpp | 9 ------
proton-c/bindings/cpp/include/proton/source.hpp | 9 ++++++
.../cpp/include/proton/source_options.hpp | 8 ++++--
.../bindings/cpp/include/proton/terminus.hpp | 21 +++-----------
proton-c/bindings/cpp/src/node_options.cpp | 8 ++++++
proton-c/bindings/cpp/src/receiver_options.cpp | 1 -
proton-c/bindings/cpp/src/source.cpp | 10 +++++++
proton-c/bindings/cpp/src/terminus.cpp | 15 ++++------
9 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/examples/cpp/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp
index f02760f..c450cba 100644
--- a/examples/cpp/selected_recv.cpp
+++ b/examples/cpp/selected_recv.cpp
@@ -24,11 +24,35 @@
#include "proton/handler.hpp"
#include "proton/url.hpp"
#include "proton/receiver_options.hpp"
+#include "proton/source_options.hpp"
#include <iostream>
#include "fake_cpp11.hpp"
+namespace {
+
+ // Example custom function to configure an AMQP filter,
+ // specifically an APACHE.ORG:SELECTOR
+ // (http://www.amqp.org/specification/1.0/filters)
+
+ void set_filter(proton::source_options &opts, const std::string& selector_str) {
+ proton::source::filter_map map;
+ proton::symbol filter_key("selector");
+ proton::value filter_value;
+ // The value is a specific AMQP "described type": binary string with symbolic descriptor
+ proton::codec::encoder enc(filter_value);
+ enc << proton::codec::start::described()
+ << proton::symbol("apache.org:selector-filter:string")
+ << proton::binary(selector_str)
+ << proton::codec::finish();
+ // In our case, the map has this one element
+ map[filter_key] = filter_value;
+ opts.filters(map);
+ }
+}
+
+
class selected_recv : public proton::handler {
private:
proton::url url;
@@ -37,9 +61,10 @@ class selected_recv : public proton::handler {
selected_recv(const proton::url& u) : url(u) {}
void on_container_start(proton::container &c) override {
+ proton::source_options custom_selector;
+ set_filter(custom_selector, "colour = 'green'");
proton::connection conn = c.connect(url);
- // Note: the following signature is changing in Proton 0.13
- conn.open_receiver(url.path(), proton::receiver_options().selector("colour = 'green'"));
+ conn.open_receiver(url.path(), proton::receiver_options().source(custom_selector));
}
void on_message(proton::delivery &d, proton::message &m) override {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/receiver_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver_options.hpp b/proton-c/bindings/cpp/include/proton/receiver_options.hpp
index bb9ac3f..babc602 100644
--- a/proton-c/bindings/cpp/include/proton/receiver_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver_options.hpp
@@ -91,15 +91,6 @@ class receiver_options {
/// Set the delivery mode on the receiver.
PN_CPP_EXTERN receiver_options& delivery_mode(delivery_mode);
- /// @cond INTERNAL
- /// XXX need to discuss spec issues, jms versus amqp filters
- ///
- /// Set a selector on the receiver to str. This sets a single
- /// registered filter on the link of type
- /// apache.org:selector-filter with value str.
- PN_CPP_EXTERN receiver_options& selector(const std::string&);
- /// @endcond
-
/// Automatically accept inbound messages that aren't otherwise
/// released, rejected or modified (default value:true).
PN_CPP_EXTERN receiver_options& auto_accept(bool);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/source.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/source.hpp b/proton-c/bindings/cpp/include/proton/source.hpp
index 218610d..ef6f1a3 100644
--- a/proton-c/bindings/cpp/include/proton/source.hpp
+++ b/proton-c/bindings/cpp/include/proton/source.hpp
@@ -26,6 +26,7 @@
#include "proton/object.hpp"
#include "proton/value.hpp"
#include "proton/terminus.hpp"
+#include <proton/map.hpp>
#include <string>
@@ -40,8 +41,16 @@ class receiver;
/// @see proton::sender proton::receiver proton::target
class source : public terminus {
public:
+ /// A map of AMQP symbol keys and filter specifiers.
+ typedef std::map<symbol, value> filter_map;
+
source() : terminus() {}
+
+ /// The address of the source.
PN_CPP_EXTERN std::string address() const;
+
+ /// Obtain the set of message filters.
+ PN_CPP_EXTERN filter_map filters() const;
/// @cond INTERNAL
private:
source(pn_terminus_t* t);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/source_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/source_options.hpp b/proton-c/bindings/cpp/include/proton/source_options.hpp
index 0dc945b..b1676b7 100644
--- a/proton-c/bindings/cpp/include/proton/source_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/source_options.hpp
@@ -27,7 +27,7 @@
#include "proton/pn_unique_ptr.hpp"
#include "proton/types.hpp"
#include "proton/delivery_mode.hpp"
-#include "proton/terminus.hpp"
+#include "proton/source.hpp"
#include <vector>
#include <string>
@@ -35,7 +35,7 @@
namespace proton {
class proton_handler;
-class source;
+
/// Options for creating a source node for a sender or receiver.
///
@@ -76,6 +76,10 @@ class source_options {
/// Control when the clock for expiration begins.
PN_CPP_EXTERN source_options& expiry_policy(enum expiry_policy);
+ /// Specify a filter mechanism on the source that restricts
+ /// message flow to a subset of the available messages.
+ PN_CPP_EXTERN source_options& filters(const source::filter_map&);
+
/// @cond INTERNAL
private:
void apply(source&) const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/terminus.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/terminus.hpp b/proton-c/bindings/cpp/include/proton/terminus.hpp
index d73717b..7ee60f4 100644
--- a/proton-c/bindings/cpp/include/proton/terminus.hpp
+++ b/proton-c/bindings/cpp/include/proton/terminus.hpp
@@ -32,10 +32,6 @@
namespace proton {
-namespace internal {
-class noderef;
-}
-
/// One end of a link, either a source or a target.
///
/// The source terminus is where messages originate; the target
@@ -63,32 +59,23 @@ class terminus {
/// Get the durability flag.
PN_CPP_EXTERN enum durability_mode durability_mode();
- /// Get the source or target node's address.
- PN_CPP_EXTERN std::string address() const;
-
/// True if the remote node is created dynamically.
PN_CPP_EXTERN bool dynamic() const;
/// Obtain a reference to the AMQP dynamic node properties for the
/// terminus. See also lifetime_policy.
- PN_CPP_EXTERN const value& node_properties() const;
-
- /// Obtain a reference to the AMQP filter set for the terminus.
- /// See also selector.
- PN_CPP_EXTERN const value& filter() const;
+ PN_CPP_EXTERN value node_properties() const;
/// @cond INTERNAL
protected:
pn_terminus_t *pn_object() { return object_; }
private:
pn_terminus_t* object_;
- value properties_, filter_;
pn_link_t* parent_;
-
- friend class internal::factory<terminus>;
- friend class source;
- friend class target;
+ friend class internal::factory<terminus>;
+ friend class source;
+ friend class target;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/node_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/node_options.cpp b/proton-c/bindings/cpp/src/node_options.cpp
index b8438fa..7eebec6 100644
--- a/proton-c/bindings/cpp/src/node_options.cpp
+++ b/proton-c/bindings/cpp/src/node_options.cpp
@@ -96,6 +96,7 @@ class source_options::impl {
option<duration> timeout;
option<enum expiry_policy> expiry_policy;
option<enum distribution_mode> distribution_mode;
+ option<source::filter_map> filters;
void apply(source& s) {
node_address(s, address, dynamic);
@@ -103,6 +104,11 @@ class source_options::impl {
node_expiry(s, expiry_policy, timeout);
if (distribution_mode.set)
pn_terminus_set_distribution_mode(unwrap(s), pn_distribution_mode_t(distribution_mode.value));
+ if (filters.set && !filters.value.empty()) {
+ // Applied at most once via source_option. No need to clear.
+ codec::encoder e(pn_terminus_filter(unwrap(s)));
+ e << filters.value;
+ }
}
void update(const impl& x) {
@@ -112,6 +118,7 @@ class source_options::impl {
timeout.update(x.timeout);
expiry_policy.update(x.expiry_policy);
distribution_mode.update(x.distribution_mode);
+ filters.update(x.filters);
}
};
@@ -135,6 +142,7 @@ source_options& source_options::durability_mode(enum durability_mode m) { impl_-
source_options& source_options::timeout(duration d) { impl_->timeout = d; return *this; }
source_options& source_options::expiry_policy(enum expiry_policy m) { impl_->expiry_policy = m; return *this; }
source_options& source_options::distribution_mode(enum distribution_mode m) { impl_->distribution_mode = m; return *this; }
+source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; }
void source_options::apply(source& s) const { impl_->apply(s); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/receiver_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index e2eb416..733bd00 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -130,7 +130,6 @@ receiver_options& receiver_options::auto_settle(bool b) {impl_->auto_settle = b;
receiver_options& receiver_options::credit_window(int w) {impl_->credit_window = w; return *this; }
receiver_options& receiver_options::source(source_options &s) {impl_->source = s; return *this; }
receiver_options& receiver_options::target(target_options &s) {impl_->target = s; return *this; }
-receiver_options& receiver_options::selector(const std::string&) { return *this; }
void receiver_options::apply(receiver& r) const { impl_->apply(r); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/source.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/source.cpp b/proton-c/bindings/cpp/src/source.cpp
index 26de203..3816c78 100644
--- a/proton-c/bindings/cpp/src/source.cpp
+++ b/proton-c/bindings/cpp/src/source.cpp
@@ -43,4 +43,14 @@ std::string source::address() const {
return str(pn_terminus_get_address(authoritative));
}
+source::filter_map source::filters() const {
+ codec::decoder d(pn_terminus_filter(object_));
+ filter_map map;
+ if (!d.empty()) {
+ d.rewind();
+ d >> map;
+ }
+ return map;
+}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/terminus.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp
index f9460bb..f1cc61a 100644
--- a/proton-c/bindings/cpp/src/terminus.cpp
+++ b/proton-c/bindings/cpp/src/terminus.cpp
@@ -27,7 +27,7 @@
namespace proton {
terminus::terminus(pn_terminus_t* t) :
- object_(t), properties_(pn_terminus_properties(t)), filter_(pn_terminus_filter(t)), parent_(0)
+ object_(t), parent_(0)
{}
enum expiry_policy terminus::expiry_policy() const {
@@ -46,17 +46,14 @@ enum durability_mode terminus::durability_mode() {
return (enum durability_mode) pn_terminus_get_durability(object_);
}
-std::string terminus::address() const {
- return str(pn_terminus_get_address(object_));
-}
-
bool terminus::dynamic() const {
return pn_terminus_is_dynamic(object_);
}
-const value& terminus::filter() const { return filter_; }
-
-const value& terminus::node_properties() const { return properties_; }
-
+value terminus::node_properties() const {
+ value x(pn_terminus_properties(object_));
+ pn_terminus_properties(object_); // ZZZ
+ return x;
+}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org