You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2016/05/12 07:23:43 UTC

qpid-proton git commit: PROTON-1194: C++ binding basic flow control

Repository: qpid-proton
Updated Branches:
  refs/heads/master 1e9e243a3 -> 200a4e221


PROTON-1194: C++ binding basic flow control


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/200a4e22
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/200a4e22
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/200a4e22

Branch: refs/heads/master
Commit: 200a4e2217b9e2fdaea46d28577120c8c4633930
Parents: 1e9e243
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu May 12 00:23:03 2016 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu May 12 00:23:03 2016 -0700

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   1 +
 examples/cpp/example_test.py                    |   8 +
 examples/cpp/flow_control.cpp                   | 229 +++++++++++++++++++
 examples/cpp/queue_browser.cpp                  |   4 -
 .../bindings/cpp/include/proton/handler.hpp     |   5 +
 proton-c/bindings/cpp/include/proton/link.hpp   |  19 +-
 .../bindings/cpp/include/proton/receiver.hpp    |  15 ++
 proton-c/bindings/cpp/include/proton/sender.hpp |   6 +
 proton-c/bindings/cpp/src/contexts.hpp          |   7 +-
 proton-c/bindings/cpp/src/handler.cpp           |   2 +
 .../bindings/cpp/src/io/connection_engine.cpp   |   1 +
 proton-c/bindings/cpp/src/link.cpp              |  17 +-
 proton-c/bindings/cpp/src/messaging_adapter.cpp |  49 +++-
 proton-c/bindings/cpp/src/receiver.cpp          |  32 +++
 proton-c/bindings/cpp/src/receiver_options.cpp  |   2 +-
 proton-c/bindings/cpp/src/sender.cpp            |   9 +
 proton-c/bindings/cpp/src/sender_options.cpp    |   2 +-
 17 files changed, 376 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 2cb258f..a9f8700 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -38,6 +38,7 @@ foreach(example
     connection_options
     queue_browser
     selected_recv
+    flow_control
     ssl
     ssl_client_cert
     encode_decode)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 274efcf..7d4dc78 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -258,6 +258,14 @@ class ContainerExampleTest(BrokerTestCase):
         self.assertEqual(CLIENT_EXPECT,
                          self.proc(["client", "-a", addr+"/examples"]).wait_exit())
 
+    def test_flow_control(self):
+        want="""success: Example 1: simple credit
+success: Example 2: basic drain
+success: Example 3: drain without credit
+success: Exmaple 4: high/low watermark
+"""
+        self.assertEqual(want, self.proc(["flow_control", pick_addr(), "-quiet"]).wait_exit())
+
     def test_encode_decode(self):
         want="""
 == Array, list and map of uniform type.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp
new file mode 100644
index 0000000..59d9a46
--- /dev/null
+++ b/examples/cpp/flow_control.cpp
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/acceptor.hpp"
+#include "proton/connection.hpp"
+#include "proton/container.hpp"
+#include "proton/handler.hpp"
+#include "proton/sender.hpp"
+#include "proton/tracker.hpp"
+#include "proton/delivery.hpp"
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+bool verbose = true;
+
+void verify(bool success, const std::string &msg) {
+    if (!success)
+        throw std::runtime_error("example failure:" + msg);
+    else {
+        std::cout << "success: " << msg << std::endl;
+        if (verbose) std::cout << std::endl;
+    }
+}
+
+
+// flow_sender manages the incoming connection and acts as the message sender.
+class flow_sender : public proton::handler {
+  private:
+    int available;  // Number of messages the sender may send assuming sufficient credit.
+    int sequence;
+
+  public:
+    flow_sender() : available(0), sequence(0) {}
+
+    void on_sendable(proton::sender &s) override {
+        if (verbose)
+            std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
+                      << " and " << available << " available messages" << std::endl;
+        for (int i = sequence; available && s.credit() > 0; i++) {
+            std::ostringstream mbody;
+            mbody << "flow_sender message " << sequence++;
+            proton::message m(mbody.str());
+            s.send(m);
+            available--;
+        }
+    }
+
+    void on_sender_drain_start(proton::sender &s) override {
+        if (verbose)
+            std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
+                      << " making an internal call to \"on_sendble\"" << std::endl;
+        on_sendable(s); // send as many as we can
+        if (s.credit()) {
+            s.return_credit(); // return the rest
+        }
+    }
+
+    void set_available(int n) { available = n; }
+};
+
+class flow_receiver : public proton::handler {
+  public:
+    int stage;
+    int received;
+    flow_sender &sender;
+
+    flow_receiver(flow_sender &s) : stage(0), sender(s) {}
+
+    void example_setup(int n) {
+        received = 0;
+        sender.set_available(n);
+    }
+
+    void run_stage(proton::receiver &r, const std::string &caller) {
+        // Serialize the progression of the flow control examples.
+        switch (stage) {
+        case 0:
+            if (verbose) std::cout << "Example 1.  Simple use of credit." << std::endl;
+            // TODO: add timeout callbacks, show no messages until credit.
+            example_setup(2);
+            r.add_credit(2);
+            break;
+        case 1:
+            if (r.credit() > 0) return;
+            verify(received == 2, "Example 1: simple credit");
+
+            if (verbose) std::cout << "Example 2.   Use basic drain, sender has 3 \"immediate\" messages." << std::endl;
+            example_setup(3);
+            r.add_credit(5); // ask for up to 5
+            r.drain();       // but only use what's available
+            break;
+        case 2:
+            if (caller == "on_message") return;
+            if (caller == "on_receiver_drain_finish") {
+                // Note that unused credit of 2 at sender is returned and is now 0.
+                verify(received == 3 && r.credit() == 0, "Example 2: basic drain");
+
+                if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl;
+                example_setup(0);
+                r.drain();
+                break;
+            }
+            verify(false, "example 2 run_stage");
+            return;
+
+        case 3:
+            verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit");
+
+            if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl;
+            example_setup(25);
+            r.add_credit(10);
+            break;
+
+        case 4:
+            if (received < 25) {
+                // Top up credit as needed.
+                uint32_t credit = r.credit();
+                if (credit <= 3) {
+                    uint32_t new_credit = 10;
+                    uint32_t remaining = 25 - received;
+                    if (new_credit > remaining)
+                        new_credit = remaining;
+                    if (new_credit > credit) {
+                        r.add_credit(new_credit - credit);
+                        if (verbose)
+                            std::cout << "flow_receiver adding credit for " << new_credit - credit
+                                      << " messages" << std::endl;
+                    }
+                }
+                return;
+            }
+
+            verify(received == 25 && r.credit() == 0, "Exmaple 4: high/low watermark");
+            r.connection().close();
+            break;
+
+        default:
+            throw std::runtime_error("run_stage sequencing error");
+        }
+        stage++;
+    }
+
+    void on_receiver_open(proton::receiver &r) override {
+        run_stage(r, "on_receiver_open");
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) override {
+        if (verbose)
+            std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
+        proton::receiver r(d.receiver());
+        received++;
+        run_stage(r, "on_message");
+    }
+
+    void on_receiver_drain_finish(proton::receiver &r) override {
+        if (verbose)
+            std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
+        run_stage(r, "on_receiver_drain_finish");
+    }
+};
+
+
+class flow_control : public proton::handler {
+  private:
+    std::string url;
+    proton::acceptor acceptor;
+    flow_sender send_handler;
+    flow_receiver receive_handler;
+
+  public:
+    flow_control(const std::string& u) : url(u), receive_handler(send_handler) {}
+
+    void on_container_start(proton::container &c) override {
+        acceptor = c.listen(url, proton::connection_options().handler(&send_handler));
+        c.connect(url);
+    }
+
+    void on_connection_open(proton::connection &c) override {
+        if (c.active()) {
+            // outbound connection
+            c.open_receiver("flow_example", proton::receiver_options().handler(&receive_handler).credit_window(0));
+        }
+    }
+
+    void on_connection_close(proton::connection &) override {
+        acceptor.close();
+    }
+};
+
+int main(int argc, char **argv) {
+    std::string quiet_arg("-quiet");
+    if (argc > 2 && quiet_arg == argv[2])
+        verbose = false;
+    try {
+        // Pick an "unusual" port since we are going to be talking to
+        // ourselves, not a broker.
+        std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples";
+
+        flow_control fc(url);
+        proton::container(fc).run();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
index a1fa471..87cb147 100644
--- a/examples/cpp/queue_browser.cpp
+++ b/examples/cpp/queue_browser.cpp
@@ -48,10 +48,6 @@ class browser : public proton::handler {
 
     void on_message(proton::delivery &d, proton::message &m) override {
         std::cout << m.body() << std::endl;
-
-        if (d.receiver().queued() == 0 && d.receiver().drained() > 0) {
-            d.connection().close();
-        }
     }
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index b59e0cf..3cc2759 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -141,6 +141,11 @@ PN_CPP_CLASS_EXTERN handler
     /// The sending peer settled a transfer.
     PN_CPP_EXTERN virtual void on_delivery_settle(delivery &d);
 
+    /// The receiving peer has requested a drain of remaining credit.
+    PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
+    /// The credit outstanding at the time of the call to receiver::drain has been consumed or returned.
+    PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
+
     /// Fallback error handling.
     PN_CPP_EXTERN virtual void on_error(const error_condition &c);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index f57754c..1a5347c 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -86,20 +86,11 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
     /// Credit available on the link.
     PN_CPP_EXTERN int credit() const;
 
-    /// The number of deliveries queued on the link.
-    PN_CPP_EXTERN int queued();
-
-    /// @cond INTERNAL
-    /// XXX ask about when this is used
-    /// The number of unsettled deliveries on the link.
-    PN_CPP_EXTERN int unsettled();
-    /// @endcond
-
-    /// @cond INTERNAL
-    /// XXX revisit mind-melting API inherited from C
-    /// XXX flush() ? drain, and drain_completed (sender and receiver ends)
-    PN_CPP_EXTERN int drained();
-    /// @endcond
+    /// True for a receiver if a drain cycle has been started and the
+    /// corresponding on_receiver_drain_finish event is still pending.
+    /// @see receiver::drain.  True for a sender if the receiver has
+    /// requested a drain of credit and the sender has unused credit.
+    PN_CPP_EXTERN bool draining();
 
     /// Get the link name.
     PN_CPP_EXTERN std::string name() const;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp
index 0bdff5d..af2ab1e 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -52,6 +52,21 @@ PN_CPP_CLASS_EXTERN receiver : public link {
     /// Get the target node.
     PN_CPP_EXTERN class target target() const;
 
+    /// Increment the credit available to the sender.  Credit granted
+    /// during a drain cycle is not communicated to the receiver until
+    /// the drain completes.
+    PN_CPP_EXTERN void add_credit(uint32_t);
+
+    /// Commence a drain cycle.  If there is positive credit, a
+    /// request is sent to the sender to immediately use up all of the
+    /// existing credit balance by sending messages that are
+    /// immediately available and releasing any unused credit (see
+    /// sender::return_credit).  Throws proton::error if a drain cycle
+    /// is already in progress.  An on_receiver_drain_finish event
+    /// will be generated when the outstanding drained credit reaches
+    /// zero.
+    PN_CPP_EXTERN void drain();
+
     /// @cond INTERNAL
   friend class internal::factory<receiver>;
   friend class receiver_iterator;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index 452809e..fd63ff4 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -58,6 +58,12 @@ PN_CPP_CLASS_EXTERN sender : public link
     /// Get the target node.
     PN_CPP_EXTERN class target target() const;
 
+    /// Return all unused credit to the receiver in response to a
+    /// drain request.  Has no effect unless there has been a drain
+    /// request and there is remaining credit to use or return.
+    /// @see receiver::drain.
+    PN_CPP_EXTERN void return_credit();
+
   /// @cond INTERNAL
   friend class internal::factory<sender>;
   friend class sender_iterator;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 0305b8a..9a4a9fe 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -83,13 +83,14 @@ class context {
 // Connection context used by all connections.
 class connection_context : public context {
   public:
-    connection_context() : default_session(0), work_queue(0) {}
+    connection_context() : default_session(0), work_queue(0), collector(0) {}
 
     // Used by all connections
     pn_session_t *default_session; // Owned by connection.
     message event_message;      // re-used by messaging_adapter for performance.
     id_generator link_gen;      // Link name generator.
     class work_queue* work_queue; // Work queue if this is proton::controller connection.
+    pn_collector_t* collector;
 
     internal::pn_unique_ptr<proton_handler> handler;
 
@@ -120,10 +121,12 @@ class listener_context : public context {
 class link_context : public context {
   public:
     static link_context& get(pn_link_t* l);
-    link_context() : credit_window(10), auto_accept(true), auto_settle(true) {}
+    link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {}
     int credit_window;
     bool auto_accept;
     bool auto_settle;
+    bool draining;
+    uint32_t pending_credit;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 5bb3859..d1d1745 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -58,6 +58,8 @@ void handler::on_tracker_reject(tracker &) {}
 void handler::on_tracker_release(tracker &) {}
 void handler::on_tracker_settle(tracker &) {}
 void handler::on_delivery_settle(delivery &) {}
+void handler::on_sender_drain_start(sender &) {}
+void handler::on_receiver_drain_finish(receiver &) {}
 
 void handler::on_error(const error_condition& c) { throw proton::error(c.what()); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 045ae66..3865953 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -58,6 +58,7 @@ connection_engine::connection_engine(class handler &h, const connection_options&
     if (connection_.container_id().empty())
         pn_connection_set_container(unwrap(connection_), uuid::random().str().c_str());
     id_generator &link_gen = connection_context::get(connection_).link_gen;
+    connection_context::get(connection_).collector = collector_.get();
     if (link_gen.prefix().empty())
         link_gen.prefix(uuid::random().str()+"/");
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp
index 725b3b3..9b4f325 100644
--- a/proton-c/bindings/cpp/src/link.cpp
+++ b/proton-c/bindings/cpp/src/link.cpp
@@ -49,12 +49,21 @@ void link::detach() {
 }
 
 int link::credit() const {
-    return pn_link_credit(pn_object());
+    pn_link_t *lnk = pn_object();
+    if (pn_link_is_sender(lnk))
+        return pn_link_credit(lnk);
+    link_context& lctx = link_context::get(lnk);
+    return pn_link_credit(lnk) + lctx.pending_credit;
 }
 
-int link::queued() { return pn_link_queued(pn_object()); }
-int link::unsettled() { return pn_link_unsettled(pn_object()); }
-int link::drained() { return pn_link_drained(pn_object()); }
+bool link::draining() {
+    pn_link_t *lnk = pn_object();
+    link_context& lctx = link_context::get(lnk);
+    if (pn_link_is_sender(lnk))
+        return pn_link_credit(lnk) > 0 && lctx.draining;
+    else
+        return lctx.draining;
+}
 
 std::string link::name() const { return str(pn_link_name(pn_object()));}
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 8ac377e..1fe9bcd 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -68,13 +68,35 @@ void messaging_adapter::on_reactor_init(proton_event &pe) {
 void messaging_adapter::on_link_flow(proton_event &pe) {
     pn_event_t *pne = pe.pn_event();
     pn_link_t *lnk = pn_event_link(pne);
-    sender s(make_wrapper<sender>(lnk));
+    // TODO: process session flow data, if no link-specific data, just return.
+    if (!lnk) return;
+    link_context& lctx = link_context::get(lnk);
     int state = pn_link_state(lnk);
-    if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 &&
-        (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE))
-    {
-        // create on_message extended event
-        delegate_.on_sendable(s);
+    if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
+        if (pn_link_is_sender(lnk)) {
+            if (pn_link_credit(lnk) > 0) {
+                sender s(make_wrapper<sender>(lnk));
+                if (pn_link_draining(lnk)) {
+                    if (!lctx.draining) {
+                        lctx.draining = true;
+                        delegate_.on_sender_drain_start(s);
+                    }
+                }
+                else {
+                    lctx.draining = false;
+                }
+                // create on_message extended event
+                delegate_.on_sendable(s);
+            }
+        }
+        else {
+            // receiver
+            if (!pn_link_credit(lnk) && lctx.draining) {
+                lctx.draining = false;
+                receiver r(make_wrapper<receiver>(lnk));
+                delegate_.on_receiver_drain_finish(r);
+            }
+        }
     }
     credit_topup(lnk);
 }
@@ -103,11 +125,26 @@ void messaging_adapter::on_delivery(proton_event &pe) {
                 delegate_.on_message(d, msg);
                 if (lctx.auto_accept && !d.settled())
                     d.accept();
+                if (lctx.draining && !pn_link_credit(lnk)) {
+                    lctx.draining = false;
+                    receiver r(make_wrapper<receiver>(lnk));
+                    delegate_.on_receiver_drain_finish(r);
+                }
             }
         }
         else if (pn_delivery_updated(dlv) && d.settled()) {
             delegate_.on_delivery_settle(d);
         }
+        if (lctx.draining && pn_link_credit(lnk) == 0) {
+            lctx.draining = false;
+            pn_link_set_drain(lnk, false);
+            receiver r(make_wrapper<receiver>(lnk));
+            delegate_.on_receiver_drain_finish(r);
+            if (lctx.pending_credit) {
+                pn_link_flow(lnk, lctx.pending_credit);
+                lctx.pending_credit = 0;
+            }
+        }
         credit_topup(lnk);
     } else {
         tracker t(make_wrapper<tracker>(dlv));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index 4760c71..eb1c06e 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -24,10 +24,13 @@
 
 #include "msg.hpp"
 #include "proton_bits.hpp"
+#include "contexts.hpp"
 
 #include "proton/connection.h"
 #include "proton/session.h"
 #include "proton/link.h"
+#include "proton/event.h"
+#include "proton/reactor.h"
 
 namespace proton {
 
@@ -46,6 +49,35 @@ class target receiver::target() const {
     return proton::target(*this);
 }
 
+void receiver::add_credit(uint32_t credit) {
+    link_context &ctx = link_context::get(pn_object());
+    if (ctx.draining)
+        ctx.pending_credit += credit;
+    else
+        pn_link_flow(pn_object(), credit);
+}
+
+void receiver::drain() {
+    link_context &ctx = link_context::get(pn_object());
+    if (ctx.draining)
+        throw proton::error("drain already in progress");
+    else {
+        ctx.draining = true;
+        if (credit() > 0)
+            pn_link_set_drain(pn_object(), true);
+        else {
+            // Drain is already complete.  No state to communicate over the wire.
+            // Create dummy flow event where "drain finish" can be detected.
+            pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
+            connection_context& cctx = connection_context::get(pnc);
+            // connection_engine collector is per connection.  Reactor collector is global.
+            pn_collector_t *coll = cctx.collector;
+            if (!coll)
+                coll = pn_reactor_collector(pn_object_reactor(pnc));
+            pn_collector_put(coll, PN_OBJECT, pn_object(), PN_LINK_FLOW);
+        }
+    }
+}
 
 receiver_iterator receiver_iterator::operator++() {
     if (!!obj_) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index f39ea66..1a1b3f3 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -45,7 +45,7 @@ template <class T> struct option {
 class receiver_options::impl {
     static void set_handler(receiver l, proton_handler &h) {
         pn_record_t *record = pn_link_attachments(unwrap(l));
-        internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h);
+        internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h);
         pn_record_set_handler(record, chandler.get());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp
index fc33524..e45cf7a 100644
--- a/proton-c/bindings/cpp/src/sender.cpp
+++ b/proton-c/bindings/cpp/src/sender.cpp
@@ -28,6 +28,7 @@
 #include "proton/types.h"
 
 #include "proton_bits.hpp"
+#include "contexts.hpp"
 
 namespace proton {
 
@@ -61,9 +62,17 @@ tracker sender::send(const message &message) {
     pn_link_advance(pn_object());
     if (pn_link_snd_settle_mode(pn_object()) == PN_SND_SETTLED)
         pn_delivery_settle(dlv);
+    if (!pn_link_credit(pn_object()))
+        link_context::get(pn_object()).draining = false;
     return make_wrapper<tracker>(dlv);
 }
 
+void sender::return_credit() {
+    link_context &lctx = link_context::get(pn_object());
+    lctx.draining = false;
+    pn_link_drained(pn_object());
+}
+
 sender_iterator sender_iterator::operator++() {
     if (!!obj_) {
         pn_link_t *lnk = pn_link_next(obj_.pn_object(), 0);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index ed030d9..4786937 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -43,7 +43,7 @@ template <class T> struct option {
 class sender_options::impl {
     static void set_handler(sender l, proton_handler &h) {
         pn_record_t *record = pn_link_attachments(unwrap(l));
-        internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h);
+        internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h);
         pn_record_set_handler(record, chandler.get());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org