You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/12/11 06:54:13 UTC
[qpid-proton] branch master updated: PROTON-2309: [cpp] Improve
reconnect logic and examples - Fix bug in C++ binding that stops it from
responding to a forced close from its peer with a close frame before
closing the socket if reconnect is turned on. - Added a reconnect option to
the simple_send example - Made reconnect work in simple_connect example
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 1191cfe PROTON-2309: [cpp] Improve reconnect logic and examples - Fix bug in C++ binding that stops it from responding to a forced close from its peer with a close frame before closing the socket if reconnect is turned on. - Added a reconnect option to the simple_send example - Made reconnect work in simple_connect example
1191cfe is described below
commit 1191cfec400d3d76da0f3d2e3ca2da4c3e69a044
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Dec 11 01:43:53 2020 -0500
PROTON-2309: [cpp] Improve reconnect logic and examples
- Fix bug in C++ binding that stops it from responding to a forced close from
its peer with a close frame before closing the socket if reconnect is turned on.
- Added a reconnect option to the simple_send example
- Made reconnect work in simple_connect example
---
cpp/examples/simple_connect.cpp | 6 +-----
cpp/examples/simple_send.cpp | 13 +++++++++----
cpp/src/connection.cpp | 6 +++++-
cpp/src/contexts.cpp | 2 +-
cpp/src/contexts.hpp | 1 +
cpp/src/proactor_container_impl.cpp | 39 ++++++++++++++++++++++---------------
cpp/src/proactor_container_impl.hpp | 1 -
7 files changed, 40 insertions(+), 28 deletions(-)
diff --git a/cpp/examples/simple_connect.cpp b/cpp/examples/simple_connect.cpp
index 74a8c87..a9d6d4c 100644
--- a/cpp/examples/simple_connect.cpp
+++ b/cpp/examples/simple_connect.cpp
@@ -68,11 +68,7 @@ class simple_connect : public proton::messaging_handler {
}
void on_connection_open(proton::connection &c) OVERRIDE {
- c.close();
- }
-
- void on_error(const proton::error_condition& e) OVERRIDE {
- throw std::runtime_error(e.what());
+ if (!reconnect) c.close();
}
};
diff --git a/cpp/examples/simple_send.cpp b/cpp/examples/simple_send.cpp
index 8e73fa5..21cc67f 100644
--- a/cpp/examples/simple_send.cpp
+++ b/cpp/examples/simple_send.cpp
@@ -27,6 +27,7 @@
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
#include <proton/tracker.hpp>
#include <proton/types.hpp>
@@ -40,19 +41,21 @@ class simple_send : public proton::messaging_handler {
std::string url;
std::string user;
std::string password;
+ bool reconnect;
proton::sender sender;
int sent;
int confirmed;
int total;
public:
- simple_send(const std::string &s, const std::string &u, const std::string &p, int c) :
- url(s), user(u), password(p), sent(0), confirmed(0), total(c) {}
+ simple_send(const std::string &s, const std::string &u, const std::string &p, bool r, int c) :
+ url(s), user(u), password(p), reconnect(r), sent(0), confirmed(0), total(c) {}
void on_container_start(proton::container &c) OVERRIDE {
proton::connection_options co;
if (!user.empty()) co.user(user);
if (!password.empty()) co.password(password);
+ if (reconnect) co.reconnect(proton::reconnect_options());
sender = c.open_sender(url, co);
}
@@ -94,18 +97,20 @@ int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
std::string user;
std::string password;
+ bool reconnect = false;
int message_count = 100;
example::options opts(argc, argv);
opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
- opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
opts.add_value(user, 'u', "user", "authenticate as USER", "USER");
opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+ opts.add_flag(reconnect, 'r', "reconnect", "reconnect on connection failure");
+ opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
try {
opts.parse();
- simple_send send(address, user, password, message_count);
+ simple_send send(address, user, password, reconnect, message_count);
proton::container(send).run();
return 0;
diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp
index c6686ae..ec3a851 100644
--- a/cpp/src/connection.cpp
+++ b/cpp/src/connection.cpp
@@ -60,7 +60,11 @@ void connection::open(const connection_options &opts) {
pn_connection_open(pn_object());
}
-void connection::close() { pn_connection_close(pn_object()); }
+void connection::close() {
+ pn_connection_close(pn_object());
+ reconnect_context* rctx = connection_context::get(pn_object()).reconnect_context_.get();
+ if (rctx) rctx->stop_reconnect_ = true;
+}
std::string connection::virtual_host() const {
return str(pn_connection_remote_hostname(pn_object()));
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index 65d883a..f6b7822 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -73,7 +73,7 @@ connection_context::connection_context() :
{}
reconnect_context::reconnect_context(const reconnect_options_base& ro) :
- reconnect_options_(ro), retries_(0), current_url_(-1), reconnected_(false)
+ reconnect_options_(ro), retries_(0), current_url_(-1), stop_reconnect_(false), reconnected_(false)
{}
listener_context::listener_context() : listen_handler_(0) {}
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 6660b55..dcd8081 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -112,6 +112,7 @@ class reconnect_context {
duration delay_;
int retries_;
int current_url_;
+ bool stop_reconnect_;
bool reconnected_;
};
diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp
index 0f02891..1fae73b 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -299,11 +299,13 @@ duration next_delay(reconnect_context& rc) {
}
return random_between(roi.delay, rc.delay_);
}
+
+inline reconnect_context* get_reconnect_context(pn_connection_t* pnc) {
+ return connection_context::get(pnc).reconnect_context_.get();
}
-void container::impl::reset_reconnect(pn_connection_t* pnc) {
- connection_context& cc = connection_context::get(pnc);
- reconnect_context* rc = cc.reconnect_context_.get();
+void reset_reconnect(pn_connection_t* pnc) {
+ reconnect_context* rc = get_reconnect_context(pnc);
if (!rc) return;
@@ -313,10 +315,17 @@ void container::impl::reset_reconnect(pn_connection_t* pnc) {
rc->current_url_ = -1;
}
+}
+
bool container::impl::can_reconnect(pn_connection_t* pnc) {
+ reconnect_context* rc = get_reconnect_context(pnc);
+
+ // If reconnect not enabled just fail
+ if (!rc) return false;
+
// Don't reconnect if we are locally closed, the application will
// not expect a connection it closed to re-open.
- if (pn_connection_state(pnc) & PN_LOCAL_CLOSED) return false;
+ if (rc->stop_reconnect_) return false;
// If container stopping don't try to reconnect
// - we pretend to have set up a reconnect attempt so
@@ -326,11 +335,6 @@ bool container::impl::can_reconnect(pn_connection_t* pnc) {
GUARD(lock_);
if (stopping_) return true;
}
- connection_context& cc = connection_context::get(pnc);
- reconnect_context* rc = cc.reconnect_context_.get();
-
- // If reconnect not enabled just fail
- if (!rc) return false;
const reconnect_options_base& roi = rc->reconnect_options_;
@@ -665,17 +669,19 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
pn_connection_t *c = pn_event_connection(event);
pn_condition_t *cc = pn_connection_remote_condition(c);
- // amqp:connection:forced should be treated like a transport
- // disconnect. Hide the connection error/close events from the
- // application and generate a PN_TRANSPORT_CLOSE event.
- if (pn_condition_is_set(cc) &&
+ // If reconnect is on, amqp:connection:forced should be treated specially:
+ // Hide the connection error/close events from the application;
+ // Then we close the connection noting the forced close;
+ // Then set up for reconnect handling.
+ if (get_reconnect_context(c) &&
+ pn_condition_is_set(cc) &&
!strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
{
pn_transport_t* t = pn_event_transport(event);
pn_condition_t* tc = pn_transport_condition(t);
pn_condition_copy(tc, cc);
- pn_transport_close_head(t);
pn_transport_close_tail(t);
+ pn_connection_close(c);
return ContinueLoop;
}
break;
@@ -702,8 +708,9 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
throw;
}
}
- // on_connection_reconnecting() may have closed the connection, check again.
- if (!(pn_connection_state(c) & PN_LOCAL_CLOSED)) {
+ // on_transport_error() may have closed the connection, check again.
+ reconnect_context* rc = get_reconnect_context(c);
+ if (rc && !(rc->stop_reconnect_)) {
setup_reconnect(c);
return ContinueLoop;
}
diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp
index b58ab0e..c8dd1ad 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -107,7 +107,6 @@ class container::impl {
void reconnect(pn_connection_t* pnc);
bool can_reconnect(pn_connection_t* pnc);
void setup_reconnect(pn_connection_t* pnc);
- void reset_reconnect(pn_connection_t* pnc);
// Event loop to run in each container thread
void thread();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org