You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2016/03/22 01:22:11 UTC

qpid-proton git commit: PROTON-1138: handler options become link or connection options (https://reviews.apache.org/r/44927)

Repository: qpid-proton
Updated Branches:
  refs/heads/master 9e2649ac4 -> f2ae27d7e


PROTON-1138: handler options become link or connection options (https://reviews.apache.org/r/44927)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2ae27d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2ae27d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2ae27d7

Branch: refs/heads/master
Commit: f2ae27d7e7a09c016a89be59e93d5796e01cc725
Parents: 9e2649a
Author: Clifford Jansen <cl...@apache.org>
Authored: Mon Mar 21 17:15:24 2016 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Mon Mar 21 17:15:24 2016 -0700

----------------------------------------------------------------------
 .../cpp/include/proton/connection_options.hpp   |  4 +-
 .../bindings/cpp/include/proton/container.hpp   |  1 +
 .../bindings/cpp/include/proton/handler.hpp     | 23 +-----
 proton-c/bindings/cpp/include/proton/link.hpp   |  1 +
 .../cpp/include/proton/link_options.hpp         | 17 +++-
 .../bindings/cpp/src/connection_options.cpp     | 36 ++++-----
 proton-c/bindings/cpp/src/container.cpp         |  2 +
 proton-c/bindings/cpp/src/container_impl.cpp    | 19 +++--
 proton-c/bindings/cpp/src/container_impl.hpp    |  1 +
 proton-c/bindings/cpp/src/contexts.cpp          | 12 +++
 proton-c/bindings/cpp/src/contexts.hpp          |  9 +++
 proton-c/bindings/cpp/src/handler.cpp           |  4 +-
 proton-c/bindings/cpp/src/link_options.cpp      | 35 +++++---
 proton-c/bindings/cpp/src/messaging_adapter.cpp | 85 ++++++++------------
 proton-c/bindings/cpp/src/messaging_adapter.hpp | 12 +--
 proton-c/bindings/cpp/src/proton_event.hpp      |  1 +
 tests/tools/apps/cpp/reactor_send.cpp           |  5 +-
 17 files changed, 134 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 71e12f1..1a22b73 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -76,9 +76,6 @@ class connection_options {
 
     // XXX add C++11 move operations
 
-    /// Override with options from other.
-    PN_CPP_EXTERN void override(const connection_options& other);
-
     /// Set a handler for the connection.
     PN_CPP_EXTERN connection_options& handler(class handler *);
 
@@ -145,6 +142,7 @@ class connection_options {
     static pn_connection_t *pn_connection(connection &);
     class ssl_client_options &ssl_client_options();
     class ssl_server_options &ssl_server_options();
+    PN_CPP_EXTERN void update(const connection_options& other);
 
     class impl;
     internal::pn_unique_ptr<impl> impl_;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 0af1963..7e0f0ee 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -134,6 +134,7 @@ class container {
 
     friend class connector;
     friend class link;
+    friend class messaging_adapter;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 3bc0023..6ea11d4 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -41,28 +41,7 @@ class
 PN_CPP_CLASS_EXTERN handler
 {
   public:
-    /// @cond INTERNAL
-    /// XXX move configuration to connection or container
-
-    /// Create a handler.
-    ///
-    /// @param prefetch set flow control to automatically pre-fetch
-    /// this many messages
-    ///
-    /// @param auto_accept automatically accept received messages
-    /// after on_message()
-    ///
-    /// @param auto_settle automatically settle on receipt of delivery
-    /// for sent messages
-    ///
-    /// @param peer_close_is_error treat orderly remote connection
-    /// close as error
-    PN_CPP_EXTERN handler(int prefetch=10, bool auto_accept=true,
-                          bool auto_settle=true,
-                          bool peer_close_is_error=false);
-
-    /// @endcond
-
+    PN_CPP_EXTERN handler();
     PN_CPP_EXTERN virtual ~handler();
 
     /// @name Event callbacks

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index 8de2e7b..363ea16 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -151,6 +151,7 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
   friend class proton_event;
   friend class link_iterator;
   friend class link_options;
+  friend class messaging_adapter;
 };
 
 /// An iterator for links.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp
index 2eb145d..7f70c4a 100644
--- a/proton-c/bindings/cpp/include/proton/link_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/link_options.hpp
@@ -120,9 +120,6 @@ class link_options {
     /// Copy options.
     PN_CPP_EXTERN link_options& operator=(const link_options&);
 
-    /// Override with options from other.
-    PN_CPP_EXTERN void override(const link_options& other);
-
     /// Set a handler for events scoped to the link.  If NULL,
     /// link-scoped events on the link are discarded.
     PN_CPP_EXTERN link_options& handler(class handler *);
@@ -155,6 +152,18 @@ class link_options {
     /// Set the local address for the link.
     PN_CPP_EXTERN link_options& local_address(const std::string &addr);
 
+    /// Automatically accept inbound messages that aren't otherwise
+    /// released, rejected or modified (default value:true).
+    PN_CPP_EXTERN link_options& auto_accept(bool);
+
+    /// Automatically settle messages (default value: true).
+    PN_CPP_EXTERN link_options& auto_settle(bool);
+
+    /// Set automated flow control to pre-fetch this many messages
+    /// (default value:10).  Set to zero to disable automatic credit
+    /// replenishing.
+    PN_CPP_EXTERN link_options& credit_window(int);
+
     /// @cond INTERNAL
     /// XXX need to discuss spec issues, jms versus amqp filters
     ///
@@ -168,11 +177,13 @@ class link_options {
   private:
     void apply(link&) const;
     proton_handler* handler() const;
+    PN_CPP_EXTERN void update(const link_options& other);
 
     class impl;
     internal::pn_unique_ptr<impl> impl_;
 
     friend class link;
+    friend class container_impl;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index a5ee99e..c218d64 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -41,7 +41,7 @@ template <class T> struct option {
 
     option() : value(), set(false) {}
     option& operator=(const T& x) { value = x;  set = true; return *this; }
-    void override(const option<T>& x) { if (x.set) *this = x.value; }
+    void update(const option<T>& x) { if (x.set) *this = x.value; }
 };
 
 class connection_options::impl {
@@ -124,22 +124,22 @@ class connection_options::impl {
         }
     }
 
-    void override(const impl& x) {
-        handler.override(x.handler);
-        max_frame_size.override(x.max_frame_size);
-        max_channels.override(x.max_channels);
-        idle_timeout.override(x.idle_timeout);
-        heartbeat.override(x.heartbeat);
-        container_id.override(x.container_id);
-        link_prefix.override(x.link_prefix);
-        reconnect.override(x.reconnect);
-        ssl_client_options.override(x.ssl_client_options);
-        ssl_server_options.override(x.ssl_server_options);
-        sasl_enabled.override(x.sasl_enabled);
-        sasl_allow_insecure_mechs.override(x.sasl_allow_insecure_mechs);
-        sasl_allowed_mechs.override(x.sasl_allowed_mechs);
-        sasl_config_name.override(x.sasl_config_name);
-        sasl_config_path.override(x.sasl_config_path);
+    void update(const impl& x) {
+        handler.update(x.handler);
+        max_frame_size.update(x.max_frame_size);
+        max_channels.update(x.max_channels);
+        idle_timeout.update(x.idle_timeout);
+        heartbeat.update(x.heartbeat);
+        container_id.update(x.container_id);
+        link_prefix.update(x.link_prefix);
+        reconnect.update(x.reconnect);
+        ssl_client_options.update(x.ssl_client_options);
+        ssl_server_options.update(x.ssl_server_options);
+        sasl_enabled.update(x.sasl_enabled);
+        sasl_allow_insecure_mechs.update(x.sasl_allow_insecure_mechs);
+        sasl_allowed_mechs.update(x.sasl_allowed_mechs);
+        sasl_config_name.update(x.sasl_config_name);
+        sasl_config_path.update(x.sasl_config_path);
     }
 
 };
@@ -155,7 +155,7 @@ connection_options& connection_options::operator=(const connection_options& x) {
     return *this;
 }
 
-void connection_options::override(const connection_options& x) { impl_->override(*x.impl_); }
+void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); }
 
 connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
 connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index fc34f39..2cc8df5 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -81,4 +81,6 @@ void container::client_connection_options(const connection_options &o) { impl_->
 
 void container::server_connection_options(const connection_options &o) { impl_->server_connection_options(o); }
 
+void container::link_options(const class link_options &o) { impl_->link_options(o); }
+
 } // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index 55547ca..979c20c 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -137,17 +137,16 @@ container_impl::container_impl(container& c, messaging_adapter *h, const std::st
         reactor_.pn_handler(cpp_handler(handler_).get());
     }
 
-    // Note: we have just set up the following handlers that see events in this order:
-    // messaging_handler (Proton C events), pn_flowcontroller (optional), messaging_adapter,
-    // messaging_handler (Messaging events from the messaging_adapter, i.e. the delegate),
-    // connector override, the reactor's default globalhandler (pn_iohandler)
+    // Note: we have just set up the following handlers that see
+    // events in this order: messaging_adapter, connector override,
+    // the reactor's default globalhandler (pn_iohandler)
 }
 
 container_impl::~container_impl() {}
 
 connection container_impl::connect(const proton::url &url, const connection_options &user_opts) {
     connection_options opts = client_connection_options(); // Defaults
-    opts.override(user_opts);
+    opts.update(user_opts);
     proton_handler *h = opts.handler();
 
     internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
@@ -165,9 +164,9 @@ connection container_impl::connect(const proton::url &url, const connection_opti
 
 sender container_impl::open_sender(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
     proton::link_options lopts(link_options_);
-    lopts.override(o1);
+    lopts.update(o1);
     connection_options copts(client_connection_options_);
-    copts.override(o2);
+    copts.update(o2);
     connection conn = connect(url, copts);
     std::string path = url.path();
     sender snd = conn.default_session().create_sender();
@@ -178,9 +177,9 @@ sender container_impl::open_sender(const proton::url &url, const proton::link_op
 
 receiver container_impl::open_receiver(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
     proton::link_options lopts(link_options_);
-    lopts.override(o1);
+    lopts.update(o1);
     connection_options copts(client_connection_options_);
-    copts.override(o2);
+    copts.update(o2);
     connection conn = connect(url, copts);
     std::string path = url.path();
     receiver rcv = conn.default_session().create_receiver();
@@ -191,7 +190,7 @@ receiver container_impl::open_receiver(const proton::url &url, const proton::lin
 
 acceptor container_impl::listen(const proton::url& url, const connection_options &user_opts) {
     connection_options opts = server_connection_options(); // Defaults
-    opts.override(user_opts);
+    opts.update(user_opts);
     proton_handler *h = opts.handler();
     internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
     pn_acceptor_t *acptr = pn_reactor_acceptor(reactor_.pn_object(), url.host().c_str(), url.port().c_str(), chandler.get());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp
index d1b476f..05e2058 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -83,6 +83,7 @@ class container_impl
     proton::link_options link_options_;
 
   friend class container;
+  friend class messaging_adapter;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index c2b76f6..99b05a1 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -50,6 +50,7 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context);
 PN_HANDLE(CONNECTION_CONTEXT)
 PN_HANDLE(CONTAINER_CONTEXT)
 PN_HANDLE(LISTENER_CONTEXT)
+PN_HANDLE(LINK_CONTEXT)
 
 void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value)
 {
@@ -99,4 +100,15 @@ listener_context& listener_context::get(pn_acceptor_t* a) {
     return *ctx;
 }
 
+link_context& link_context::get(pn_link_t* l) {
+    link_context* ctx =
+        get_context<link_context>(pn_link_attachments(l), LINK_CONTEXT);
+    if (!ctx) {
+        ctx =  context::create<link_context>();
+        set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx);
+        pn_decref(ctx);
+    }
+    return *ctx;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index b4fcdba..03271a1 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -127,6 +127,15 @@ class listener_context : public context {
     bool ssl;
 };
 
+class link_context : public context {
+  public:
+    static link_context& get(pn_link_t* l);
+    link_context() : credit_window(10), auto_accept(true), auto_settle(true) {}
+    int credit_window;
+    bool auto_accept;
+    bool auto_settle;
+};
+
 }
 
 #endif  /*!PROTON_CPP_CONTEXTS_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 43febf5..1119720 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -32,9 +32,7 @@
 
 namespace proton {
 
-handler::handler(int prefetch0, bool auto_accept0, bool auto_settle0, bool peer_close_is_error0) :
-    messaging_adapter_(new messaging_adapter(*this, prefetch0, auto_accept0, auto_settle0, peer_close_is_error0))
-{}
+handler::handler() : messaging_adapter_(new messaging_adapter(*this)) {}
 
 handler::~handler(){}
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/link_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp
index 534a6b6..ee41535 100644
--- a/proton-c/bindings/cpp/src/link_options.cpp
+++ b/proton-c/bindings/cpp/src/link_options.cpp
@@ -26,6 +26,7 @@
 
 #include "msg.hpp"
 #include "messaging_adapter.hpp"
+#include "contexts.hpp"
 
 
 namespace proton {
@@ -58,7 +59,7 @@ template <class T> struct option {
 
     option() : value(), set(false) {}
     option& operator=(const T& x) { value = x;  set = true; return *this; }
-    void override(const option<T>& x) { if (x.set) *this = x.value; }
+    void update(const option<T>& x) { if (x.set) *this = x.value; }
 };
 
 class link_options::impl {
@@ -71,6 +72,9 @@ class link_options::impl {
     option<std::string> local_address;
     option<enum lifetime_policy> lifetime_policy;
     option<std::string> selector;
+    option<bool> auto_accept;
+    option<bool> auto_settle;
+    option<int> credit_window;
 
     void apply(link& l) {
         if (l.state() & endpoint::LOCAL_UNINIT) {
@@ -118,6 +122,7 @@ class link_options::impl {
                     }
                 }
             }
+            if (auto_settle.set) link_context::get(l.pn_object()).auto_settle = auto_settle.value;
             if (!sender) {
                 // receiver only options
                 if (distribution_mode.set) l.local_source().distribution_mode(distribution_mode.value);
@@ -130,19 +135,24 @@ class link_options::impl {
                     enc << codec::start::map() << symbol("selector") << codec::start::described()
                         << symbol("apache.org:selector-filter:string") << binary(selector.value) << codec::finish();
                 }
+                if (auto_accept.set) link_context::get(l.pn_object()).auto_accept = auto_accept.value;
+                if (credit_window.set) link_context::get(l.pn_object()).credit_window = credit_window.value;
             }
         }
     }
 
-    void override(const impl& x) {
-        handler.override(x.handler);
-        distribution_mode.override(x.distribution_mode);
-        durable_subscription.override(x.durable_subscription);
-        delivery_mode.override(x.delivery_mode);
-        dynamic_address.override(x.dynamic_address);
-        local_address.override(x.local_address);
-        lifetime_policy.override(x.lifetime_policy);
-        selector.override(x.selector);
+    void update(const impl& x) {
+        handler.update(x.handler);
+        distribution_mode.update(x.distribution_mode);
+        durable_subscription.update(x.durable_subscription);
+        delivery_mode.update(x.delivery_mode);
+        dynamic_address.update(x.dynamic_address);
+        local_address.update(x.local_address);
+        lifetime_policy.update(x.lifetime_policy);
+        selector.update(x.selector);
+        auto_accept.update(x.auto_accept);
+        auto_settle.update(x.auto_settle);
+        credit_window.update(x.credit_window);
     }
 
 };
@@ -158,7 +168,7 @@ link_options& link_options::operator=(const link_options& x) {
     return *this;
 }
 
-void link_options::override(const link_options& x) { impl_->override(*x.impl_); }
+void link_options::update(const link_options& x) { impl_->update(*x.impl_); }
 
 link_options& link_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
 link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; }
@@ -169,6 +179,9 @@ link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b;
 link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; }
 link_options& link_options::lifetime_policy(enum lifetime_policy lp) {impl_->lifetime_policy = lp; return *this; }
 link_options& link_options::selector(const std::string &str) {impl_->selector = str; return *this; }
+link_options& link_options::auto_accept(bool b) {impl_->auto_accept = b; return *this; }
+link_options& link_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
+link_options& link_options::credit_window(int w) {impl_->credit_window = w; return *this; }
 
 void link_options::apply(link& l) const { impl_->apply(l); }
 proton_handler* link_options::handler() const { return impl_->handler.value; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 4265644..4bc21af 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -26,6 +26,7 @@
 
 #include "contexts.hpp"
 #include "messaging_event.hpp"
+#include "container_impl.hpp"
 #include "msg.hpp"
 
 #include "proton/connection.h"
@@ -38,49 +39,7 @@
 
 namespace proton {
 
-namespace {
-class c_flow_controller : public proton_handler
-{
-  public:
-    pn_handler_t *flowcontroller;
-
-    // TODO: pn_flowcontroller requires a window > 1.
-    c_flow_controller(int window) : flowcontroller(pn_flowcontroller(std::max(window, 2))) {}
-    ~c_flow_controller() {
-        pn_decref(flowcontroller);
-    }
-
-    void redirect(proton_event &pne) {
-        pn_handler_dispatch(flowcontroller, pne.pn_event(), pn_event_type_t(pne.type()));
-    }
-
-    virtual void on_link_local_open(proton_event &e) { redirect(e); }
-    virtual void on_link_remote_open(proton_event &e) { redirect(e); }
-    virtual void on_link_flow(proton_event &e) { redirect(e); }
-    virtual void on_delivery(proton_event &e) { redirect(e); }
-};
-
-} // namespace
-
-void messaging_adapter::create_helpers() {
-  if (prefetch_ > 0) {
-    flow_controller_.reset(new c_flow_controller(prefetch_));
-    add_child_handler(*flow_controller_);
-  }
-}
-
-messaging_adapter::messaging_adapter(handler &delegate,
-                                     int prefetch, bool auto_accept, bool auto_settle, bool peer_close_iserror) :
-    delegate_(delegate),
-    prefetch_(prefetch),
-    auto_accept_(auto_accept),
-    auto_settle_(auto_settle),
-    peer_close_iserror_(peer_close_iserror)
-{
-    create_helpers();
-    //add_child_handler(*this);
-}
-
+messaging_adapter::messaging_adapter(handler &delegate) : delegate_(delegate) {}
 
 messaging_adapter::~messaging_adapter(){}
 
@@ -98,11 +57,13 @@ void messaging_adapter::on_link_flow(proton_event &pe) {
         messaging_event mevent(messaging_event::SENDABLE, pe);
         delegate_.on_sendable(mevent);;
     }
+    credit_topup(lnk);
 }
 
 void messaging_adapter::on_delivery(proton_event &pe) {
     pn_event_t *cevent = pe.pn_event();
     pn_link_t *lnk = pn_event_link(cevent);
+    link_context& lctx = link_context::get(lnk);
     delivery dlv = pe.delivery();
 
     if (pn_link_is_receiver(lnk)) {
@@ -118,11 +79,11 @@ void messaging_adapter::on_delivery(proton_event &pe) {
             mevent.message_ = &msg;
             mevent.message_->decode(dlv);
             if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
-                if (auto_accept_)
+                if (lctx.auto_accept)
                     dlv.release();
             } else {
                 delegate_.on_message(mevent);
-                if (auto_accept_ && !dlv.settled())
+                if (lctx.auto_accept && !dlv.settled())
                     dlv.accept();
             }
         }
@@ -130,6 +91,7 @@ void messaging_adapter::on_delivery(proton_event &pe) {
             messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
             delegate_.on_delivery_settle(mevent);
         }
+        credit_topup(lnk);
     } else {
         // sender
         if (dlv.updated()) {
@@ -151,7 +113,7 @@ void messaging_adapter::on_delivery(proton_event &pe) {
                 messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
                 delegate_.on_delivery_settle(mevent);
             }
-            if (auto_settle_)
+            if (lctx.auto_settle)
                 dlv.settle();
         }
     }
@@ -172,7 +134,7 @@ bool is_local_unititialised(pn_state_t state) {
 void messaging_adapter::on_link_remote_close(proton_event &pe) {
     pn_event_t *cevent = pe.pn_event();
     pn_link_t *lnk = pn_event_link(cevent);
-    if (peer_close_iserror_ || pn_condition_is_set(pn_link_remote_condition(lnk))) {
+    if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
         messaging_event mevent(messaging_event::LINK_ERROR, pe);
         delegate_.on_link_error(mevent);
     }
@@ -184,7 +146,7 @@ void messaging_adapter::on_link_remote_close(proton_event &pe) {
 void messaging_adapter::on_session_remote_close(proton_event &pe) {
     pn_event_t *cevent = pe.pn_event();
     pn_session_t *session = pn_event_session(cevent);
-    if (peer_close_iserror_ || pn_condition_is_set(pn_session_remote_condition(session))) {
+    if (pn_condition_is_set(pn_session_remote_condition(session))) {
         messaging_event mevent(messaging_event::SESSION_ERROR, pe);
         delegate_.on_session_error(mevent);
     }
@@ -196,7 +158,7 @@ void messaging_adapter::on_session_remote_close(proton_event &pe) {
 void messaging_adapter::on_connection_remote_close(proton_event &pe) {
     pn_event_t *cevent = pe.pn_event();
     pn_connection_t *connection = pn_event_connection(cevent);
-    if (peer_close_iserror_ || pn_condition_is_set(pn_connection_remote_condition(connection))) {
+    if (pn_condition_is_set(pn_connection_remote_condition(connection))) {
         messaging_event mevent(messaging_event::CONNECTION_ERROR, pe);
         delegate_.on_connection_error(mevent);
     }
@@ -223,13 +185,22 @@ void messaging_adapter::on_session_remote_open(proton_event &pe) {
     }
 }
 
+void messaging_adapter::on_link_local_open(proton_event &pe) {
+    credit_topup(pn_event_link(pe.pn_event()));
+}
+
 void messaging_adapter::on_link_remote_open(proton_event &pe) {
     messaging_event mevent(messaging_event::LINK_OPEN, pe);
     delegate_.on_link_open(mevent);
-    pn_link_t *link = pn_event_link(pe.pn_event());
-    if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) {
-        pn_link_open(link);
+    pn_link_t *pnlink = pn_event_link(pe.pn_event());
+    if (!is_local_open(pn_link_state(pnlink)) && is_local_unititialised(pn_link_state(pnlink))) {
+        link lnk(pnlink);
+        if (pe.container_)
+            lnk.open(pe.container_->impl_->link_options_);
+        else
+            lnk.open();    // No default for engine
     }
+    credit_topup(pnlink);
 }
 
 void messaging_adapter::on_transport_tail_closed(proton_event &pe) {
@@ -251,4 +222,14 @@ void messaging_adapter::on_timer_task(proton_event& pe)
     delegate_.on_timer(mevent);
 }
 
+void messaging_adapter::credit_topup(pn_link_t *link) {
+    if (link && pn_link_is_receiver(link)) {
+        int window = link_context::get(link).credit_window;
+        if (window) {
+            int delta = window - pn_link_credit(link);
+            pn_link_flow(link, delta);
+        }
+    }
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.hpp b/proton-c/bindings/cpp/src/messaging_adapter.hpp
index a5b364a..655efb6 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.hpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.hpp
@@ -38,9 +38,7 @@ namespace proton {
 class messaging_adapter : public proton_handler
 {
   public:
-    messaging_adapter(handler &delegate,
-                                    int prefetch, bool auto_accept, bool auto_settle,
-                                    bool peer_close_is_error);
+    messaging_adapter(handler &delegate);
     virtual ~messaging_adapter();
 
     void on_reactor_init(proton_event &e);
@@ -50,6 +48,7 @@ class messaging_adapter : public proton_handler
     void on_connection_remote_close(proton_event &e);
     void on_session_remote_open(proton_event &e);
     void on_session_remote_close(proton_event &e);
+    void on_link_local_open(proton_event &e);
     void on_link_remote_open(proton_event &e);
     void on_link_remote_close(proton_event &e);
     void on_transport_tail_closed(proton_event &e);
@@ -57,12 +56,7 @@ class messaging_adapter : public proton_handler
 
   private:
     handler &delegate_;  // The handler for generated messaging_event's
-    int prefetch_;
-    bool auto_accept_;
-    bool auto_settle_;
-    bool peer_close_iserror_;
-    internal::pn_unique_ptr<proton_handler> flow_controller_;
-    void create_helpers();
+    void credit_topup(pn_link_t*);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.hpp b/proton-c/bindings/cpp/src/proton_event.hpp
index 8dd2f8f..c671eb9 100644
--- a/proton-c/bindings/cpp/src/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/proton_event.hpp
@@ -294,6 +294,7 @@ class proton_event
     class container *container_;
   friend class messaging_event;
   friend class connection_engine;
+  friend class messaging_adapter;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index 224ac71..5700c73 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -29,6 +29,7 @@
 #include "proton/event.hpp"
 #include "proton/reactor.h"
 #include "proton/value.hpp"
+#include "proton/link_options.hpp"
 
 #include <iostream>
 #include <map>
@@ -53,8 +54,7 @@ class reactor_send : public proton::handler {
   public:
 
     reactor_send(const std::string &url, int c, int size, bool replying)
-        : handler(1024), // prefetch=1024
-          url_(url), sent_(0), confirmed_(0), total_(c),
+        : url_(url), sent_(0), confirmed_(0), total_(c),
           received_(0), received_bytes_(0), replying_(replying) {
         if (replying_)
             message_.reply_to("localhost/test");
@@ -64,6 +64,7 @@ class reactor_send : public proton::handler {
     }
 
     void on_start(proton::event &e) {
+        e.container().link_options(proton::link_options().credit_window(1024));
         e.container().open_sender(url_);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org