You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:01 UTC
[33/50] qpid-proton git commit: PROTON-1557: c++ improve
multi-threaded clients
PROTON-1557: c++ improve multi-threaded clients
2 clients:
- multithreaded_client.cpp: simple send thread, receive thread, run thread
- multithreaded_client_flow_control: multi-connection, block for flow control
Changes:
- reduced needless diff between examples
- use separate work_queue* to clarify separate thread safety rules from sender
- took work_queue->add() out of lock to emphasize it is thread safe
- use fixed argument list, same arg order
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/398f786b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/398f786b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/398f786b
Branch: refs/heads/go1
Commit: 398f786ba94857b00d052c03232f8f9ef97f3f44
Parents: f1ee268
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Aug 28 11:53:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Aug 28 18:00:24 2017 -0400
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 3 +-
examples/cpp/multithreaded_client.cpp | 185 ++++++++++++
.../cpp/multithreaded_client_flow_control.cpp | 287 +++++++++++++++++++
examples/cpp/send_recv_mt.cpp | 269 -----------------
4 files changed, 474 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index df9f6a7..d116913 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -63,7 +63,8 @@ if(HAS_CPP11)
# Examples that require C++11
foreach(example
scheduled_send
- send_recv_mt
+ multithreaded_client
+ multithreaded_client_flow_control
)
add_executable(${example} ${example}.cpp)
endforeach()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
new file mode 100644
index 0000000..955655c
--- /dev/null
+++ b/examples/cpp/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 only
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+ // Invariant
+ const std::string url_;
+ const std::string address_;
+
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ std::queue<proton::message> messages_;
+ std::condition_variable messages_ready_;
+
+ public:
+ client(const std::string& url, const std::string& address) : url_(url), address_(address) {}
+
+ // Thread safe
+ void send(const proton::message& msg) {
+ // Use [=] to copy the message, we cannot pass it by reference since it
+ // will be used in another thread.
+ work_queue()->add([=]() { sender_.send(msg); });
+ }
+
+ // Thread safe
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ while (messages_.empty()) messages_ready_.wait(l);
+ auto msg = std::move(messages_.front());
+ messages_.pop();
+ return msg;
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton hander thread
+
+ // Note: this example creates a connection when the container starts.
+ // To create connections after the container has started, use
+ // container::connect().
+ // See @ref multithreaded_client_flow_control.cpp for an example.
+ void on_container_start(proton::container& cont) override {
+ cont.connect(url_);
+ }
+
+ void on_connection_open(proton::connection& conn) override {
+ conn.open_sender(address_);
+ conn.open_receiver(address_);
+ }
+
+ void on_sender_open(proton::sender& s) override {
+ {
+ // sender_ and work_queue_ must be set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ }
+ sender_ready_.notify_all();
+ }
+
+ void on_message(proton::delivery& dlv, proton::message& msg) override {
+ {
+ std::lock_guard<std::mutex> l(lock_);
+ messages_.push(msg);
+ }
+ messages_ready_.notify_all();
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+int main(int argc, const char** argv) {
+ try {
+ if (argc != 4) {
+ std ::cerr <<
+ "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n";
+ return 1;
+ }
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+
+ client cl(url, address);
+ proton::container container(cl);
+ std::thread container_thread([&]() { container.run(); });
+
+ std::thread sender([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ proton::message msg(std::to_string(i + 1));
+ cl.send(msg);
+ std::cout << "sent: " << msg.body() << std::endl;
+ }
+ });
+
+ int received = 0;
+ std::thread receiver([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ auto msg = cl.receive();
+ std::cout << "received: " << msg.body() << std::endl;
+ ++received;
+ }
+ });
+
+ sender.join();
+ receiver.join();
+ cl.close();
+ container_thread.join();
+ std::cout << "received " << received << " messages" << std::endl;
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..9eec782
--- /dev/null
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// C++11 only
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receivers threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::message_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ int queued_; // Queued messages waiting to be sent
+ int credit_; // AMQP credit - number of messages we can send
+
+ public:
+ sender(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_(0), queued_(0), credit_(0)
+ {
+ cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+ }
+
+ // Thread safe
+ void send(const proton::message& m) {
+ {
+ std::unique_lock<std::mutex> l(lock_);
+ // Don't queue up more messages than we have credit for
+ while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+ ++queued_;
+ }
+ work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton hander thread
+
+ void on_sender_open(proton::sender& s) override {
+ // Make sure sender_ and work_queue_ are set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ }
+
+ void on_sendable(proton::sender& s) override {
+ std::lock_guard<std::mutex> l(lock_);
+ credit_ = s.credit();
+ sender_ready_.notify_all(); // Notify senders we have credit
+ }
+
+ // work_queue work items is are automatically dequeued and called by proton
+ // This function is called because it was queued by send()
+ void do_send(const proton::message& m) {
+ sender_.send(m);
+ std::lock_guard<std::mutex> l(lock_);
+ --queued_; // work item was consumed from the work_queue
+ credit_ = sender_.credit(); // update credit
+ sender_ready_.notify_all(); // Notify senders we have space on queue
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+ static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+ // Used in proton threads only
+ proton::receiver receiver_;
+
+ // Used in proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue* work_queue_;
+ std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+ std::condition_variable can_receive_; // Notify receivers of messages
+
+ public:
+
+ // Connect to url
+ receiver(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_()
+ {
+ // NOTE:credit_window(0) disables automatic flow control.
+ // We will use flow control to match AMQP credit to buffer capacity.
+ cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+ proton::connection_options().handler(*this));
+ }
+
+ // Thread safe receive
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ // Wait for buffered messages
+ while (!work_queue_ || buffer_.empty())
+ can_receive_.wait(l);
+ proton::message m = std::move(buffer_.front());
+ buffer_.pop();
+ // Add a lambda to the work queue to call receive_done().
+ // This will tell the handler to add more credit.
+ work_queue_->add([=]() { this->receive_done(); });
+ return m;
+ }
+
+ void close() {
+ std::lock_guard<std::mutex> l(lock_);
+ if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+ }
+
+ private:
+ // ==== The following are called by proton threads only.
+
+ void on_receiver_open(proton::receiver& r) override {
+ receiver_ = r;
+ std::lock_guard<std::mutex> l(lock_);
+ work_queue_ = &receiver_.work_queue();
+ receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) override {
+ // Proton automatically reduces credit by 1 before calling on_message
+ std::lock_guard<std::mutex> l(lock_);
+ buffer_.push(m);
+ can_receive_.notify_all();
+ }
+
+ // called via work_queue
+ void receive_done() {
+ // Add 1 credit, a receiver has taken a message out of the buffer.
+ receiver_.add_credit(1);
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages
+void send_thread(sender& s, int n, bool print) {
+ auto id = std::this_thread::get_id();
+ for (int i = 0; i < n; ++i) {
+ std::ostringstream ss;
+ ss << std::this_thread::get_id() << ":" << i;
+ s.send(proton::message(ss.str()));
+ if (print) std::cout << "received: " << ss.str() << std::endl;
+ }
+ std::cout << id << " sent " << n << std::endl;
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+ auto id = std::this_thread::get_id();
+ int n = 0;
+ while (remaining-- > 0) {
+ auto m = r.receive();
+ ++n;
+ if (print) std::cout << id << "received: " << m.body() << std::endl;
+ }
+ std::cout << id << " received " << n << " messages" << std::endl;
+}
+
+int main(int argc, const char **argv) {
+ try {
+ if (argc != 5) {
+ std::cerr <<
+ "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n"
+ "THREAD-COUNT: number of sender/receiver thread pairs\n";
+ return 1;
+ }
+
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+ int n_threads = atoi(argv[4]);
+
+ // Total messages to be received, multiple receiver threads will decrement this.
+ std::atomic_int remaining(n_messages * n_threads);
+ bool print = remaining < 1000; // Don't print for long runs, dominates run time
+
+ // Run the proton container
+ proton::container container;
+ auto container_thread = std::thread([&]() { container.run(); });
+
+ // A single sender and receiver to be shared by all the threads
+ sender send(container, url, address);
+ receiver recv(container, url, address);
+
+ // Start receiver threads, then sender threads.
+ // Starting receivers first gives all receivers a chance to compete for messages.
+ std::vector<std::thread> threads;
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); }));
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); }));
+
+ // Wait for threads to finish
+ for (auto& t : threads)
+ t.join();
+ send.close();
+ recv.close();
+
+ container_thread.join();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/send_recv_mt.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp
deleted file mode 100644
index addcbaf..0000000
--- a/examples/cpp/send_recv_mt.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// C++11 only
-//
-// A multi-threaded client that sends and receives messages from multiple AMQP
-// addresses.
-//
-// Demonstrates how to:
-//
-// - implement proton handlers that interact with user threads safely
-// - block user threads calling send() to respect AMQP flow control
-// - use AMQP flow control to limit message buffering for receivers
-//
-// We define mt_sender and mt_receiver classes with simple, thread-safe blocking
-// send() and receive() functions.
-//
-// These classes are also privately proton::message_handler instances. They use
-// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
-// etc.) to pass messages between user and proton::container threads.
-//
-// NOTE: no proper error handling
-
-#include <proton/connection.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/container.hpp>
-#include <proton/message.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/sender.hpp>
-#include <proton/work_queue.hpp>
-
-#include <atomic>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <queue>
-#include <sstream>
-#include <thread>
-
-// Lock to serialize std::cout, std::cerr used from multiple threads.
-std::mutex out_lock;
-#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0)
-#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0)
-#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0)
-
-// A thread-safe sending connection.
-class mt_sender : private proton::messaging_handler {
- // Only used in proton thread
- proton::sender sender_;
-
- // Shared by proton and user threads, use lock_ to protect.
- std::mutex lock_;
- proton::work_queue* work_queue_; // Messages waiting to be sent
- std::condition_variable can_send_; // Signal sending threads
- int queued_; // Queued messages waiting to be sent
- int credit_; // AMQP credit - number of messages we can send
-
- public:
- // Connect to url
- mt_sender(proton::container& cont, const std::string& url) :
- work_queue_(0), queued_(0), credit_(0)
- {
- // Pass *this as handler.
- cont.open_sender(url, proton::connection_options().handler(*this));
- }
-
- // Thread safe send()
- void send(const proton::message& m) {
- std::unique_lock<std::mutex> l(lock_);
- // Don't queue up more messages than we have credit for
- while (!(work_queue_ && queued_ < credit_))
- can_send_.wait(l);
- ++queued_;
- // Add a lambda function to the work queue.
- // This will call do_send() with a copy of m in the correct proton thread.
- work_queue_->add([=]() { this->do_send(m); });
- }
-
- void close() {
- std::lock_guard<std::mutex> l(lock_);
- if (work_queue_)
- work_queue_->add([this]() { this->sender_.connection().close(); });
- }
-
- private:
- // ==== called by proton threads only
-
- void on_sender_open(proton::sender& s) override {
- sender_ = s;
- std::lock_guard<std::mutex> l(lock_);
- work_queue_ = &s.work_queue();
- }
-
- void on_sendable(proton::sender& s) override {
- std::lock_guard<std::mutex> l(lock_);
- credit_ = s.credit();
- can_send_.notify_all(); // Notify senders we have credit
- }
-
- // work_queue work items is are automatically dequeued and called by proton
- // This function is called because it was queued by send()
- void do_send(const proton::message& m) {
- sender_.send(m);
- std::lock_guard<std::mutex> l(lock_);
- --queued_; // work item was consumed from the work_queue
- credit_ = sender_.credit(); // update credit
- can_send_.notify_all(); // Notify senders we have space on queue
- }
-
- void on_error(const proton::error_condition& e) override {
- CERR("unexpected error: " << e << std::endl);
- exit(1);
- }
-};
-
-// A thread safe receiving connection.
-class mt_receiver : private proton::messaging_handler {
- static const size_t MAX_BUFFER = 100; // Max number of buffered messages
-
- // Used in proton threads only
- proton::receiver receiver_;
-
- // Used in proton and user threads, protected by lock_
- std::mutex lock_;
- proton::work_queue* work_queue_;
- std::queue<proton::message> buffer_; // Messages not yet returned by receive()
- std::condition_variable can_receive_; // Notify receivers of messages
-
- public:
-
- // Connect to url
- mt_receiver(proton::container& cont, const std::string& url) : work_queue_()
- {
- // NOTE:credit_window(0) disables automatic flow control.
- // We will use flow control to match AMQP credit to buffer capacity.
- cont.open_receiver(url, proton::receiver_options().credit_window(0),
- proton::connection_options().handler(*this));
- }
-
- // Thread safe receive
- proton::message receive() {
- std::unique_lock<std::mutex> l(lock_);
- // Wait for buffered messages
- while (!work_queue_ || buffer_.empty())
- can_receive_.wait(l);
- proton::message m = std::move(buffer_.front());
- buffer_.pop();
- // Add a lambda to the work queue to call receive_done().
- // This will tell the handler to add more credit.
- work_queue_->add([=]() { this->receive_done(); });
- return m;
- }
-
- void close() {
- std::lock_guard<std::mutex> l(lock_);
- if (work_queue_)
- work_queue_->add([this]() { this->receiver_.connection().close(); });
- }
-
- private:
- // ==== The following are called by proton threads only.
-
- void on_receiver_open(proton::receiver& r) override {
- receiver_ = r;
- std::lock_guard<std::mutex> l(lock_);
- work_queue_ = &receiver_.work_queue();
- receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
- }
-
- void on_message(proton::delivery &d, proton::message &m) override {
- // Proton automatically reduces credit by 1 before calling on_message
- std::lock_guard<std::mutex> l(lock_);
- buffer_.push(m);
- can_receive_.notify_all();
- }
-
- // called via work_queue
- void receive_done() {
- // Add 1 credit, a receiver has taken a message out of the buffer.
- receiver_.add_credit(1);
- }
-
- void on_error(const proton::error_condition& e) override {
- CERR("unexpected error: " << e << std::endl);
- exit(1);
- }
-};
-
-// ==== Example code using the mt_sender and mt_receiver
-
-// Send n messages
-void send_thread(mt_sender& s, int n) {
- for (int i = 0; i < n; ++i) {
- std::ostringstream o;
- o << std::this_thread::get_id() << ":" << i;
- s.send(proton::message(o.str()));
- }
- COUT(std::this_thread::get_id() << " sent " << n << std::endl);
-}
-
-// Receive messages till atomic remaining count is 0.
-// remaining is shared among all receiving threads
-void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) {
- auto id = std::this_thread::get_id();
- int n = 0;
- while (remaining-- > 0) {
- auto m = r.receive();
- ++n;
- if (print)
- COUT(id << " received \"" << m.body() << '"' << std::endl);
- }
- COUT(id << " received " << n << " messages" << std::endl);
-}
-
-int main(int argc, const char **argv) {
- try {
- int n_threads = argc > 1 ? atoi(argv[1]) : 2;
- int n_messages = argc > 2 ? atoi(argv[2]) : 10;
- const char *url = argc > 3 ? argv[3] : "amqp://127.0.0.1/examples";
- std::atomic_int remaining(n_messages * n_threads); // Total messages to be received
- bool print = (remaining <= 30); // Print messages for short runs only
-
- // Run the proton container
- proton::container container;
- auto container_thread = std::thread([&]() { container.run(); });
-
- // A single sender and receiver to be shared by all the threads
- mt_sender sender(container, url);
- mt_receiver receiver(container, url);
-
- // Start receiver threads, then sender threads.
- // Starting receivers first gives all receivers a chance to compete for messages.
- std::vector<std::thread> threads;
- for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { receive_thread(receiver, remaining, print); }));
- for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { send_thread(sender, n_messages); }));
-
- // Wait for threads to finish
- for (auto& n_messages_threads : threads)
- n_messages_threads.join();
- sender.close();
- receiver.close();
-
- container_thread.join();
-
- return 0;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org