You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2016/05/12 07:23:43 UTC
qpid-proton git commit: PROTON-1194: C++ binding basic flow control
Repository: qpid-proton
Updated Branches:
refs/heads/master 1e9e243a3 -> 200a4e221
PROTON-1194: C++ binding basic flow control
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/200a4e22
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/200a4e22
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/200a4e22
Branch: refs/heads/master
Commit: 200a4e2217b9e2fdaea46d28577120c8c4633930
Parents: 1e9e243
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu May 12 00:23:03 2016 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu May 12 00:23:03 2016 -0700
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 1 +
examples/cpp/example_test.py | 8 +
examples/cpp/flow_control.cpp | 229 +++++++++++++++++++
examples/cpp/queue_browser.cpp | 4 -
.../bindings/cpp/include/proton/handler.hpp | 5 +
proton-c/bindings/cpp/include/proton/link.hpp | 19 +-
.../bindings/cpp/include/proton/receiver.hpp | 15 ++
proton-c/bindings/cpp/include/proton/sender.hpp | 6 +
proton-c/bindings/cpp/src/contexts.hpp | 7 +-
proton-c/bindings/cpp/src/handler.cpp | 2 +
.../bindings/cpp/src/io/connection_engine.cpp | 1 +
proton-c/bindings/cpp/src/link.cpp | 17 +-
proton-c/bindings/cpp/src/messaging_adapter.cpp | 49 +++-
proton-c/bindings/cpp/src/receiver.cpp | 32 +++
proton-c/bindings/cpp/src/receiver_options.cpp | 2 +-
proton-c/bindings/cpp/src/sender.cpp | 9 +
proton-c/bindings/cpp/src/sender_options.cpp | 2 +-
17 files changed, 376 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 2cb258f..a9f8700 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -38,6 +38,7 @@ foreach(example
connection_options
queue_browser
selected_recv
+ flow_control
ssl
ssl_client_cert
encode_decode)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 274efcf..7d4dc78 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -258,6 +258,14 @@ class ContainerExampleTest(BrokerTestCase):
self.assertEqual(CLIENT_EXPECT,
self.proc(["client", "-a", addr+"/examples"]).wait_exit())
+ def test_flow_control(self):
+ want="""success: Example 1: simple credit
+success: Example 2: basic drain
+success: Example 3: drain without credit
+success: Exmaple 4: high/low watermark
+"""
+ self.assertEqual(want, self.proc(["flow_control", pick_addr(), "-quiet"]).wait_exit())
+
def test_encode_decode(self):
want="""
== Array, list and map of uniform type.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp
new file mode 100644
index 0000000..59d9a46
--- /dev/null
+++ b/examples/cpp/flow_control.cpp
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/acceptor.hpp"
+#include "proton/connection.hpp"
+#include "proton/container.hpp"
+#include "proton/handler.hpp"
+#include "proton/sender.hpp"
+#include "proton/tracker.hpp"
+#include "proton/delivery.hpp"
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+bool verbose = true;
+
+void verify(bool success, const std::string &msg) {
+ if (!success)
+ throw std::runtime_error("example failure:" + msg);
+ else {
+ std::cout << "success: " << msg << std::endl;
+ if (verbose) std::cout << std::endl;
+ }
+}
+
+
+// flow_sender manages the incoming connection and acts as the message sender.
+class flow_sender : public proton::handler {
+ private:
+ int available; // Number of messages the sender may send assuming sufficient credit.
+ int sequence;
+
+ public:
+ flow_sender() : available(0), sequence(0) {}
+
+ void on_sendable(proton::sender &s) override {
+ if (verbose)
+ std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
+ << " and " << available << " available messages" << std::endl;
+ for (int i = sequence; available && s.credit() > 0; i++) {
+ std::ostringstream mbody;
+ mbody << "flow_sender message " << sequence++;
+ proton::message m(mbody.str());
+ s.send(m);
+ available--;
+ }
+ }
+
+ void on_sender_drain_start(proton::sender &s) override {
+ if (verbose)
+ std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
+ << " making an internal call to \"on_sendble\"" << std::endl;
+ on_sendable(s); // send as many as we can
+ if (s.credit()) {
+ s.return_credit(); // return the rest
+ }
+ }
+
+ void set_available(int n) { available = n; }
+};
+
+class flow_receiver : public proton::handler {
+ public:
+ int stage;
+ int received;
+ flow_sender &sender;
+
+ flow_receiver(flow_sender &s) : stage(0), sender(s) {}
+
+ void example_setup(int n) {
+ received = 0;
+ sender.set_available(n);
+ }
+
+ void run_stage(proton::receiver &r, const std::string &caller) {
+ // Serialize the progression of the flow control examples.
+ switch (stage) {
+ case 0:
+ if (verbose) std::cout << "Example 1. Simple use of credit." << std::endl;
+ // TODO: add timeout callbacks, show no messages until credit.
+ example_setup(2);
+ r.add_credit(2);
+ break;
+ case 1:
+ if (r.credit() > 0) return;
+ verify(received == 2, "Example 1: simple credit");
+
+ if (verbose) std::cout << "Example 2. Use basic drain, sender has 3 \"immediate\" messages." << std::endl;
+ example_setup(3);
+ r.add_credit(5); // ask for up to 5
+ r.drain(); // but only use what's available
+ break;
+ case 2:
+ if (caller == "on_message") return;
+ if (caller == "on_receiver_drain_finish") {
+ // Note that unused credit of 2 at sender is returned and is now 0.
+ verify(received == 3 && r.credit() == 0, "Example 2: basic drain");
+
+ if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl;
+ example_setup(0);
+ r.drain();
+ break;
+ }
+ verify(false, "example 2 run_stage");
+ return;
+
+ case 3:
+ verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit");
+
+ if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl;
+ example_setup(25);
+ r.add_credit(10);
+ break;
+
+ case 4:
+ if (received < 25) {
+ // Top up credit as needed.
+ uint32_t credit = r.credit();
+ if (credit <= 3) {
+ uint32_t new_credit = 10;
+ uint32_t remaining = 25 - received;
+ if (new_credit > remaining)
+ new_credit = remaining;
+ if (new_credit > credit) {
+ r.add_credit(new_credit - credit);
+ if (verbose)
+ std::cout << "flow_receiver adding credit for " << new_credit - credit
+ << " messages" << std::endl;
+ }
+ }
+ return;
+ }
+
+ verify(received == 25 && r.credit() == 0, "Exmaple 4: high/low watermark");
+ r.connection().close();
+ break;
+
+ default:
+ throw std::runtime_error("run_stage sequencing error");
+ }
+ stage++;
+ }
+
+ void on_receiver_open(proton::receiver &r) override {
+ run_stage(r, "on_receiver_open");
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) override {
+ if (verbose)
+ std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
+ proton::receiver r(d.receiver());
+ received++;
+ run_stage(r, "on_message");
+ }
+
+ void on_receiver_drain_finish(proton::receiver &r) override {
+ if (verbose)
+ std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
+ run_stage(r, "on_receiver_drain_finish");
+ }
+};
+
+
+class flow_control : public proton::handler {
+ private:
+ std::string url;
+ proton::acceptor acceptor;
+ flow_sender send_handler;
+ flow_receiver receive_handler;
+
+ public:
+ flow_control(const std::string& u) : url(u), receive_handler(send_handler) {}
+
+ void on_container_start(proton::container &c) override {
+ acceptor = c.listen(url, proton::connection_options().handler(&send_handler));
+ c.connect(url);
+ }
+
+ void on_connection_open(proton::connection &c) override {
+ if (c.active()) {
+ // outbound connection
+ c.open_receiver("flow_example", proton::receiver_options().handler(&receive_handler).credit_window(0));
+ }
+ }
+
+ void on_connection_close(proton::connection &) override {
+ acceptor.close();
+ }
+};
+
+int main(int argc, char **argv) {
+ std::string quiet_arg("-quiet");
+ if (argc > 2 && quiet_arg == argv[2])
+ verbose = false;
+ try {
+ // Pick an "unusual" port since we are going to be talking to
+ // ourselves, not a broker.
+ std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples";
+
+ flow_control fc(url);
+ proton::container(fc).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
index a1fa471..87cb147 100644
--- a/examples/cpp/queue_browser.cpp
+++ b/examples/cpp/queue_browser.cpp
@@ -48,10 +48,6 @@ class browser : public proton::handler {
void on_message(proton::delivery &d, proton::message &m) override {
std::cout << m.body() << std::endl;
-
- if (d.receiver().queued() == 0 && d.receiver().drained() > 0) {
- d.connection().close();
- }
}
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index b59e0cf..3cc2759 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -141,6 +141,11 @@ PN_CPP_CLASS_EXTERN handler
/// The sending peer settled a transfer.
PN_CPP_EXTERN virtual void on_delivery_settle(delivery &d);
+ /// The receiving peer has requested a drain of remaining credit.
+ PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
+ /// The credit outstanding at the time of the call to receiver::drain has been consumed or returned.
+ PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
+
/// Fallback error handling.
PN_CPP_EXTERN virtual void on_error(const error_condition &c);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index f57754c..1a5347c 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -86,20 +86,11 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
/// Credit available on the link.
PN_CPP_EXTERN int credit() const;
- /// The number of deliveries queued on the link.
- PN_CPP_EXTERN int queued();
-
- /// @cond INTERNAL
- /// XXX ask about when this is used
- /// The number of unsettled deliveries on the link.
- PN_CPP_EXTERN int unsettled();
- /// @endcond
-
- /// @cond INTERNAL
- /// XXX revisit mind-melting API inherited from C
- /// XXX flush() ? drain, and drain_completed (sender and receiver ends)
- PN_CPP_EXTERN int drained();
- /// @endcond
+ /// True for a receiver if a drain cycle has been started and the
+ /// corresponding on_receiver_drain_finish event is still pending.
+ /// @see receiver::drain. True for a sender if the receiver has
+ /// requested a drain of credit and the sender has unused credit.
+ PN_CPP_EXTERN bool draining();
/// Get the link name.
PN_CPP_EXTERN std::string name() const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp
index 0bdff5d..af2ab1e 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -52,6 +52,21 @@ PN_CPP_CLASS_EXTERN receiver : public link {
/// Get the target node.
PN_CPP_EXTERN class target target() const;
+ /// Increment the credit available to the sender. Credit granted
+ /// during a drain cycle is not communicated to the receiver until
+ /// the drain completes.
+ PN_CPP_EXTERN void add_credit(uint32_t);
+
+ /// Commence a drain cycle. If there is positive credit, a
+ /// request is sent to the sender to immediately use up all of the
+ /// existing credit balance by sending messages that are
+ /// immediately available and releasing any unused credit (see
+ /// sender::return_credit). Throws proton::error if a drain cycle
+ /// is already in progress. An on_receiver_drain_finish event
+ /// will be generated when the outstanding drained credit reaches
+ /// zero.
+ PN_CPP_EXTERN void drain();
+
/// @cond INTERNAL
friend class internal::factory<receiver>;
friend class receiver_iterator;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index 452809e..fd63ff4 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -58,6 +58,12 @@ PN_CPP_CLASS_EXTERN sender : public link
/// Get the target node.
PN_CPP_EXTERN class target target() const;
+ /// Return all unused credit to the receiver in response to a
+ /// drain request. Has no effect unless there has been a drain
+ /// request and there is remaining credit to use or return.
+ /// @see receiver::drain.
+ PN_CPP_EXTERN void return_credit();
+
/// @cond INTERNAL
friend class internal::factory<sender>;
friend class sender_iterator;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 0305b8a..9a4a9fe 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -83,13 +83,14 @@ class context {
// Connection context used by all connections.
class connection_context : public context {
public:
- connection_context() : default_session(0), work_queue(0) {}
+ connection_context() : default_session(0), work_queue(0), collector(0) {}
// Used by all connections
pn_session_t *default_session; // Owned by connection.
message event_message; // re-used by messaging_adapter for performance.
id_generator link_gen; // Link name generator.
class work_queue* work_queue; // Work queue if this is proton::controller connection.
+ pn_collector_t* collector;
internal::pn_unique_ptr<proton_handler> handler;
@@ -120,10 +121,12 @@ class listener_context : public context {
class link_context : public context {
public:
static link_context& get(pn_link_t* l);
- link_context() : credit_window(10), auto_accept(true), auto_settle(true) {}
+ link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {}
int credit_window;
bool auto_accept;
bool auto_settle;
+ bool draining;
+ uint32_t pending_credit;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 5bb3859..d1d1745 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -58,6 +58,8 @@ void handler::on_tracker_reject(tracker &) {}
void handler::on_tracker_release(tracker &) {}
void handler::on_tracker_settle(tracker &) {}
void handler::on_delivery_settle(delivery &) {}
+void handler::on_sender_drain_start(sender &) {}
+void handler::on_receiver_drain_finish(receiver &) {}
void handler::on_error(const error_condition& c) { throw proton::error(c.what()); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 045ae66..3865953 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -58,6 +58,7 @@ connection_engine::connection_engine(class handler &h, const connection_options&
if (connection_.container_id().empty())
pn_connection_set_container(unwrap(connection_), uuid::random().str().c_str());
id_generator &link_gen = connection_context::get(connection_).link_gen;
+ connection_context::get(connection_).collector = collector_.get();
if (link_gen.prefix().empty())
link_gen.prefix(uuid::random().str()+"/");
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp
index 725b3b3..9b4f325 100644
--- a/proton-c/bindings/cpp/src/link.cpp
+++ b/proton-c/bindings/cpp/src/link.cpp
@@ -49,12 +49,21 @@ void link::detach() {
}
int link::credit() const {
- return pn_link_credit(pn_object());
+ pn_link_t *lnk = pn_object();
+ if (pn_link_is_sender(lnk))
+ return pn_link_credit(lnk);
+ link_context& lctx = link_context::get(lnk);
+ return pn_link_credit(lnk) + lctx.pending_credit;
}
-int link::queued() { return pn_link_queued(pn_object()); }
-int link::unsettled() { return pn_link_unsettled(pn_object()); }
-int link::drained() { return pn_link_drained(pn_object()); }
+bool link::draining() {
+ pn_link_t *lnk = pn_object();
+ link_context& lctx = link_context::get(lnk);
+ if (pn_link_is_sender(lnk))
+ return pn_link_credit(lnk) > 0 && lctx.draining;
+ else
+ return lctx.draining;
+}
std::string link::name() const { return str(pn_link_name(pn_object()));}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 8ac377e..1fe9bcd 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -68,13 +68,35 @@ void messaging_adapter::on_reactor_init(proton_event &pe) {
void messaging_adapter::on_link_flow(proton_event &pe) {
pn_event_t *pne = pe.pn_event();
pn_link_t *lnk = pn_event_link(pne);
- sender s(make_wrapper<sender>(lnk));
+ // TODO: process session flow data, if no link-specific data, just return.
+ if (!lnk) return;
+ link_context& lctx = link_context::get(lnk);
int state = pn_link_state(lnk);
- if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 &&
- (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE))
- {
- // create on_message extended event
- delegate_.on_sendable(s);
+ if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
+ if (pn_link_is_sender(lnk)) {
+ if (pn_link_credit(lnk) > 0) {
+ sender s(make_wrapper<sender>(lnk));
+ if (pn_link_draining(lnk)) {
+ if (!lctx.draining) {
+ lctx.draining = true;
+ delegate_.on_sender_drain_start(s);
+ }
+ }
+ else {
+ lctx.draining = false;
+ }
+ // create on_message extended event
+ delegate_.on_sendable(s);
+ }
+ }
+ else {
+ // receiver
+ if (!pn_link_credit(lnk) && lctx.draining) {
+ lctx.draining = false;
+ receiver r(make_wrapper<receiver>(lnk));
+ delegate_.on_receiver_drain_finish(r);
+ }
+ }
}
credit_topup(lnk);
}
@@ -103,11 +125,26 @@ void messaging_adapter::on_delivery(proton_event &pe) {
delegate_.on_message(d, msg);
if (lctx.auto_accept && !d.settled())
d.accept();
+ if (lctx.draining && !pn_link_credit(lnk)) {
+ lctx.draining = false;
+ receiver r(make_wrapper<receiver>(lnk));
+ delegate_.on_receiver_drain_finish(r);
+ }
}
}
else if (pn_delivery_updated(dlv) && d.settled()) {
delegate_.on_delivery_settle(d);
}
+ if (lctx.draining && pn_link_credit(lnk) == 0) {
+ lctx.draining = false;
+ pn_link_set_drain(lnk, false);
+ receiver r(make_wrapper<receiver>(lnk));
+ delegate_.on_receiver_drain_finish(r);
+ if (lctx.pending_credit) {
+ pn_link_flow(lnk, lctx.pending_credit);
+ lctx.pending_credit = 0;
+ }
+ }
credit_topup(lnk);
} else {
tracker t(make_wrapper<tracker>(dlv));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index 4760c71..eb1c06e 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -24,10 +24,13 @@
#include "msg.hpp"
#include "proton_bits.hpp"
+#include "contexts.hpp"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
+#include "proton/event.h"
+#include "proton/reactor.h"
namespace proton {
@@ -46,6 +49,35 @@ class target receiver::target() const {
return proton::target(*this);
}
+void receiver::add_credit(uint32_t credit) {
+ link_context &ctx = link_context::get(pn_object());
+ if (ctx.draining)
+ ctx.pending_credit += credit;
+ else
+ pn_link_flow(pn_object(), credit);
+}
+
+void receiver::drain() {
+ link_context &ctx = link_context::get(pn_object());
+ if (ctx.draining)
+ throw proton::error("drain already in progress");
+ else {
+ ctx.draining = true;
+ if (credit() > 0)
+ pn_link_set_drain(pn_object(), true);
+ else {
+ // Drain is already complete. No state to communicate over the wire.
+ // Create dummy flow event where "drain finish" can be detected.
+ pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
+ connection_context& cctx = connection_context::get(pnc);
+ // connection_engine collector is per connection. Reactor collector is global.
+ pn_collector_t *coll = cctx.collector;
+ if (!coll)
+ coll = pn_reactor_collector(pn_object_reactor(pnc));
+ pn_collector_put(coll, PN_OBJECT, pn_object(), PN_LINK_FLOW);
+ }
+ }
+}
receiver_iterator receiver_iterator::operator++() {
if (!!obj_) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index f39ea66..1a1b3f3 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -45,7 +45,7 @@ template <class T> struct option {
class receiver_options::impl {
static void set_handler(receiver l, proton_handler &h) {
pn_record_t *record = pn_link_attachments(unwrap(l));
- internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h);
+ internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h);
pn_record_set_handler(record, chandler.get());
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp
index fc33524..e45cf7a 100644
--- a/proton-c/bindings/cpp/src/sender.cpp
+++ b/proton-c/bindings/cpp/src/sender.cpp
@@ -28,6 +28,7 @@
#include "proton/types.h"
#include "proton_bits.hpp"
+#include "contexts.hpp"
namespace proton {
@@ -61,9 +62,17 @@ tracker sender::send(const message &message) {
pn_link_advance(pn_object());
if (pn_link_snd_settle_mode(pn_object()) == PN_SND_SETTLED)
pn_delivery_settle(dlv);
+ if (!pn_link_credit(pn_object()))
+ link_context::get(pn_object()).draining = false;
return make_wrapper<tracker>(dlv);
}
+void sender::return_credit() {
+ link_context &lctx = link_context::get(pn_object());
+ lctx.draining = false;
+ pn_link_drained(pn_object());
+}
+
sender_iterator sender_iterator::operator++() {
if (!!obj_) {
pn_link_t *lnk = pn_link_next(obj_.pn_object(), 0);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index ed030d9..4786937 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -43,7 +43,7 @@ template <class T> struct option {
class sender_options::impl {
static void set_handler(sender l, proton_handler &h) {
pn_record_t *record = pn_link_attachments(unwrap(l));
- internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h);
+ internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h);
pn_record_set_handler(record, chandler.get());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org