You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/09/01 20:45:27 UTC

qpid-proton git commit: PROTON-1567: Implement failover urls - Example "reliable" client sending and receiving messages - Also add jitter to retry backoff (with C++11)

Repository: qpid-proton
Updated Branches:
  refs/heads/master a8f0c956a -> e631bf6b1


PROTON-1567: Implement failover urls
- Example "reliable" client sending and receiving messages
- Also add jitter to retry backoff (with C++11)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e631bf6b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e631bf6b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e631bf6b

Branch: refs/heads/master
Commit: e631bf6b11960d9687d42dfdde1ff4c65804981c
Parents: a8f0c95
Author: Andrew Stitcher <as...@apache.org>
Authored: Thu Aug 31 17:31:17 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Sep 1 16:39:17 2017 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   1 +
 examples/cpp/reconnect_client.cpp               | 144 +++++++++++++++++++
 .../cpp/include/proton/connection_options.hpp   |   1 -
 .../cpp/include/proton/internal/config.hpp      |   8 ++
 .../cpp/include/proton/reconnect_options.hpp    |   5 +-
 proton-c/bindings/cpp/src/contexts.cpp          |   3 +-
 proton-c/bindings/cpp/src/include/contexts.hpp  |   1 +
 .../cpp/src/include/reconnect_options_impl.hpp  |   4 +
 .../cpp/src/proactor_container_impl.cpp         |  59 ++++++--
 proton-c/bindings/cpp/src/reconnect_options.cpp |   1 +
 10 files changed, 213 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index d116913..a8d9d34 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -41,6 +41,7 @@ foreach(example
     helloworld_direct
     simple_recv
     simple_send
+    reconnect_client
     message_properties
     scheduled_send_03
     direct_recv

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/reconnect_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/reconnect_client.cpp b/examples/cpp/reconnect_client.cpp
new file mode 100644
index 0000000..6075f03
--- /dev/null
+++ b/examples/cpp/reconnect_client.cpp
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/default_container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+#include <proton/value.hpp>
+#include <proton/types.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include "fake_cpp11.hpp"
+
+class reconnect_client : public proton::messaging_handler {
+    std::string url;
+    std::string address;
+    std::vector<std::string> failovers;
+    proton::sender sender;
+    int sent;
+    int expected;
+    int received;
+
+  public:
+    reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) :
+        url(u), address(a), failovers(f), sent(0), expected(c), received(0) {}
+
+  private:
+    void on_container_start(proton::container &c) OVERRIDE {
+        proton::connection_options co;
+        proton::reconnect_options ro;
+
+        ro.failover_urls(failovers);
+        co.reconnect(ro);
+        c.connect(url, co);
+    }
+
+    void on_connection_open(proton::connection & c) OVERRIDE {
+        c.open_receiver(address);
+        c.open_sender(address);
+        // reconnect we probably lost the last message sent
+        sent = received;
+        std::cout << "simple_recv listening on " << url << std::endl;
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+        if (proton::coerce<int>(msg.id()) < received) {
+            return; // Ignore duplicate
+        }
+
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+
+            if (received == expected) {
+                d.receiver().close();
+                sender.close();
+                d.connection().close();
+            } else {
+                // See if we can send any messages now
+                send(sender);
+            }
+        }
+    }
+
+    void send(proton::sender& s) {
+        // Only send with credit and only allow one outstanding message
+        while (s.credit() && sent < received+1) {
+            std::map<std::string, int> m;
+            m["sequence"] = sent + 1;
+
+            proton::message msg;
+            msg.id(sent + 1);
+            msg.body(m);
+
+            std::cout << "Sending: " << sent+1 << std::endl;
+            s.send(msg);
+            sent++;
+        }
+    }
+
+    void on_sender_open(proton::sender & s) OVERRIDE {
+        sender = s;
+    }
+
+    void on_sendable(proton::sender &s) OVERRIDE {
+        send(s);
+    }
+};
+
+int main(int argc, const char** argv) {
+    try {
+        if (argc < 4) {
+            std ::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to receive\n"
+                "FAILOVER_URL...: zero or more failover urls\n";
+            return 1;
+        }
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int message_count = atoi(argv[3]);
+        std::vector<std::string> failovers(&argv[4], &argv[argc]);
+
+        reconnect_client client(url, address, message_count, failovers);
+        proton::default_container(client).run();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 066e8cf..02a9027 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -149,7 +149,6 @@ class connection_options {
     /// **Experimental** - Options for reconnect on outgoing connections.
     PN_CPP_EXTERN connection_options& reconnect(reconnect_options &);
 
-    
 
     /// Update option values from values set in other.
     PN_CPP_EXTERN connection_options& update(const connection_options& other);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index 54b014b..e600f21 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -83,6 +83,10 @@
 #define PN_CPP_HAS_DELETED_FUNCTIONS PN_CPP_HAS_CPP11
 #endif
 
+#ifndef PN_CPP_HAS_THREAD_LOCAL
+#define PN_CPP_HAS_THREAD_LOCAL PN_CPP_HAS_CPP11
+#endif
+
 #ifndef PN_CPP_HAS_STD_FUNCTION
 #define PN_CPP_HAS_STD_FUNCTION PN_CPP_HAS_CPP11
 #endif
@@ -95,6 +99,10 @@
 #define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11
 #endif
 
+#ifndef PN_CPP_HAS_RANDOM
+#define PN_CPP_HAS_RANDOM PN_CPP_HAS_CPP11
+#endif
+
 #ifndef PN_CPP_HAS_STD_MUTEX
 #define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
index e8ed02c..abbd517 100644
--- a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
@@ -28,6 +28,7 @@
 #include "./source.hpp"
 
 #include <string>
+#include <vector>
 
 namespace proton {
 
@@ -67,8 +68,8 @@ class reconnect_options {
     /// Maximum reconnect attempts (default 0, meaning no limit)
     PN_CPP_EXTERN reconnect_options& max_attempts(int);
 
-    /// TODO: failover_urls
-
+    /// Alternative connection urls used for failover
+    PN_CPP_EXTERN reconnect_options& failover_urls(const std::vector<std::string>& urls);
 
   private:
     class impl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index 812d573..d076478 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -72,7 +72,8 @@ connection_context::connection_context() :
 {}
 
 reconnect_context::reconnect_context(const reconnect_options& ro, const connection_options& co) :
-    reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)), retries_(0)
+    reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)),
+    retries_(0), current_url_(-1)
 {}
 
 listener_context::listener_context() : listen_handler_(0) {}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 7920d70..7797b08 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -106,6 +106,7 @@ class reconnect_context {
     internal::pn_unique_ptr<const connection_options> connection_options_;
     duration delay_;
     int retries_;
+    int current_url_;
 };
 
 class listener_context : public context {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
index fc90508..2aca82d 100644
--- a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
@@ -24,6 +24,9 @@
 
 #include "proton/duration.hpp"
 
+#include <string>
+#include <vector>
+
 namespace proton {
 
 class reconnect_options::impl {
@@ -34,6 +37,7 @@ class reconnect_options::impl {
     float    delay_multiplier;
     duration max_delay;
     int      max_attempts;
+    std::vector<std::string> failover_urls;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 4f90c68..b818f65 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -46,6 +46,13 @@
 # include <thread>
 #endif
 
+#if PN_CPP_HAS_RANDOM
+# include <random>
+#endif
+
+// XXXX: Debug
+//#include <iostream>
+
 namespace proton {
 
 class container::impl::common_work_queue : public work_queue::impl {
@@ -179,8 +186,8 @@ pn_connection_t* container::impl::make_connection_lh(
     cc.container = &container_;
     cc.handler = mh;
     cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
-
     cc.connected_address_ = url;
+
     setup_connection_lh(url, pnc);
     make_wrapper(pnc).open(opts);
 
@@ -205,38 +212,70 @@ void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
 
 void container::impl::reconnect(pn_connection_t* pnc) {
     connection_context& cc = connection_context::get(pnc);
-    reconnect_context* rc = cc.reconnect_context_.get();
+    reconnect_context& rc = *cc.reconnect_context_.get();
+    const reconnect_options::impl& roi = *rc.reconnect_options_->impl_;
 
     // Figure out next connection url to try
-    const proton::url url(cc.connected_address_);
+    // rc.current_url_ == -1 means try the url specified in connect, not a failover url
+    const proton::url url(rc.current_url_==-1 ? cc.connected_address_ : roi.failover_urls[rc.current_url_]);
+
+    // XXXX Debug:
+    //std::cout << "Retries: " << rc.retries_ << " Delay: " << rc.delay_ << " Trying: " << url << "\n";
 
-    cc.connected_address_ = url;
     setup_connection_lh(url, pnc);
-    { // Scope required to keep temporary destructor from doing pn_decref() after start_connection()
-        make_wrapper(pnc).open(*rc->connection_options_);
+    make_wrapper(pnc).open(*rc.connection_options_);
+    start_connection(url, pnc);
+
+    // Did we go through all the urls?
+    if (rc.current_url_==int(roi.failover_urls.size())-1) {
+        rc.current_url_ = -1;
+        ++rc.retries_;
+    } else {
+        ++rc.current_url_;
     }
-    start_connection(cc.connected_address_, pnc);
-    rc->retries_++;
+}
+
+namespace {
+#if PN_CPP_HAS_RANDOM && PN_CPP_HAS_THREAD_LOCAL
+duration random_between(duration min, duration max)
+{
+    static thread_local std::default_random_engine gen;
+    std::uniform_int_distribution<duration::numeric_type> dist{min.milliseconds(), max.milliseconds()};
+    return duration(dist(gen));
+}
+#else
+duration random_between(duration, duration max)
+{
+    return max;
+}
+#endif
 }
 
 duration container::impl::next_delay(reconnect_context& rc) {
     // If we've not retried before do it immediately
     if (rc.retries_==0) return duration(0);
 
+    // If we haven't tried all failover urls yet this round do it immediately
+    if (rc.current_url_!=-1) return duration(0);
+
     const reconnect_options::impl& roi = *rc.reconnect_options_->impl_;
     if (rc.retries_==1) {
         rc.delay_ = roi.delay;
     } else {
         rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier);
     }
-    return rc.delay_;
+    return random_between(roi.delay, rc.delay_);
 }
 
 void container::impl::reset_reconnect(pn_connection_t* pnc) {
     connection_context& cc = connection_context::get(pnc);
     reconnect_context* rc = cc.reconnect_context_.get();
 
-    if (rc) rc->retries_ = 0;
+    if (!rc) return;
+
+    rc->delay_ = 0;
+    rc->retries_ = 0;
+    rc->current_url_ = -1;
 }
 
 bool container::impl::setup_reconnect(pn_connection_t* pnc) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/reconnect_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_options.cpp b/proton-c/bindings/cpp/src/reconnect_options.cpp
index ef0d497..a469d3f 100644
--- a/proton-c/bindings/cpp/src/reconnect_options.cpp
+++ b/proton-c/bindings/cpp/src/reconnect_options.cpp
@@ -39,5 +39,6 @@ reconnect_options& reconnect_options::delay(duration d) { impl_->delay = d; retu
 reconnect_options& reconnect_options::delay_multiplier(float f) { impl_->delay_multiplier = f; return *this; }
 reconnect_options& reconnect_options::max_delay(duration d) { impl_->max_delay = d; return *this; }
 reconnect_options& reconnect_options::max_attempts(int i) { impl_->max_attempts = i; return *this; }
+reconnect_options& reconnect_options::failover_urls(const std::vector<std::string>& urls) { impl_->failover_urls = urls; return *this; }
 
 } // namespace proton


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org