You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/12/11 06:54:13 UTC

[qpid-proton] branch master updated: PROTON-2309: [cpp] Improve reconnect logic and examples - Fix bug in C++ binding that stops it from responding to a forced close from its peer with a close frame before closing the socket if reconnect is turned on. - Added a reconnect option to the simple_send example - Made reconnect work in simple_connect example

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 1191cfe  PROTON-2309: [cpp] Improve reconnect logic and examples - Fix bug in C++ binding that stops it from responding to a forced close from   its peer with a close frame before closing the socket if reconnect is turned on. - Added a reconnect option to the simple_send example - Made reconnect work in simple_connect example
1191cfe is described below

commit 1191cfec400d3d76da0f3d2e3ca2da4c3e69a044
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Dec 11 01:43:53 2020 -0500

    PROTON-2309: [cpp] Improve reconnect logic and examples
    - Fix bug in C++ binding that stops it from responding to a forced close from
      its peer with a close frame before closing the socket if reconnect is turned on.
    - Added a reconnect option to the simple_send example
    - Made reconnect work in simple_connect example
---
 cpp/examples/simple_connect.cpp     |  6 +-----
 cpp/examples/simple_send.cpp        | 13 +++++++++----
 cpp/src/connection.cpp              |  6 +++++-
 cpp/src/contexts.cpp                |  2 +-
 cpp/src/contexts.hpp                |  1 +
 cpp/src/proactor_container_impl.cpp | 39 ++++++++++++++++++++++---------------
 cpp/src/proactor_container_impl.hpp |  1 -
 7 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/cpp/examples/simple_connect.cpp b/cpp/examples/simple_connect.cpp
index 74a8c87..a9d6d4c 100644
--- a/cpp/examples/simple_connect.cpp
+++ b/cpp/examples/simple_connect.cpp
@@ -68,11 +68,7 @@ class simple_connect : public proton::messaging_handler {
     }
 
     void on_connection_open(proton::connection &c) OVERRIDE {
-        c.close();
-    }
-
-    void on_error(const proton::error_condition& e) OVERRIDE {
-        throw std::runtime_error(e.what());
+        if (!reconnect) c.close();
     }
 };
 
diff --git a/cpp/examples/simple_send.cpp b/cpp/examples/simple_send.cpp
index 8e73fa5..21cc67f 100644
--- a/cpp/examples/simple_send.cpp
+++ b/cpp/examples/simple_send.cpp
@@ -27,6 +27,7 @@
 #include <proton/message.hpp>
 #include <proton/message_id.hpp>
 #include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
 #include <proton/tracker.hpp>
 #include <proton/types.hpp>
 
@@ -40,19 +41,21 @@ class simple_send : public proton::messaging_handler {
     std::string url;
     std::string user;
     std::string password;
+    bool reconnect;
     proton::sender sender;
     int sent;
     int confirmed;
     int total;
 
   public:
-    simple_send(const std::string &s, const std::string &u, const std::string &p, int c) :
-        url(s), user(u), password(p), sent(0), confirmed(0), total(c) {}
+    simple_send(const std::string &s, const std::string &u, const std::string &p, bool r, int c) :
+        url(s), user(u), password(p), reconnect(r), sent(0), confirmed(0), total(c) {}
 
     void on_container_start(proton::container &c) OVERRIDE {
         proton::connection_options co;
         if (!user.empty()) co.user(user);
         if (!password.empty()) co.password(password);
+        if (reconnect) co.reconnect(proton::reconnect_options());
         sender = c.open_sender(url, co);
     }
 
@@ -94,18 +97,20 @@ int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     std::string user;
     std::string password;
+    bool reconnect = false;
     int message_count = 100;
     example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
     opts.add_value(user, 'u', "user", "authenticate as USER", "USER");
     opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+    opts.add_flag(reconnect, 'r', "reconnect", "reconnect on connection failure");
+    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
 
     try {
         opts.parse();
 
-        simple_send send(address, user, password, message_count);
+        simple_send send(address, user, password, reconnect, message_count);
         proton::container(send).run();
 
         return 0;
diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp
index c6686ae..ec3a851 100644
--- a/cpp/src/connection.cpp
+++ b/cpp/src/connection.cpp
@@ -60,7 +60,11 @@ void connection::open(const connection_options &opts) {
     pn_connection_open(pn_object());
 }
 
-void connection::close() { pn_connection_close(pn_object()); }
+void connection::close() {
+  pn_connection_close(pn_object());
+  reconnect_context* rctx = connection_context::get(pn_object()).reconnect_context_.get();
+  if (rctx) rctx->stop_reconnect_ = true;
+}
 
 std::string connection::virtual_host() const {
     return str(pn_connection_remote_hostname(pn_object()));
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index 65d883a..f6b7822 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -73,7 +73,7 @@ connection_context::connection_context() :
 {}
 
 reconnect_context::reconnect_context(const reconnect_options_base& ro) :
-    reconnect_options_(ro), retries_(0), current_url_(-1), reconnected_(false)
+    reconnect_options_(ro), retries_(0), current_url_(-1), stop_reconnect_(false), reconnected_(false)
 {}
 
 listener_context::listener_context() : listen_handler_(0) {}
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 6660b55..dcd8081 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -112,6 +112,7 @@ class reconnect_context {
     duration delay_;
     int retries_;
     int current_url_;
+    bool stop_reconnect_;
     bool reconnected_;
 };
 
diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp
index 0f02891..1fae73b 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -299,11 +299,13 @@ duration next_delay(reconnect_context& rc) {
     }
     return random_between(roi.delay, rc.delay_);
 }
+
+inline reconnect_context* get_reconnect_context(pn_connection_t* pnc) {
+    return connection_context::get(pnc).reconnect_context_.get();
 }
 
-void container::impl::reset_reconnect(pn_connection_t* pnc) {
-    connection_context& cc = connection_context::get(pnc);
-    reconnect_context* rc = cc.reconnect_context_.get();
+void reset_reconnect(pn_connection_t* pnc) {
+    reconnect_context* rc = get_reconnect_context(pnc);
 
     if (!rc) return;
 
@@ -313,10 +315,17 @@ void container::impl::reset_reconnect(pn_connection_t* pnc) {
     rc->current_url_ = -1;
 }
 
+}
+
 bool container::impl::can_reconnect(pn_connection_t* pnc) {
+    reconnect_context* rc = get_reconnect_context(pnc);
+
+    // If reconnect not enabled just fail
+    if (!rc) return false;
+
     // Don't reconnect if we are locally closed, the application will
     // not expect a connection it closed to re-open.
-    if (pn_connection_state(pnc) & PN_LOCAL_CLOSED) return false;
+    if (rc->stop_reconnect_) return false;
 
     // If container stopping don't try to reconnect
     // - we pretend to have set up a reconnect attempt so
@@ -326,11 +335,6 @@ bool container::impl::can_reconnect(pn_connection_t* pnc) {
         GUARD(lock_);
         if (stopping_) return true;
     }
-    connection_context& cc = connection_context::get(pnc);
-    reconnect_context* rc = cc.reconnect_context_.get();
-
-    // If reconnect not enabled just fail
-    if (!rc) return false;
 
     const reconnect_options_base& roi = rc->reconnect_options_;
 
@@ -665,17 +669,19 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
         pn_connection_t *c = pn_event_connection(event);
         pn_condition_t *cc = pn_connection_remote_condition(c);
 
-        // amqp:connection:forced should be treated like a transport
-        // disconnect. Hide the connection error/close events from the
-        // application and generate a PN_TRANSPORT_CLOSE event.
-        if (pn_condition_is_set(cc) &&
+        // If reconnect is on, amqp:connection:forced should be treated specially:
+        // Hide the connection error/close events from the application;
+        // Then we close the connection noting the forced close;
+        // Then set up for reconnect handling.
+        if (get_reconnect_context(c) &&
+            pn_condition_is_set(cc) &&
             !strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
         {
             pn_transport_t* t = pn_event_transport(event);
             pn_condition_t* tc = pn_transport_condition(t);
             pn_condition_copy(tc, cc);
-            pn_transport_close_head(t);
             pn_transport_close_tail(t);
+            pn_connection_close(c);
             return ContinueLoop;
         }
         break;
@@ -702,8 +708,9 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
                         throw;
                 }
             }
-            // on_connection_reconnecting() may have closed the connection, check again.
-            if (!(pn_connection_state(c) & PN_LOCAL_CLOSED)) {
+            // on_transport_error() may have closed the connection, check again.
+            reconnect_context* rc = get_reconnect_context(c);
+            if (rc && !(rc->stop_reconnect_)) {
                 setup_reconnect(c);
                 return ContinueLoop;
             }
diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp
index b58ab0e..c8dd1ad 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -107,7 +107,6 @@ class container::impl {
     void reconnect(pn_connection_t* pnc);
     bool can_reconnect(pn_connection_t* pnc);
     void setup_reconnect(pn_connection_t* pnc);
-    void reset_reconnect(pn_connection_t* pnc);
 
     // Event loop to run in each container thread
     void thread();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org