You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/09/01 20:45:27 UTC
qpid-proton git commit: PROTON-1567: Implement failover urls -
Example "reliable" client sending and receiving messages - Also add jitter to
retry backoff (with C++11)
Repository: qpid-proton
Updated Branches:
refs/heads/master a8f0c956a -> e631bf6b1
PROTON-1567: Implement failover urls
- Example "reliable" client sending and receiving messages
- Also add jitter to retry backoff (with C++11)
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e631bf6b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e631bf6b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e631bf6b
Branch: refs/heads/master
Commit: e631bf6b11960d9687d42dfdde1ff4c65804981c
Parents: a8f0c95
Author: Andrew Stitcher <as...@apache.org>
Authored: Thu Aug 31 17:31:17 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Sep 1 16:39:17 2017 -0400
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 1 +
examples/cpp/reconnect_client.cpp | 144 +++++++++++++++++++
.../cpp/include/proton/connection_options.hpp | 1 -
.../cpp/include/proton/internal/config.hpp | 8 ++
.../cpp/include/proton/reconnect_options.hpp | 5 +-
proton-c/bindings/cpp/src/contexts.cpp | 3 +-
proton-c/bindings/cpp/src/include/contexts.hpp | 1 +
.../cpp/src/include/reconnect_options_impl.hpp | 4 +
.../cpp/src/proactor_container_impl.cpp | 59 ++++++--
proton-c/bindings/cpp/src/reconnect_options.cpp | 1 +
10 files changed, 213 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index d116913..a8d9d34 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -41,6 +41,7 @@ foreach(example
helloworld_direct
simple_recv
simple_send
+ reconnect_client
message_properties
scheduled_send_03
direct_recv
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/reconnect_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/reconnect_client.cpp b/examples/cpp/reconnect_client.cpp
new file mode 100644
index 0000000..6075f03
--- /dev/null
+++ b/examples/cpp/reconnect_client.cpp
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/default_container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+#include <proton/value.hpp>
+#include <proton/types.hpp>
+
+#include <cerrno>
+#include <cstdlib>
+#include <iostream>
+#include <limits>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "fake_cpp11.hpp"
+
+/// Example client demonstrating reconnect with failover URLs.
+///
+/// It opens a receiver and a sender on the same address and sends numbered
+/// messages, keeping at most one message outstanding: message N+1 is only
+/// sent once message N has been received.
+class reconnect_client : public proton::messaging_handler {
+    std::string url;                     // URL passed to container::connect()
+    std::string address;                 // Address used for both receiver and sender
+    std::vector<std::string> failovers;  // Failover URLs handed to reconnect_options
+    proton::sender sender;               // Sender saved by on_sender_open()
+    int sent;                            // Sequence number of the last message sent
+    int expected;                        // Number of messages to receive (0 means no limit)
+    int received;                        // Number of messages received so far
+
+  public:
+    /// @param u initial connection URL
+    /// @param a AMQP address to send to and receive from
+    /// @param c number of messages to receive before closing (0 means no limit)
+    /// @param f failover URLs to configure on the connection
+    reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) :
+        url(u), address(a), failovers(f), sent(0), expected(c), received(0) {}
+
+  private:
+    /// Connect to `url` with reconnect enabled and the failover URLs configured.
+    void on_container_start(proton::container &c) OVERRIDE {
+        proton::connection_options co;
+        proton::reconnect_options ro;
+
+        ro.failover_urls(failovers);
+        co.reconnect(ro);
+        c.connect(url, co);
+    }
+
+    /// Open the receiver and sender and rewind the send counter.
+    void on_connection_open(proton::connection & c) OVERRIDE {
+        c.open_receiver(address);
+        c.open_sender(address);
+        // If this is a reconnect we probably lost the last message sent, so
+        // rewind `sent` to make send() transmit it again.
+        sent = received;
+        // NOTE(review): this text looks copied from the simple_recv example and
+        // prints the initial URL, not necessarily the URL actually connected.
+        std::cout << "simple_recv listening on " << url << std::endl;
+    }
+
+    /// Count and print each new message; close everything once `expected`
+    /// messages have arrived, otherwise try to send the next message.
+    void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+        // Message ids are the 1-based sequence numbers assigned in send(), so
+        // every id up to and including `received` has already been processed.
+        if (proton::coerce<int>(msg.id()) <= received) {
+            return; // Ignore duplicate
+        }
+
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+
+            if (received == expected) {
+                d.receiver().close();
+                sender.close();
+                d.connection().close();
+            } else {
+                // See if we can send any messages now
+                send(sender);
+            }
+        }
+    }
+
+    /// Send the next message in sequence if the sender has credit and the
+    /// previously sent message has already been received.
+    void send(proton::sender& s) {
+        // Only send with credit and only allow one outstanding message
+        while (s.credit() && sent < received+1) {
+            std::map<std::string, int> m;
+            m["sequence"] = sent + 1;
+
+            proton::message msg;
+            msg.id(sent + 1);
+            msg.body(m);
+
+            std::cout << "Sending: " << sent+1 << std::endl;
+            s.send(msg);
+            sent++;
+        }
+    }
+
+    /// Remember the sender so on_message() can send on it and close it.
+    void on_sender_open(proton::sender & s) OVERRIDE {
+        sender = s;
+    }
+
+    /// Credit became available: try to send.
+    void on_sendable(proton::sender &s) OVERRIDE {
+        send(s);
+    }
+};
+
+/// Parse the command line and run a reconnect_client in a default container.
+///
+/// Arguments: CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT [FAILOVER-URL...]
+///
+/// @return 0 when the container run completes, 1 on a usage error, an invalid
+///         MESSAGE-COUNT, or if a std::exception is thrown.
+int main(int argc, const char** argv) {
+    try {
+        if (argc < 4) {
+            std::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to receive\n"
+                "FAILOVER-URL...: zero or more failover urls\n";
+            return 1;
+        }
+        const char *url = argv[1];
+        const char *address = argv[2];
+
+        // Parse MESSAGE-COUNT strictly. A count of 0 means "no limit" to the
+        // client, so garbage input must not silently turn into 0, and a
+        // negative count would leave the client unable to ever finish.
+        errno = 0;
+        char *end = 0;
+        const long count = std::strtol(argv[3], &end, 10);
+        if (errno == ERANGE || end == argv[3] || *end != '\0' ||
+            count < 0 || count > std::numeric_limits<int>::max()) {
+            std::cerr << "Invalid MESSAGE-COUNT: '" << argv[3] << "'" << std::endl;
+            return 1;
+        }
+        const int message_count = static_cast<int>(count);
+
+        // Everything after MESSAGE-COUNT is a failover URL (possibly none).
+        std::vector<std::string> failovers(argv + 4, argv + argc);
+
+        reconnect_client client(url, address, message_count, failovers);
+        proton::default_container(client).run();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 066e8cf..02a9027 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -149,7 +149,6 @@ class connection_options {
/// **Experimental** - Options for reconnect on outgoing connections.
PN_CPP_EXTERN connection_options& reconnect(reconnect_options &);
-
/// Update option values from values set in other.
PN_CPP_EXTERN connection_options& update(const connection_options& other);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index 54b014b..e600f21 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -83,6 +83,10 @@
#define PN_CPP_HAS_DELETED_FUNCTIONS PN_CPP_HAS_CPP11
#endif
+#ifndef PN_CPP_HAS_THREAD_LOCAL
+#define PN_CPP_HAS_THREAD_LOCAL PN_CPP_HAS_CPP11
+#endif
+
#ifndef PN_CPP_HAS_STD_FUNCTION
#define PN_CPP_HAS_STD_FUNCTION PN_CPP_HAS_CPP11
#endif
@@ -95,6 +99,10 @@
#define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11
#endif
+#ifndef PN_CPP_HAS_RANDOM
+#define PN_CPP_HAS_RANDOM PN_CPP_HAS_CPP11
+#endif
+
#ifndef PN_CPP_HAS_STD_MUTEX
#define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11
#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
index e8ed02c..abbd517 100644
--- a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
@@ -28,6 +28,7 @@
#include "./source.hpp"
#include <string>
+#include <vector>
namespace proton {
@@ -67,8 +68,8 @@ class reconnect_options {
/// Maximum reconnect attempts (default 0, meaning no limit)
PN_CPP_EXTERN reconnect_options& max_attempts(int);
- /// TODO: failover_urls
-
+ /// Alternative connection urls used for failover
+ PN_CPP_EXTERN reconnect_options& failover_urls(const std::vector<std::string>& urls);
private:
class impl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index 812d573..d076478 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -72,7 +72,8 @@ connection_context::connection_context() :
{}
reconnect_context::reconnect_context(const reconnect_options& ro, const connection_options& co) :
- reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)), retries_(0)
+ reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)),
+ retries_(0), current_url_(-1)
{}
listener_context::listener_context() : listen_handler_(0) {}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 7920d70..7797b08 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -106,6 +106,7 @@ class reconnect_context {
internal::pn_unique_ptr<const connection_options> connection_options_;
duration delay_;
int retries_;
+ int current_url_;
};
class listener_context : public context {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
index fc90508..2aca82d 100644
--- a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
@@ -24,6 +24,9 @@
#include "proton/duration.hpp"
+#include <string>
+#include <vector>
+
namespace proton {
class reconnect_options::impl {
@@ -34,6 +37,7 @@ class reconnect_options::impl {
float delay_multiplier;
duration max_delay;
int max_attempts;
+ std::vector<std::string> failover_urls;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 4f90c68..b818f65 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -46,6 +46,13 @@
# include <thread>
#endif
+#if PN_CPP_HAS_RANDOM
+# include <random>
+#endif
+
+// XXXX: Debug
+//#include <iostream>
+
namespace proton {
class container::impl::common_work_queue : public work_queue::impl {
@@ -179,8 +186,8 @@ pn_connection_t* container::impl::make_connection_lh(
cc.container = &container_;
cc.handler = mh;
cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
-
cc.connected_address_ = url;
+
setup_connection_lh(url, pnc);
make_wrapper(pnc).open(opts);
@@ -205,38 +212,70 @@ void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
void container::impl::reconnect(pn_connection_t* pnc) {
connection_context& cc = connection_context::get(pnc);
- reconnect_context* rc = cc.reconnect_context_.get();
+ reconnect_context& rc = *cc.reconnect_context_.get();
+ const reconnect_options::impl& roi = *rc.reconnect_options_->impl_;
// Figure out next connection url to try
- const proton::url url(cc.connected_address_);
+ // rc.current_url_ == -1 means try the url specified in connect, not a failover url
+ const proton::url url(rc.current_url_==-1 ? cc.connected_address_ : roi.failover_urls[rc.current_url_]);
+
+ // XXXX Debug:
+ //std::cout << "Retries: " << rc.retries_ << " Delay: " << rc.delay_ << " Trying: " << url << "\n";
- cc.connected_address_ = url;
setup_connection_lh(url, pnc);
- { // Scope required to keep temporary destructor from doing pn_decref() after start_connection()
- make_wrapper(pnc).open(*rc->connection_options_);
+ make_wrapper(pnc).open(*rc.connection_options_);
+ start_connection(url, pnc);
+
+ // Did we go through all the urls?
+ if (rc.current_url_==int(roi.failover_urls.size())-1) {
+ rc.current_url_ = -1;
+ ++rc.retries_;
+ } else {
+ ++rc.current_url_;
}
- start_connection(cc.connected_address_, pnc);
- rc->retries_++;
+}
+
+namespace {
+#if PN_CPP_HAS_RANDOM && PN_CPP_HAS_THREAD_LOCAL
+// Return a duration drawn uniformly from [min, max] milliseconds.
+// Used to add jitter to the reconnect delay.
+duration random_between(duration min, duration max)
+{
+    // uniform_int_distribution requires min <= max; with an empty or inverted
+    // range there is nothing to randomize, so just use max.
+    if (max.milliseconds() <= min.milliseconds()) return max;
+
+    // Seed each thread's engine from random_device. A default-constructed
+    // engine yields the same sequence in every thread and every process, which
+    // would make all clients pick identical "jittered" delays.
+    static thread_local std::default_random_engine gen{std::random_device{}()};
+    std::uniform_int_distribution<duration::numeric_type> dist{min.milliseconds(), max.milliseconds()};
+    return duration(dist(gen));
+}
+#else
+// No <random>/thread_local support: no jitter, always use the maximum.
+duration random_between(duration, duration max)
+{
+    return max;
+}
+#endif
}
duration container::impl::next_delay(reconnect_context& rc) {
// If we've not retried before do it immediately
if (rc.retries_==0) return duration(0);
+ // If we haven't tried all failover urls yet this round do it immediately
+ if (rc.current_url_!=-1) return duration(0);
+
const reconnect_options::impl& roi = *rc.reconnect_options_->impl_;
if (rc.retries_==1) {
rc.delay_ = roi.delay;
} else {
rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier);
}
- return rc.delay_;
+ return random_between(roi.delay, rc.delay_);
}
void container::impl::reset_reconnect(pn_connection_t* pnc) {
connection_context& cc = connection_context::get(pnc);
reconnect_context* rc = cc.reconnect_context_.get();
- if (rc) rc->retries_ = 0;
+ if (!rc) return;
+
+ rc->delay_ = 0;
+ rc->retries_ = 0;
+ rc->current_url_ = -1;
}
bool container::impl::setup_reconnect(pn_connection_t* pnc) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/reconnect_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_options.cpp b/proton-c/bindings/cpp/src/reconnect_options.cpp
index ef0d497..a469d3f 100644
--- a/proton-c/bindings/cpp/src/reconnect_options.cpp
+++ b/proton-c/bindings/cpp/src/reconnect_options.cpp
@@ -39,5 +39,6 @@ reconnect_options& reconnect_options::delay(duration d) { impl_->delay = d; retu
reconnect_options& reconnect_options::delay_multiplier(float f) { impl_->delay_multiplier = f; return *this; }
reconnect_options& reconnect_options::max_delay(duration d) { impl_->max_delay = d; return *this; }
reconnect_options& reconnect_options::max_attempts(int i) { impl_->max_attempts = i; return *this; }
+// Store a copy of the failover URL list in the impl; returns *this for chaining.
+reconnect_options& reconnect_options::failover_urls(const std::vector<std::string>& urls) {
+    impl_->failover_urls = urls;
+    return *this;
+}
} // namespace proton
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org