You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2016/03/22 01:22:11 UTC
qpid-proton git commit: PROTON-1138: handler options become link or
connection options (https://reviews.apache.org/r/44927)
Repository: qpid-proton
Updated Branches:
refs/heads/master 9e2649ac4 -> f2ae27d7e
PROTON-1138: handler options become link or connection options (https://reviews.apache.org/r/44927)
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2ae27d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2ae27d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2ae27d7
Branch: refs/heads/master
Commit: f2ae27d7e7a09c016a89be59e93d5796e01cc725
Parents: 9e2649a
Author: Clifford Jansen <cl...@apache.org>
Authored: Mon Mar 21 17:15:24 2016 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Mon Mar 21 17:15:24 2016 -0700
----------------------------------------------------------------------
.../cpp/include/proton/connection_options.hpp | 4 +-
.../bindings/cpp/include/proton/container.hpp | 1 +
.../bindings/cpp/include/proton/handler.hpp | 23 +-----
proton-c/bindings/cpp/include/proton/link.hpp | 1 +
.../cpp/include/proton/link_options.hpp | 17 +++-
.../bindings/cpp/src/connection_options.cpp | 36 ++++-----
proton-c/bindings/cpp/src/container.cpp | 2 +
proton-c/bindings/cpp/src/container_impl.cpp | 19 +++--
proton-c/bindings/cpp/src/container_impl.hpp | 1 +
proton-c/bindings/cpp/src/contexts.cpp | 12 +++
proton-c/bindings/cpp/src/contexts.hpp | 9 +++
proton-c/bindings/cpp/src/handler.cpp | 4 +-
proton-c/bindings/cpp/src/link_options.cpp | 35 +++++---
proton-c/bindings/cpp/src/messaging_adapter.cpp | 85 ++++++++------------
proton-c/bindings/cpp/src/messaging_adapter.hpp | 12 +--
proton-c/bindings/cpp/src/proton_event.hpp | 1 +
tests/tools/apps/cpp/reactor_send.cpp | 5 +-
17 files changed, 134 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 71e12f1..1a22b73 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -76,9 +76,6 @@ class connection_options {
// XXX add C++11 move operations
- /// Override with options from other.
- PN_CPP_EXTERN void override(const connection_options& other);
-
/// Set a handler for the connection.
PN_CPP_EXTERN connection_options& handler(class handler *);
@@ -145,6 +142,7 @@ class connection_options {
static pn_connection_t *pn_connection(connection &);
class ssl_client_options &ssl_client_options();
class ssl_server_options &ssl_server_options();
+ PN_CPP_EXTERN void update(const connection_options& other);
class impl;
internal::pn_unique_ptr<impl> impl_;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 0af1963..7e0f0ee 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -134,6 +134,7 @@ class container {
friend class connector;
friend class link;
+ friend class messaging_adapter;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 3bc0023..6ea11d4 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -41,28 +41,7 @@ class
PN_CPP_CLASS_EXTERN handler
{
public:
- /// @cond INTERNAL
- /// XXX move configuration to connection or container
-
- /// Create a handler.
- ///
- /// @param prefetch set flow control to automatically pre-fetch
- /// this many messages
- ///
- /// @param auto_accept automatically accept received messages
- /// after on_message()
- ///
- /// @param auto_settle automatically settle on receipt of delivery
- /// for sent messages
- ///
- /// @param peer_close_is_error treat orderly remote connection
- /// close as error
- PN_CPP_EXTERN handler(int prefetch=10, bool auto_accept=true,
- bool auto_settle=true,
- bool peer_close_is_error=false);
-
- /// @endcond
-
+ PN_CPP_EXTERN handler();
PN_CPP_EXTERN virtual ~handler();
/// @name Event callbacks
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index 8de2e7b..363ea16 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -151,6 +151,7 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
friend class proton_event;
friend class link_iterator;
friend class link_options;
+ friend class messaging_adapter;
};
/// An iterator for links.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp
index 2eb145d..7f70c4a 100644
--- a/proton-c/bindings/cpp/include/proton/link_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/link_options.hpp
@@ -120,9 +120,6 @@ class link_options {
/// Copy options.
PN_CPP_EXTERN link_options& operator=(const link_options&);
- /// Override with options from other.
- PN_CPP_EXTERN void override(const link_options& other);
-
/// Set a handler for events scoped to the link. If NULL,
/// link-scoped events on the link are discarded.
PN_CPP_EXTERN link_options& handler(class handler *);
@@ -155,6 +152,18 @@ class link_options {
/// Set the local address for the link.
PN_CPP_EXTERN link_options& local_address(const std::string &addr);
+ /// Automatically accept inbound messages that aren't otherwise
+ /// released, rejected or modified (default value:true).
+ PN_CPP_EXTERN link_options& auto_accept(bool);
+
+ /// Automatically settle messages (default value: true).
+ PN_CPP_EXTERN link_options& auto_settle(bool);
+
+ /// Set automated flow control to pre-fetch this many messages
+ /// (default value:10). Set to zero to disable automatic credit
+ /// replenishing.
+ PN_CPP_EXTERN link_options& credit_window(int);
+
/// @cond INTERNAL
/// XXX need to discuss spec issues, jms versus amqp filters
///
@@ -168,11 +177,13 @@ class link_options {
private:
void apply(link&) const;
proton_handler* handler() const;
+ PN_CPP_EXTERN void update(const link_options& other);
class impl;
internal::pn_unique_ptr<impl> impl_;
friend class link;
+ friend class container_impl;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index a5ee99e..c218d64 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -41,7 +41,7 @@ template <class T> struct option {
option() : value(), set(false) {}
option& operator=(const T& x) { value = x; set = true; return *this; }
- void override(const option<T>& x) { if (x.set) *this = x.value; }
+ void update(const option<T>& x) { if (x.set) *this = x.value; }
};
class connection_options::impl {
@@ -124,22 +124,22 @@ class connection_options::impl {
}
}
- void override(const impl& x) {
- handler.override(x.handler);
- max_frame_size.override(x.max_frame_size);
- max_channels.override(x.max_channels);
- idle_timeout.override(x.idle_timeout);
- heartbeat.override(x.heartbeat);
- container_id.override(x.container_id);
- link_prefix.override(x.link_prefix);
- reconnect.override(x.reconnect);
- ssl_client_options.override(x.ssl_client_options);
- ssl_server_options.override(x.ssl_server_options);
- sasl_enabled.override(x.sasl_enabled);
- sasl_allow_insecure_mechs.override(x.sasl_allow_insecure_mechs);
- sasl_allowed_mechs.override(x.sasl_allowed_mechs);
- sasl_config_name.override(x.sasl_config_name);
- sasl_config_path.override(x.sasl_config_path);
+ void update(const impl& x) {
+ handler.update(x.handler);
+ max_frame_size.update(x.max_frame_size);
+ max_channels.update(x.max_channels);
+ idle_timeout.update(x.idle_timeout);
+ heartbeat.update(x.heartbeat);
+ container_id.update(x.container_id);
+ link_prefix.update(x.link_prefix);
+ reconnect.update(x.reconnect);
+ ssl_client_options.update(x.ssl_client_options);
+ ssl_server_options.update(x.ssl_server_options);
+ sasl_enabled.update(x.sasl_enabled);
+ sasl_allow_insecure_mechs.update(x.sasl_allow_insecure_mechs);
+ sasl_allowed_mechs.update(x.sasl_allowed_mechs);
+ sasl_config_name.update(x.sasl_config_name);
+ sasl_config_path.update(x.sasl_config_path);
}
};
@@ -155,7 +155,7 @@ connection_options& connection_options::operator=(const connection_options& x) {
return *this;
}
-void connection_options::override(const connection_options& x) { impl_->override(*x.impl_); }
+void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); }
connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index fc34f39..2cc8df5 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -81,4 +81,6 @@ void container::client_connection_options(const connection_options &o) { impl_->
void container::server_connection_options(const connection_options &o) { impl_->server_connection_options(o); }
+void container::link_options(const class link_options &o) { impl_->link_options(o); }
+
} // namespace proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index 55547ca..979c20c 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -137,17 +137,16 @@ container_impl::container_impl(container& c, messaging_adapter *h, const std::st
reactor_.pn_handler(cpp_handler(handler_).get());
}
- // Note: we have just set up the following handlers that see events in this order:
- // messaging_handler (Proton C events), pn_flowcontroller (optional), messaging_adapter,
- // messaging_handler (Messaging events from the messaging_adapter, i.e. the delegate),
- // connector override, the reactor's default globalhandler (pn_iohandler)
+ // Note: we have just set up the following handlers that see
+ // events in this order: messaging_adapter, connector override,
+ // the reactor's default globalhandler (pn_iohandler)
}
container_impl::~container_impl() {}
connection container_impl::connect(const proton::url &url, const connection_options &user_opts) {
connection_options opts = client_connection_options(); // Defaults
- opts.override(user_opts);
+ opts.update(user_opts);
proton_handler *h = opts.handler();
internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
@@ -165,9 +164,9 @@ connection container_impl::connect(const proton::url &url, const connection_opti
sender container_impl::open_sender(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
proton::link_options lopts(link_options_);
- lopts.override(o1);
+ lopts.update(o1);
connection_options copts(client_connection_options_);
- copts.override(o2);
+ copts.update(o2);
connection conn = connect(url, copts);
std::string path = url.path();
sender snd = conn.default_session().create_sender();
@@ -178,9 +177,9 @@ sender container_impl::open_sender(const proton::url &url, const proton::link_op
receiver container_impl::open_receiver(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
proton::link_options lopts(link_options_);
- lopts.override(o1);
+ lopts.update(o1);
connection_options copts(client_connection_options_);
- copts.override(o2);
+ copts.update(o2);
connection conn = connect(url, copts);
std::string path = url.path();
receiver rcv = conn.default_session().create_receiver();
@@ -191,7 +190,7 @@ receiver container_impl::open_receiver(const proton::url &url, const proton::lin
acceptor container_impl::listen(const proton::url& url, const connection_options &user_opts) {
connection_options opts = server_connection_options(); // Defaults
- opts.override(user_opts);
+ opts.update(user_opts);
proton_handler *h = opts.handler();
internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
pn_acceptor_t *acptr = pn_reactor_acceptor(reactor_.pn_object(), url.host().c_str(), url.port().c_str(), chandler.get());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp
index d1b476f..05e2058 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -83,6 +83,7 @@ class container_impl
proton::link_options link_options_;
friend class container;
+ friend class messaging_adapter;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index c2b76f6..99b05a1 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -50,6 +50,7 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context);
PN_HANDLE(CONNECTION_CONTEXT)
PN_HANDLE(CONTAINER_CONTEXT)
PN_HANDLE(LISTENER_CONTEXT)
+PN_HANDLE(LINK_CONTEXT)
void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value)
{
@@ -99,4 +100,15 @@ listener_context& listener_context::get(pn_acceptor_t* a) {
return *ctx;
}
+link_context& link_context::get(pn_link_t* l) {
+ link_context* ctx =
+ get_context<link_context>(pn_link_attachments(l), LINK_CONTEXT);
+ if (!ctx) {
+ ctx = context::create<link_context>();
+ set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx);
+ pn_decref(ctx);
+ }
+ return *ctx;
+}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index b4fcdba..03271a1 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -127,6 +127,15 @@ class listener_context : public context {
bool ssl;
};
+class link_context : public context {
+ public:
+ static link_context& get(pn_link_t* l);
+ link_context() : credit_window(10), auto_accept(true), auto_settle(true) {}
+ int credit_window;
+ bool auto_accept;
+ bool auto_settle;
+};
+
}
#endif /*!PROTON_CPP_CONTEXTS_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 43febf5..1119720 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -32,9 +32,7 @@
namespace proton {
-handler::handler(int prefetch0, bool auto_accept0, bool auto_settle0, bool peer_close_is_error0) :
- messaging_adapter_(new messaging_adapter(*this, prefetch0, auto_accept0, auto_settle0, peer_close_is_error0))
-{}
+handler::handler() : messaging_adapter_(new messaging_adapter(*this)) {}
handler::~handler(){}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/link_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp
index 534a6b6..ee41535 100644
--- a/proton-c/bindings/cpp/src/link_options.cpp
+++ b/proton-c/bindings/cpp/src/link_options.cpp
@@ -26,6 +26,7 @@
#include "msg.hpp"
#include "messaging_adapter.hpp"
+#include "contexts.hpp"
namespace proton {
@@ -58,7 +59,7 @@ template <class T> struct option {
option() : value(), set(false) {}
option& operator=(const T& x) { value = x; set = true; return *this; }
- void override(const option<T>& x) { if (x.set) *this = x.value; }
+ void update(const option<T>& x) { if (x.set) *this = x.value; }
};
class link_options::impl {
@@ -71,6 +72,9 @@ class link_options::impl {
option<std::string> local_address;
option<enum lifetime_policy> lifetime_policy;
option<std::string> selector;
+ option<bool> auto_accept;
+ option<bool> auto_settle;
+ option<int> credit_window;
void apply(link& l) {
if (l.state() & endpoint::LOCAL_UNINIT) {
@@ -118,6 +122,7 @@ class link_options::impl {
}
}
}
+ if (auto_settle.set) link_context::get(l.pn_object()).auto_settle = auto_settle.value;
if (!sender) {
// receiver only options
if (distribution_mode.set) l.local_source().distribution_mode(distribution_mode.value);
@@ -130,19 +135,24 @@ class link_options::impl {
enc << codec::start::map() << symbol("selector") << codec::start::described()
<< symbol("apache.org:selector-filter:string") << binary(selector.value) << codec::finish();
}
+ if (auto_accept.set) link_context::get(l.pn_object()).auto_accept = auto_accept.value;
+ if (credit_window.set) link_context::get(l.pn_object()).credit_window = credit_window.value;
}
}
}
- void override(const impl& x) {
- handler.override(x.handler);
- distribution_mode.override(x.distribution_mode);
- durable_subscription.override(x.durable_subscription);
- delivery_mode.override(x.delivery_mode);
- dynamic_address.override(x.dynamic_address);
- local_address.override(x.local_address);
- lifetime_policy.override(x.lifetime_policy);
- selector.override(x.selector);
+ void update(const impl& x) {
+ handler.update(x.handler);
+ distribution_mode.update(x.distribution_mode);
+ durable_subscription.update(x.durable_subscription);
+ delivery_mode.update(x.delivery_mode);
+ dynamic_address.update(x.dynamic_address);
+ local_address.update(x.local_address);
+ lifetime_policy.update(x.lifetime_policy);
+ selector.update(x.selector);
+ auto_accept.update(x.auto_accept);
+ auto_settle.update(x.auto_settle);
+ credit_window.update(x.credit_window);
}
};
@@ -158,7 +168,7 @@ link_options& link_options::operator=(const link_options& x) {
return *this;
}
-void link_options::override(const link_options& x) { impl_->override(*x.impl_); }
+void link_options::update(const link_options& x) { impl_->update(*x.impl_); }
link_options& link_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; }
@@ -169,6 +179,9 @@ link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b;
link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; }
link_options& link_options::lifetime_policy(enum lifetime_policy lp) {impl_->lifetime_policy = lp; return *this; }
link_options& link_options::selector(const std::string &str) {impl_->selector = str; return *this; }
+link_options& link_options::auto_accept(bool b) {impl_->auto_accept = b; return *this; }
+link_options& link_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
+link_options& link_options::credit_window(int w) {impl_->credit_window = w; return *this; }
void link_options::apply(link& l) const { impl_->apply(l); }
proton_handler* link_options::handler() const { return impl_->handler.value; }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 4265644..4bc21af 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -26,6 +26,7 @@
#include "contexts.hpp"
#include "messaging_event.hpp"
+#include "container_impl.hpp"
#include "msg.hpp"
#include "proton/connection.h"
@@ -38,49 +39,7 @@
namespace proton {
-namespace {
-class c_flow_controller : public proton_handler
-{
- public:
- pn_handler_t *flowcontroller;
-
- // TODO: pn_flowcontroller requires a window > 1.
- c_flow_controller(int window) : flowcontroller(pn_flowcontroller(std::max(window, 2))) {}
- ~c_flow_controller() {
- pn_decref(flowcontroller);
- }
-
- void redirect(proton_event &pne) {
- pn_handler_dispatch(flowcontroller, pne.pn_event(), pn_event_type_t(pne.type()));
- }
-
- virtual void on_link_local_open(proton_event &e) { redirect(e); }
- virtual void on_link_remote_open(proton_event &e) { redirect(e); }
- virtual void on_link_flow(proton_event &e) { redirect(e); }
- virtual void on_delivery(proton_event &e) { redirect(e); }
-};
-
-} // namespace
-
-void messaging_adapter::create_helpers() {
- if (prefetch_ > 0) {
- flow_controller_.reset(new c_flow_controller(prefetch_));
- add_child_handler(*flow_controller_);
- }
-}
-
-messaging_adapter::messaging_adapter(handler &delegate,
- int prefetch, bool auto_accept, bool auto_settle, bool peer_close_iserror) :
- delegate_(delegate),
- prefetch_(prefetch),
- auto_accept_(auto_accept),
- auto_settle_(auto_settle),
- peer_close_iserror_(peer_close_iserror)
-{
- create_helpers();
- //add_child_handler(*this);
-}
-
+messaging_adapter::messaging_adapter(handler &delegate) : delegate_(delegate) {}
messaging_adapter::~messaging_adapter(){}
@@ -98,11 +57,13 @@ void messaging_adapter::on_link_flow(proton_event &pe) {
messaging_event mevent(messaging_event::SENDABLE, pe);
delegate_.on_sendable(mevent);;
}
+ credit_topup(lnk);
}
void messaging_adapter::on_delivery(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
+ link_context& lctx = link_context::get(lnk);
delivery dlv = pe.delivery();
if (pn_link_is_receiver(lnk)) {
@@ -118,11 +79,11 @@ void messaging_adapter::on_delivery(proton_event &pe) {
mevent.message_ = &msg;
mevent.message_->decode(dlv);
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
- if (auto_accept_)
+ if (lctx.auto_accept)
dlv.release();
} else {
delegate_.on_message(mevent);
- if (auto_accept_ && !dlv.settled())
+ if (lctx.auto_accept && !dlv.settled())
dlv.accept();
}
}
@@ -130,6 +91,7 @@ void messaging_adapter::on_delivery(proton_event &pe) {
messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
delegate_.on_delivery_settle(mevent);
}
+ credit_topup(lnk);
} else {
// sender
if (dlv.updated()) {
@@ -151,7 +113,7 @@ void messaging_adapter::on_delivery(proton_event &pe) {
messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
delegate_.on_delivery_settle(mevent);
}
- if (auto_settle_)
+ if (lctx.auto_settle)
dlv.settle();
}
}
@@ -172,7 +134,7 @@ bool is_local_unititialised(pn_state_t state) {
void messaging_adapter::on_link_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
- if (peer_close_iserror_ || pn_condition_is_set(pn_link_remote_condition(lnk))) {
+ if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
messaging_event mevent(messaging_event::LINK_ERROR, pe);
delegate_.on_link_error(mevent);
}
@@ -184,7 +146,7 @@ void messaging_adapter::on_link_remote_close(proton_event &pe) {
void messaging_adapter::on_session_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_session_t *session = pn_event_session(cevent);
- if (peer_close_iserror_ || pn_condition_is_set(pn_session_remote_condition(session))) {
+ if (pn_condition_is_set(pn_session_remote_condition(session))) {
messaging_event mevent(messaging_event::SESSION_ERROR, pe);
delegate_.on_session_error(mevent);
}
@@ -196,7 +158,7 @@ void messaging_adapter::on_session_remote_close(proton_event &pe) {
void messaging_adapter::on_connection_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_connection_t *connection = pn_event_connection(cevent);
- if (peer_close_iserror_ || pn_condition_is_set(pn_connection_remote_condition(connection))) {
+ if (pn_condition_is_set(pn_connection_remote_condition(connection))) {
messaging_event mevent(messaging_event::CONNECTION_ERROR, pe);
delegate_.on_connection_error(mevent);
}
@@ -223,13 +185,22 @@ void messaging_adapter::on_session_remote_open(proton_event &pe) {
}
}
+void messaging_adapter::on_link_local_open(proton_event &pe) {
+ credit_topup(pn_event_link(pe.pn_event()));
+}
+
void messaging_adapter::on_link_remote_open(proton_event &pe) {
messaging_event mevent(messaging_event::LINK_OPEN, pe);
delegate_.on_link_open(mevent);
- pn_link_t *link = pn_event_link(pe.pn_event());
- if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) {
- pn_link_open(link);
+ pn_link_t *pnlink = pn_event_link(pe.pn_event());
+ if (!is_local_open(pn_link_state(pnlink)) && is_local_unititialised(pn_link_state(pnlink))) {
+ link lnk(pnlink);
+ if (pe.container_)
+ lnk.open(pe.container_->impl_->link_options_);
+ else
+ lnk.open(); // No default for engine
}
+ credit_topup(pnlink);
}
void messaging_adapter::on_transport_tail_closed(proton_event &pe) {
@@ -251,4 +222,14 @@ void messaging_adapter::on_timer_task(proton_event& pe)
delegate_.on_timer(mevent);
}
+void messaging_adapter::credit_topup(pn_link_t *link) {
+ if (link && pn_link_is_receiver(link)) {
+ int window = link_context::get(link).credit_window;
+ if (window) {
+ int delta = window - pn_link_credit(link);
+ pn_link_flow(link, delta);
+ }
+ }
+}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.hpp b/proton-c/bindings/cpp/src/messaging_adapter.hpp
index a5b364a..655efb6 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.hpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.hpp
@@ -38,9 +38,7 @@ namespace proton {
class messaging_adapter : public proton_handler
{
public:
- messaging_adapter(handler &delegate,
- int prefetch, bool auto_accept, bool auto_settle,
- bool peer_close_is_error);
+ messaging_adapter(handler &delegate);
virtual ~messaging_adapter();
void on_reactor_init(proton_event &e);
@@ -50,6 +48,7 @@ class messaging_adapter : public proton_handler
void on_connection_remote_close(proton_event &e);
void on_session_remote_open(proton_event &e);
void on_session_remote_close(proton_event &e);
+ void on_link_local_open(proton_event &e);
void on_link_remote_open(proton_event &e);
void on_link_remote_close(proton_event &e);
void on_transport_tail_closed(proton_event &e);
@@ -57,12 +56,7 @@ class messaging_adapter : public proton_handler
private:
handler &delegate_; // The handler for generated messaging_event's
- int prefetch_;
- bool auto_accept_;
- bool auto_settle_;
- bool peer_close_iserror_;
- internal::pn_unique_ptr<proton_handler> flow_controller_;
- void create_helpers();
+ void credit_topup(pn_link_t*);
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.hpp b/proton-c/bindings/cpp/src/proton_event.hpp
index 8dd2f8f..c671eb9 100644
--- a/proton-c/bindings/cpp/src/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/proton_event.hpp
@@ -294,6 +294,7 @@ class proton_event
class container *container_;
friend class messaging_event;
friend class connection_engine;
+ friend class messaging_adapter;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index 224ac71..5700c73 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -29,6 +29,7 @@
#include "proton/event.hpp"
#include "proton/reactor.h"
#include "proton/value.hpp"
+#include "proton/link_options.hpp"
#include <iostream>
#include <map>
@@ -53,8 +54,7 @@ class reactor_send : public proton::handler {
public:
reactor_send(const std::string &url, int c, int size, bool replying)
- : handler(1024), // prefetch=1024
- url_(url), sent_(0), confirmed_(0), total_(c),
+ : url_(url), sent_(0), confirmed_(0), total_(c),
received_(0), received_bytes_(0), replying_(replying) {
if (replying_)
message_.reply_to("localhost/test");
@@ -64,6 +64,7 @@ class reactor_send : public proton::handler {
}
void on_start(proton::event &e) {
+ e.container().link_options(proton::link_options().credit_window(1024));
e.container().open_sender(url_);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org