Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:03:59 UTC

[31/50] qpid-proton git commit: PROTON-1557: c++ example of multi-threaded sending/receiving client

PROTON-1557: c++ example of multi-threaded sending/receiving client

Also some updates/corrections to documentation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ed756d8f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ed756d8f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ed756d8f

Branch: refs/heads/go1
Commit: ed756d8f3582c7e06b84ce0761ee84b14d9c5f0a
Parents: 929681a
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Aug 25 18:12:04 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Aug 25 21:55:45 2017 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   6 +-
 examples/cpp/README.dox                         |  45 +---
 examples/cpp/send_recv_mt.cpp                   | 269 +++++++++++++++++++
 examples/cpp/tutorial.dox                       |   9 +-
 proton-c/bindings/cpp/docs/io.md                |   7 -
 proton-c/bindings/cpp/docs/mt.md                |  34 +--
 .../bindings/cpp/include/proton/container.hpp   |   1 +
 7 files changed, 300 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 7da4834..df9f6a7 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -60,9 +60,11 @@ foreach(example
 endforeach()
 
 if(HAS_CPP11)
-  # Single-threaded examples that require C++11
+  # Examples that require C++11
   foreach(example
-      scheduled_send)
+      scheduled_send
+      send_recv_mt
+      )
     add_executable(${example} ${example}.cpp)
   endforeach()
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 327dbef..551efde 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -108,40 +108,11 @@ alternatives.
 
 */
 
-/** @example broker.hpp
-
-Common logic for a simple "mini broker" that creates creates queues
-automatically when a client tries to send or subscribe. This file contains
-the `queue` class that queues messages and the `broker_handler` class
-that manages queues and links and transfers messages to/from clients.
-
-*/
-
 /** @example broker.cpp
 
-A simple, single-threaded broker using the `proton::container`. You can use this
-to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
-broker. This broker creates queues automatically when a client tries to send or
-subscribe.
-
-*/
-
-/** @example mt/epoll_container.cpp
-
-An example implementation of the proton::container API that shows how
-to use the proton::io::connection_driver SPI to adapt the proton API
-to native IO, in this case using a multithreaded Linux epoll poller as
-the implementation.
-
-__Requires C++11__
-
-*/
-
-/** @example mt/broker.cpp
-
-A multithreaded broker, that will work on any multi-threaded container. See @ref mt/epoll_container.cpp for an example of a multi-threaded container.
-
-__Requires C++11__
+A broker using the `proton::container`. You can use this to run other examples
+that require an intermediary, or you can use any AMQP 1.0 broker. This broker
+creates queues automatically when a client tries to send or subscribe.
 
 */
 
@@ -167,3 +138,13 @@ A working example for accessing Service Bus session-enabled queues.
 Also provides some general notes on Service Bus usage.
 
 */
+
+/** @example send_recv_mt.cpp
+
+A multi-threaded sender and receiver.
+
+__Requires C++11__
+
+*/
+
+*/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/examples/cpp/send_recv_mt.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp
new file mode 100644
index 0000000..addcbaf
--- /dev/null
+++ b/examples/cpp/send_recv_mt.cpp
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// C++11 only
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block user threads calling send() to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receivers
+//
+// We define mt_sender and mt_receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::messaging_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <thread>
+
+// Lock to serialize std::cout, std::cerr used from multiple threads.
+std::mutex out_lock;
+#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0)
+#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0)
+#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0)
+
+// A thread-safe sending connection.
+class mt_sender : private proton::messaging_handler {
+    // Only used in proton thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, use lock_ to protect.
+    std::mutex lock_;
+    proton::work_queue* work_queue_;   // Set in on_sender_open(), used to queue sends
+    std::condition_variable can_send_; // Signal sending threads
+    int queued_;                       // Queued messages waiting to be sent
+    int credit_;                       // AMQP credit - number of messages we can send
+
+  public:
+    // Connect to url
+    mt_sender(proton::container& cont, const std::string& url) :
+        work_queue_(0), queued_(0), credit_(0)
+    {
+        // Pass *this as handler.
+        cont.open_sender(url, proton::connection_options().handler(*this));
+    }
+
+    // Thread safe send()
+    void send(const proton::message& m) {
+        std::unique_lock<std::mutex> l(lock_);
+        // Don't queue up more messages than we have credit for
+        while (!(work_queue_ && queued_ < credit_))
+            can_send_.wait(l);
+        ++queued_;
+        // Add a lambda function to the work queue.
+        // This will call do_send() with a copy of m in the correct proton thread.
+        work_queue_->add([=]() { this->do_send(m); });
+    }
+
+    void close() {
+        std::lock_guard<std::mutex> l(lock_);
+        if (work_queue_)
+            work_queue_->add([this]() { this->sender_.connection().close(); });
+    }
+
+  private:
+    // ==== called by proton threads only
+
+    void on_sender_open(proton::sender& s) override {
+        sender_ = s;
+        std::lock_guard<std::mutex> l(lock_);
+        work_queue_ = &s.work_queue();
+    }
+
+    void on_sendable(proton::sender& s) override {
+        std::lock_guard<std::mutex> l(lock_);
+        credit_ = s.credit();
+        can_send_.notify_all(); // Notify senders we have credit
+    }
+
+    // work_queue work items are automatically dequeued and called by proton.
+    // This function is called because it was queued by send().
+    void do_send(const proton::message& m) {
+        sender_.send(m);
+        std::lock_guard<std::mutex> l(lock_);
+        --queued_;                    // work item was consumed from the work_queue
+        credit_ = sender_.credit();   // update credit
+        can_send_.notify_all();       // Notify senders we have space on queue
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        CERR("unexpected error: " << e << std::endl);
+        exit(1);
+    }
+};
+
+// A thread safe receiving connection.
+class mt_receiver : private proton::messaging_handler {
+    static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+    // Used in proton threads only
+    proton::receiver receiver_;
+
+    // Used in proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue* work_queue_;
+    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+    std::condition_variable can_receive_; // Notify receivers of messages
+
+  public:
+
+    // Connect to url
+    mt_receiver(proton::container& cont, const std::string& url) : work_queue_()
+    {
+        // NOTE: credit_window(0) disables automatic flow control.
+        // We will use flow control to match AMQP credit to buffer capacity.
+        cont.open_receiver(url, proton::receiver_options().credit_window(0),
+                           proton::connection_options().handler(*this));
+    }
+
+    // Thread safe receive
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        // Wait for buffered messages
+        while (!work_queue_ || buffer_.empty())
+            can_receive_.wait(l);
+        proton::message m = std::move(buffer_.front());
+        buffer_.pop();
+        // Add a lambda to the work queue to call receive_done().
+        // This will tell the handler to add more credit.
+        work_queue_->add([=]() { this->receive_done(); });
+        return m;
+    }
+
+    void close() {
+        std::lock_guard<std::mutex> l(lock_);
+        if (work_queue_)
+            work_queue_->add([this]() { this->receiver_.connection().close(); });
+    }
+
+  private:
+    // ==== The following are called by proton threads only.
+
+    void on_receiver_open(proton::receiver& r) override {
+        receiver_ = r;
+        std::lock_guard<std::mutex> l(lock_);
+        work_queue_ = &receiver_.work_queue();
+        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // Proton automatically reduces credit by 1 before calling on_message
+        std::lock_guard<std::mutex> l(lock_);
+        buffer_.push(m);
+        can_receive_.notify_all();
+    }
+
+    // called via work_queue
+    void receive_done() {
+        // Add 1 credit, a receiver has taken a message out of the buffer.
+        receiver_.add_credit(1);
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        CERR("unexpected error: " << e << std::endl);
+        exit(1);
+    }
+};
+
+// ==== Example code using the mt_sender and mt_receiver
+
+// Send n messages
+void send_thread(mt_sender& s, int n) {
+    for (int i = 0; i < n; ++i) {
+        std::ostringstream o;
+        o << std::this_thread::get_id() << ":" << i;
+        s.send(proton::message(o.str()));
+    }
+    COUT(std::this_thread::get_id() << " sent " << n << std::endl);
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) {
+    auto id = std::this_thread::get_id();
+    int n = 0;
+    while (remaining-- > 0) {
+        auto m = r.receive();
+        ++n;
+        if (print)
+            COUT(id << " received \"" << m.body() << '"' << std::endl);
+    }
+    COUT(id << " received " << n << " messages" << std::endl);
+}
+
+int main(int argc, const char **argv) {
+    try {
+        int n_threads = argc > 1 ? atoi(argv[1]) : 2;
+        int n_messages = argc > 2 ? atoi(argv[2]) : 10;
+        const char *url =  argc > 3 ? argv[3] : "amqp://127.0.0.1/examples";
+        std::atomic_int remaining(n_messages * n_threads); // Total messages to be received
+        bool print = (remaining <= 30); // Print messages for short runs only
+
+        // Run the proton container
+        proton::container container;
+        auto container_thread = std::thread([&]() { container.run(); });
+
+        // A single sender and receiver to be shared by all the threads
+        mt_sender sender(container, url);
+        mt_receiver receiver(container, url);
+
+        // Start receiver threads, then sender threads.
+        // Starting receivers first gives all receivers a chance to compete for messages.
+        std::vector<std::thread> threads;
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { receive_thread(receiver, remaining, print); }));
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { send_thread(sender, n_messages); }));
+
+        // Wait for threads to finish
+        for (auto& t : threads)
+            t.join();
+        sender.close();
+        receiver.close();
+
+        container_thread.join();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
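
Editorial sketch (not part of the commit): the blocking behaviour of mt_sender::send() can be illustrated without Proton. The block below is a simplified stand-in that uses only the C++11 standard library. The names credit_gate, acquire(), grant() and run_senders() are hypothetical, and the gate consumes one credit per send rather than tracking queued_ against credit_ as the example does.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the credit bookkeeping in mt_sender.
class credit_gate {
    std::mutex lock_;
    std::condition_variable can_send_;
    int credit_ = 0; // Credit granted by the "peer" and not yet used
    int sent_ = 0;   // Total sends admitted so far

  public:
    // Called by user threads: block until credit is available, then use one.
    void acquire() {
        std::unique_lock<std::mutex> l(lock_);
        can_send_.wait(l, [this] { return credit_ > 0; });
        assert(credit_ > 0);
        --credit_;
        ++sent_;
    }

    // Called by the thread that plays the role of the proton thread:
    // add credit and wake any blocked senders.
    void grant(int n) {
        std::lock_guard<std::mutex> l(lock_);
        credit_ += n;
        can_send_.notify_all();
    }

    int sent() {
        std::lock_guard<std::mutex> l(lock_);
        return sent_;
    }
};

// Start n_threads senders that each perform n_messages sends, grant exactly
// enough credit in batches of at most 3, and return the number of sends.
int run_senders(int n_threads, int n_messages) {
    credit_gate gate;
    std::vector<std::thread> threads;
    for (int i = 0; i < n_threads; ++i)
        threads.emplace_back([&gate, n_messages] {
            for (int j = 0; j < n_messages; ++j)
                gate.acquire();
        });
    const int total = n_threads * n_messages;
    for (int granted = 0; granted < total;) {
        int batch = std::min(3, total - granted);
        gate.grant(batch);
        granted += batch;
    }
    for (auto& t : threads)
        t.join();
    return gate.sent();
}
```

Calling run_senders(4, 10) from main() starts four sender threads that block whenever credit runs out and resume as each batch is granted.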

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/examples/cpp/tutorial.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/tutorial.dox b/examples/cpp/tutorial.dox
index 56345a1..fef08be 100644
--- a/examples/cpp/tutorial.dox
+++ b/examples/cpp/tutorial.dox
@@ -21,11 +21,10 @@ The examples below show how to implement handlers for clients and
 servers and how to run them using the `proton::default_container`, a
 portable, easy-to-use way to build single-threaded clients or servers.
 
-Some of the examples require an AMQP *broker* that can receive, store,
-and send messages. @ref broker.hpp and @ref broker.cpp define a simple
-example broker. If run without arguments, it listens on
-`0.0.0.0:5672`, the standard AMQP port on all network interfaces. To
-use a different port or network interface:
+Some of the examples require an AMQP *broker* that can receive, store, and send
+messages. @ref broker.cpp defines a simple example broker. If run without
+arguments, it listens on `0.0.0.0:5672`, the standard AMQP port on all network
+interfaces. To use a different port or network interface:
 
     broker -a <host>:<port>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/proton-c/bindings/cpp/docs/io.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md
index 230e538..c0f7b02 100644
--- a/proton-c/bindings/cpp/docs/io.md
+++ b/proton-c/bindings/cpp/docs/io.md
@@ -16,12 +16,5 @@ The connection driver is deliberately very simple and low level. It
 performs no IO of its own, no thread-related locking, and is written
 in simple C++98-compatible code.
 
-The connection dirver can be used standalone as an AMQP translator, or
-you can implement the following two interfaces to provide a complete
-implementation of the Proton API that can run any Proton application:
 
- - `proton::container` lets the user initiate or listen for connections.
- - `proton::event_loop` lets the user serialize work with respect to a
-   connection.
 
-@see @ref mt/epoll\_container.cpp for an example.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/proton-c/bindings/cpp/docs/mt.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mt.md b/proton-c/bindings/cpp/docs/mt.md
index 1a63ea6..e316f81 100644
--- a/proton-c/bindings/cpp/docs/mt.md
+++ b/proton-c/bindings/cpp/docs/mt.md
@@ -1,6 +1,6 @@
 # Multithreaded Proton applications {#mt_page}
 
-For an example see @ref mt/broker.cpp
+For examples see @ref broker.cpp and @ref send_recv_mt.cpp
 
 Most classes in namespace @ref proton are not thread-safe. Objects
 belonging to a single connection (`proton::connection`,
@@ -10,35 +10,19 @@ used concurrently in separate threads.
 
 A multithreaded container calls event-handling functions for each
 connection *sequentially* but can process *different* connections
-concurrently in different threads. If you use a *separate*
+concurrently in different threads. If you use a separate
 `proton::messaging_handler` for each connection, then event-handling
 functions can use their parameters and the handler's own data
 members without locks. The handler functions will never be called
 concurrently. You can set the handlers for each connection using
 `proton::container::connect()` and `proton::container::listen()`.
 
-The example @ref mt/broker.cpp is a multithreaded broker using this
-approach.  It creates a new handler for each incoming connection to
-manage the state of that connection's `proton::sender` and
-`proton::receiver` links. The handler needs no lock because it only
-deals with state in the context of one connection.
+The example @ref broker.cpp is a broker that can be run in single or
+multi-threaded mode.  It creates a new handler for each incoming
+connection to manage the state of that connection's `proton::sender` and
+`proton::receiver` links. The handler needs no lock because it only deals with
+state in the context of one connection.
 
-The `proton::event_loop` class represents the sequence of events
-associated with a connection.  `proton::event_loop::inject()` allows
-another thread to "inject" work to be executed in sequence with the
-rest of the events so it can operate safely on data associated with
-the connection.
+The example @ref send_recv_mt.cpp shows how application threads can
+communicate safely with proton handler threads.
 
-In the @ref mt/broker.cpp example, a queue can receive messages from
-one connection but have subscribers on another connection. Subscribers
-pass a function object to the queue which uses
-`proton::event_loop::inject()` to call a notification callback on the
-handler for that connection. The callback is executed in the
-connection's event loop so it can use a `proton::sender` object to
-send the message safely.
-
-*Note*: It is possible to share a single handler between more than one
-connection.  In that case it *can* be called concurrently on behalf of
-different connections, so you will need suitable locking.
-
-@see @ref io_page - Implementing your own container.
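
Editorial sketch (not part of the commit): the mt.md text above relies on work for one connection being executed sequentially, so state touched only by that work needs no lock. The block below illustrates that idea with the C++11 standard library alone. serial_queue and count_with_serial_queue() are hypothetical names; this is not the implementation of proton::work_queue, only an assumed model of the pattern.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical serialized work queue: any thread may add() work, but only
// the single thread inside run() executes it, one item at a time.
class serial_queue {
    std::mutex lock_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> work_;
    bool stopped_ = false;

  public:
    // Thread-safe: queue a function to be called by the run() thread.
    void add(std::function<void()> f) {
        assert(f);
        std::lock_guard<std::mutex> l(lock_);
        work_.push(std::move(f));
        ready_.notify_one();
    }

    // Thread-safe: ask run() to return once the queue has been drained.
    void stop() {
        std::lock_guard<std::mutex> l(lock_);
        stopped_ = true;
        ready_.notify_all();
    }

    // Execute queued work sequentially until stopped and drained.
    void run() {
        for (;;) {
            std::function<void()> f;
            {
                std::unique_lock<std::mutex> l(lock_);
                ready_.wait(l, [this] { return stopped_ || !work_.empty(); });
                if (work_.empty())
                    return; // Stopped and nothing left to do
                f = std::move(work_.front());
                work_.pop();
            }
            f(); // Called outside the lock so work items may call add()
        }
    }
};

// n_threads producers each queue n_items increments of a plain int.
// The counter is only touched by work items, so it needs no lock.
int count_with_serial_queue(int n_threads, int n_items) {
    serial_queue q;
    int counter = 0;
    std::thread worker([&q] { q.run(); });
    std::vector<std::thread> producers;
    for (int i = 0; i < n_threads; ++i)
        producers.emplace_back([&q, &counter, n_items] {
            for (int j = 0; j < n_items; ++j)
                q.add([&counter] { ++counter; });
        });
    for (auto& t : producers)
        t.join();
    q.stop();
    worker.join(); // After this join, reading counter is safe
    return counter;
}
```

In send_recv_mt.cpp the user threads play the role of the producers here, and the proton container thread plays the role of the worker.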

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ed756d8f/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 8b517bf..859d70c 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -23,6 +23,7 @@
  */
 
 #include "./fwd.hpp"
+#include "./thread_safe.hpp"
 #include "./types_fwd.hpp"
 
 #include "./internal/config.hpp"

