You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2019/02/02 19:02:05 UTC
[qpid-proton] branch master updated: PROTON-1990: C++ credit
draining fails to toggle off in 2/3 of possible cases
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 54a7beb PROTON-1990: C++ credit draining fails to toggle off in 2/3 of possible cases
54a7beb is described below
commit 54a7bebd5b407f1e1761fc12dd0128a570e846bf
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Sat Feb 2 11:00:44 2019 -0800
PROTON-1990: C++ credit draining fails to toggle off in 2/3 of possible cases
---
cpp/CMakeLists.txt | 1 +
cpp/src/credit_test.cpp | 365 ++++++++++++++++++++++++++++++++++++++++++
cpp/src/messaging_adapter.cpp | 2 +
3 files changed, 368 insertions(+)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 794eb29..9614160 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -284,6 +284,7 @@ add_cpp_test(value_test)
add_cpp_test(container_test)
add_cpp_test(reconnect_test)
add_cpp_test(link_test)
+add_cpp_test(credit_test)
if (ENABLE_JSONCPP)
add_cpp_test(connect_config_test)
target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled
diff --git a/cpp/src/credit_test.cpp b/cpp/src/credit_test.cpp
new file mode 100644
index 0000000..91e3b52
--- /dev/null
+++ b/cpp/src/credit_test.cpp
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_bits.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/container.hpp"
+#include "proton/delivery.hpp"
+#include "proton/error_condition.hpp"
+#include "proton/listen_handler.hpp"
+#include "proton/listener.hpp"
+#include "proton/message.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/transport.hpp"
+#include "proton/work_queue.hpp"
+
+#include "proton/internal/pn_unique_ptr.hpp"
+
+#include <cstdlib>
+#include <ctime>
+#include <string>
+#include <cstdio>
+#include <sstream>
+
+namespace {
+
+// Wait for N things to be done.
+class waiter {
+ size_t count;
+ public:
+ waiter(size_t n) : count(n) {}
+ void done() { if (--count == 0) ready(); }
+ virtual void ready() = 0;
+};
+
+class server_connection_handler : public proton::messaging_handler {
+
+ struct listen_handler : public proton::listen_handler {
+ proton::connection_options opts;
+ std::string url;
+ waiter& listen_waiter;
+
+ listen_handler(proton::messaging_handler& h, waiter& w) : listen_waiter(w) {
+ opts.handler(h);
+ }
+
+ void on_open(proton::listener& l) PN_CPP_OVERRIDE {
+ std::ostringstream o;
+ o << "//:" << l.port(); // Connect to the actual listening port
+ url = o.str();
+ // Schedule rather than call done() direct to ensure serialization
+ l.container().schedule(proton::duration::IMMEDIATE,
+ proton::make_work(&waiter::done, &listen_waiter));
+ }
+
+ proton::connection_options on_accept(proton::listener&) PN_CPP_OVERRIDE { return opts; }
+ };
+
+ proton::listener listener_;
+ proton::sender sender_;
+ int expect_;
+ bool closing_;
+ int available_;
+ int acked_;
+ listen_handler listen_handler_;
+ proton::work_queue *notify_wq_;
+ proton::work notify_work_;
+
+ void close (proton::connection &c) {
+ if (closing_) return;
+
+ c.close(proton::error_condition("amqp:connection:forced", "Failover testing"));
+ closing_ = true;
+ }
+
+ public:
+ server_connection_handler(proton::container& c, int a, waiter& w)
+ : expect_(0), closing_(false), available_(a), acked_(0), listen_handler_(*this, w), notify_wq_(0)
+ {
+ listener_ = c.listen("//:0", listen_handler_);
+ }
+
+ std::string url() const {
+ if (listen_handler_.url.empty()) throw std::runtime_error("no url");
+ return listen_handler_.url;
+ }
+
+ void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
+ // Only listen for a single connection
+ listener_.stop();
+ c.open();
+ }
+
+ void on_sender_open(proton::sender &s) PN_CPP_OVERRIDE {
+ s.open();
+ sender_ = s;
+ }
+
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
+ send_available_messages(s);
+ }
+
+ void send_available_messages(proton::sender &s) {
+ bool draining = s.draining();
+ while (available_ && s.credit() > 0) {
+ s.send(proton::message("hello"));
+ available_--;
+ expect_++;
+ }
+ if (draining && !available_ && s.credit()) {
+ s.return_credit(); // return the rest
+ }
+ }
+
+ void notify_idle() {
+ notify_wq_->add(notify_work_);
+ }
+
+ void on_tracker_accept(proton::tracker& d) PN_CPP_OVERRIDE {
+ acked_++;
+ if (acked_ == expect_ && (available_ == 0 || d.sender().credit() == 0))
+ notify_idle();
+ }
+
+ void on_transport_error(proton::transport & ) PN_CPP_OVERRIDE {
+ // If we get an error then (try to) stop the listener
+ // - this will stop the listener if we didn't already accept a connection
+ listener_.stop();
+ }
+
+ void notify_on_idle(proton::work_queue &wq, proton::work &w) { notify_wq_ = &wq; notify_work_ = w;}
+
+ void available(int i) {
+ available_ = i;
+ }
+};
+
+class tester : public proton::messaging_handler, public waiter {
+ public:
+ tester() : waiter(1), container_(*this, "credit_tester"),
+ received_(0), initial_credit_(0) {}
+
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+ srv_.reset(new server_connection_handler(c, 100000, *this));
+ }
+
+ // waiter::ready is called when listener can accept connections.
+ void ready() PN_CPP_OVERRIDE {
+ container_.connect(srv_->url());
+ }
+
+ void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE {
+ c.open_receiver("messages", proton::receiver_options().credit_window(0));
+ }
+
+ void on_receiver_open(proton::receiver &r) PN_CPP_OVERRIDE {
+ receiver_ = r;
+ next_idle_ = proton::make_work(&tester::first_idle, this);
+ proton::work call_on_server_idle(make_work(&tester::on_server_idle, this));
+ srv_->notify_on_idle(r.work_queue(), call_on_server_idle);
+ r.add_credit(initial_credit_);
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
+ received_++;
+ d.accept();
+ }
+
+ void run() {
+ container_.run(1); // Single threaded to avoid locks and barriers.
+ }
+
+ void server_available(int available) {
+ // If multithreaded, locking would be required.
+ srv_->available(available);
+ }
+
+ void on_server_idle() {
+ next_idle_();
+ }
+
+ void fail(const std::string &msg, int rcv) {
+ // Call from work_queue. Remember the exception later.
+ std::ostringstream os;
+ os << msg << rcv;
+ fail_msg_ = os.str();
+ receiver_.connection().close();
+ }
+
+ void on_connection_close(proton::connection& c) PN_CPP_OVERRIDE {
+ if (!fail_msg_.empty())
+ FAIL(fail_msg_);
+ }
+
+ virtual void first_idle() = 0;
+
+ protected:
+ proton::internal::pn_unique_ptr<server_connection_handler> srv_;
+ proton::container container_;
+ proton::receiver receiver_;
+ proton::work next_idle_;
+ std::string fail_msg_;
+ int received_;
+ int initial_credit_;
+};
+
+
+class basic_credit_tester : public tester {
+ public:
+ basic_credit_tester() { initial_credit_ = 3; }
+
+ void first_idle() PN_CPP_OVERRIDE {
+ if (received_ != 3) {
+ fail(FAIL_MSG("messages received should be 3 not "), received_);
+ return;
+ }
+ next_idle_ = proton::make_work(&basic_credit_tester::second_idle, this);
+ server_available(2);
+ receiver_.add_credit(3);
+ }
+
+ void second_idle() {
+ if (received_ != 5) {
+ fail(FAIL_MSG("messages received should be 5 not "), received_);
+ return;
+ }
+ next_idle_ = proton::make_work(&basic_credit_tester::third_idle, this);
+ server_available(10);
+ receiver_.add_credit(1);
+ }
+
+ void third_idle() {
+ if (received_ != 7) {
+ fail(FAIL_MSG("messages received should be 7 not "), received_);
+ return;
+ }
+ // passed
+ receiver_.connection().close();
+ }
+};
+
+
+int test_basic_credit() {
+ basic_credit_tester().run();
+ return 0;
+}
+
+
+class drain_credit_tester : public tester {
+ int drain_finishes_;
+
+ public:
+ drain_credit_tester() : drain_finishes_(0) { initial_credit_ = 10; }
+
+ void on_receiver_drain_finish(proton::receiver &r) PN_CPP_OVERRIDE {
+ drain_finishes_++;
+ }
+
+ void first_idle() PN_CPP_OVERRIDE {
+ if (received_ != 10) {
+ fail(FAIL_MSG("messages received should be 10 not "), received_);
+ return;
+ }
+ next_idle_ = proton::make_work(&drain_credit_tester::second_idle, this);
+ server_available(10);
+ receiver_.add_credit(15);
+ receiver_.drain();
+ }
+
+ void second_idle() {
+ if (received_ != 20) {
+ fail(FAIL_MSG("messages received should be 20 not "), received_);
+ return;
+ }
+ if (drain_finishes_ != 1) {
+ fail(FAIL_MSG("drain finish callbacks should be 1, not: "), drain_finishes_);
+ return;
+ }
+ if (receiver_.credit() != 0) {
+ fail(FAIL_MSG("credit not returned on drain, remaining: "), receiver_.credit());
+ return;
+ }
+ next_idle_ = proton::make_work(&drain_credit_tester::third_idle, this);
+ server_available(5);
+ receiver_.add_credit(10);
+ }
+
+ void third_idle() {
+ if (received_ != 25) {
+ fail(FAIL_MSG("messages received should be 20 not "), received_);
+ return;
+ }
+ if (receiver_.credit() != 5) {
+ fail(FAIL_MSG("incorrect credit after drain, should be 5, not "), receiver_.credit());
+ return;
+ }
+ next_idle_ = proton::make_work(&drain_credit_tester::fourth_idle, this);
+ server_available(3);
+ receiver_.add_credit(1);
+ }
+
+ void fourth_idle() {
+ if (received_ != 28) {
+ fail(FAIL_MSG("messages received should be 28 not "), received_);
+ return;
+ }
+ if (receiver_.credit() != 3) {
+ fail(FAIL_MSG("incorrect credit, should be 3, not "), receiver_.credit());
+ return;
+ }
+ next_idle_ = proton::make_work(&drain_credit_tester::fifth_idle, this);
+ server_available(1);
+ receiver_.drain();
+ }
+
+ void fifth_idle() {
+ if (received_ != 29) {
+ fail(FAIL_MSG("messages received should be 29 not "), received_);
+ return;
+ }
+ if (drain_finishes_ != 2) {
+ fail(FAIL_MSG("drain finish callbacks should be 2, not: "), drain_finishes_);
+ return;
+ }
+ if (receiver_.credit() != 0) {
+ fail(FAIL_MSG("second drain credit failed, should be 0, not "), receiver_.credit());
+ return;
+ }
+ // passed
+ receiver_.connection().close();
+ }
+};
+
+int test_drain_credit() {
+ drain_credit_tester().run();
+ return 0;
+}
+
+
+} // namespace
+
+
+int main(int argc, char** argv) {
+ int failed = 0;
+ RUN_ARGV_TEST(failed, test_basic_credit());
+ RUN_ARGV_TEST(failed, test_drain_credit());
+ return failed;
+}
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index 78df0da..a495f58 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -84,6 +84,7 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) {
// receiver
if (!pn_link_credit(lnk) && lctx.draining) {
lctx.draining = false;
+ pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
}
@@ -132,6 +133,7 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
d.accept();
if (lctx.draining && !pn_link_credit(lnk)) {
lctx.draining = false;
+ pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org