You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/04/27 16:54:59 UTC
[3/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded
controller and improved broker example
PROTON-1046: C++ multi-threaded controller and improved broker example
A complete portable multi-threaded API for proton that can be implemented on any
threading/IO platform.
API:
- proton::controller: A multi-threaded alternative to the proton::container.
- proton::work_queue: async functions serialized per-connection.
Examples:
- mt/epoll_controller.hpp: controller/work_queue implemented using native Linux epoll.
- mt/broker.cpp: multi-threaded broker, portable over any controller implementation.
- illustrates multi-threading, use of work_queue, remote shutdown
TODO:
- Examples and implementations for non-Linux platforms.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/deccf354
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/deccf354
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/deccf354
Branch: refs/heads/master
Commit: deccf354a653e2106f40cdd59df9b67b74911e8b
Parents: b53a684
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 31 17:12:18 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Apr 27 10:39:59 2016 -0400
----------------------------------------------------------------------
config.sh.in | 2 +-
examples/cpp/CMakeLists.txt | 25 +-
examples/cpp/README.dox | 99 ++--
examples/cpp/broker.cpp | 4 +-
examples/cpp/client.cpp | 4 +-
examples/cpp/direct_recv.cpp | 4 +-
examples/cpp/direct_send.cpp | 4 +-
examples/cpp/engine/CMakeLists.txt | 37 --
examples/cpp/engine/broker.cpp | 176 -------
examples/cpp/engine/client.cpp | 103 ----
examples/cpp/engine/direct_recv.cpp | 79 ---
examples/cpp/engine/direct_send.cpp | 91 ----
examples/cpp/engine/helloworld.cpp | 68 ---
examples/cpp/engine/options.hpp | 173 -------
examples/cpp/engine/server.cpp | 90 ----
examples/cpp/engine/simple_recv.cpp | 85 ---
examples/cpp/engine/simple_send.cpp | 93 ----
examples/cpp/example/socket_windows.cpp | 218 ++++++++
examples/cpp/example_test.py | 106 ++--
examples/cpp/mt/broker.cpp | 280 ++++++++++
examples/cpp/mt/epoll_controller.cpp | 517 +++++++++++++++++++
examples/cpp/options.hpp | 2 +
examples/cpp/recurring_timer.cpp | 4 +-
examples/cpp/server.cpp | 4 +-
examples/cpp/server_direct.cpp | 4 +-
examples/cpp/simple_recv.cpp | 4 +-
examples/cpp/simple_send.cpp | 4 +-
examples/cpp/tutorial.dox | 403 +++++++++++++++
proton-c/bindings/cpp/CMakeLists.txt | 13 +-
proton-c/bindings/cpp/cpp.cmake | 3 +
proton-c/bindings/cpp/docs/mainpage.md | 152 +++---
proton-c/bindings/cpp/docs/mt_page.md | 21 +
proton-c/bindings/cpp/docs/tutorial.dox | 428 ---------------
proton-c/bindings/cpp/docs/user.doxygen.in | 3 +-
.../cpp/include/proton/connection_options.hpp | 9 +-
.../bindings/cpp/include/proton/controller.hpp | 118 +++++
proton-c/bindings/cpp/include/proton/error.hpp | 7 +-
.../bindings/cpp/include/proton/handler.hpp | 12 +
.../cpp/include/proton/io/connection_engine.hpp | 88 ++--
.../include/proton/io/default_controller.hpp | 47 ++
.../bindings/cpp/include/proton/io/socket.hpp | 130 -----
proton-c/bindings/cpp/include/proton/sender.hpp | 3 +-
.../bindings/cpp/include/proton/work_queue.hpp | 75 +++
.../bindings/cpp/src/connection_options.cpp | 13 +-
proton-c/bindings/cpp/src/contexts.hpp | 5 +-
proton-c/bindings/cpp/src/controller.cpp | 59 +++
proton-c/bindings/cpp/src/engine_test.cpp | 45 --
.../bindings/cpp/src/io/connection_engine.cpp | 67 +--
proton-c/bindings/cpp/src/io/posix/socket.cpp | 196 -------
proton-c/bindings/cpp/src/io/windows/socket.cpp | 218 --------
proton-c/bindings/cpp/src/messaging_adapter.cpp | 5 +-
tests/tools/apps/cpp/CMakeLists.txt | 2 +-
tests/tools/apps/cpp/reactor_send.cpp | 4 +-
53 files changed, 2054 insertions(+), 2352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 744ddb3..5eb779b 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -73,7 +73,7 @@ export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)"
export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)"
# can the test harness use valgrind?
-if [[ -x "$(type -p valgrind)" ]] ; then
+if [[ -x "$(type -p valgrind)" && "@ENABLE_VALGRIND@" == "ON" ]] ; then
export VALGRIND=$(type -p valgrind)
fi
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 4f6b742..3a81718 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -20,7 +20,10 @@
find_package(ProtonCpp REQUIRED)
include_directories(${ProtonCpp_INCLUDE_DIRS})
+link_libraries(${ProtonCpp_LIBRARIES})
+add_compile_options(${CXX_WARNING_FLAGS})
+# Single-threaded examples.
foreach(example
broker
helloworld
@@ -40,12 +43,9 @@ foreach(example
ssl_client_cert
encode_decode)
add_executable(${example} ${example}.cpp)
- target_link_libraries(${example} ${ProtonCpp_LIBRARIES})
- set_source_files_properties(${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
endforeach()
-add_subdirectory(engine)
-
+# Python test runner
set(env_py ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py)
function(set_test_path dir)
@@ -61,7 +61,16 @@ set_test_path("$<TARGET_FILE_DIR:broker>")
add_test(NAME cpp_container_example_test
COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest)
-set_test_path("$<TARGET_FILE_DIR:engine-broker>")
-
-add_test(NAME cpp_engine_example_test
- COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ConnectionEngineExampleTest)
+# TODO aconway 2016-04-26: need portable MT and IO examples.
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND BUILD_CPP_MT)
+ set(controller_src mt/epoll_controller.cpp)
+ foreach(example
+ broker
+ )
+ add_executable(mt_${example} mt/${example}.cpp ${controller_src})
+ target_link_libraries(mt_${example} pthread)
+ set_target_properties(mt_${example} PROPERTIES CXX_STANDARD 11)
+ endforeach()
+ add_test(NAME cpp_mt_example_test
+ COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v MtBrokerTest)
+endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1e78774..d545366 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -1,15 +1,22 @@
-// Examples overview.
+// C++ examples list (doxygen format)
//
-// For a better overview, see the tutorial in the generated documentation.
-//
-// In your build directory do:
+// For a tutorial-style description of the examples see tutorial.dox.
+// To build the full HTML tutorial and documentation, in your build directory do:
//
// make docs-cpp
//
// then open proton-c/bindings/cpp/docs/html/tutorial.html in your browser.
-// DEVELOPER NOTE: if you are adding or modifying examples you should keep this
-// file and ../proton-c/bindings/cpp/docs/tutorial.hpp up to date.
+// DEVELOPER NOTE: if you add or modify examples, please add/update a short
+// description below and (if appropriate) extend/update tutorial.dox.
+
+/** example sub directory
+
+The example sub-directory has utility classes that make the examples simpler.
+These classes are not directly related to the use of proton, so they are in a
+separate `example` directory and namespace.
+
+*/
/** @example helloworld.cpp
@@ -46,7 +53,7 @@ on 127.0.0.1:5672. Simply prints out the body of received messages.
/** @example direct_send.cpp
Accepts an incoming connection and then sends like `simple_send`. You can
-connect directly to `direct_send` *without* a broker using \ref simple_recv.cpp.
+connect directly to `direct_send` *without* a broker using @ref simple_recv.cpp.
Make sure to stop the broker first or use a different port for `direct_send`.
*/
@@ -54,7 +61,7 @@ Make sure to stop the broker first or use a different port for `direct_send`.
/** @example direct_recv.cpp
Accepts an incoming connection and then receives like `simple_recv`. You can
-connect directly to `direct_recv` *without* a broker using \ref simple_send.cpp.
+connect directly to `direct_recv` *without* a broker using @ref simple_send.cpp.
Make sure to stop the broker first or use a different port for `direct_recv`.
*/
@@ -108,9 +115,6 @@ automatically when a client tries to send or subscribe. This file contains
the `queue` class that queues messages and the `broker_handler` class
that manages queues and links and transfers messages to/from clients.
-Examples \ref broker.cpp and \ref engine/broker.cpp use this same
-broker logic but show different ways to run it in a server application.
-
*/
/** @example broker.cpp
@@ -120,79 +124,40 @@ to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
broker. This broker creates queues automatically when a client tries to send or
subscribe.
-Uses the broker logic from \ref broker.hpp, the same logic as the
-`proton::connection_engine` broker example \ref engine/broker.cpp.
-
*/
-//////////////// connection_engine examples.
+/** @example mt/epoll_controller.cpp
-/** \example engine/helloworld.cpp
+An example implementation of the proton::mt::controller API that shows how to
+use the proton::io::connection_engine SPI to adapt the proton API to native
+IO. In this case using a multi-threaded Linux epoll poller as the implementation.
-`proton::connection_engine` example to send a "Hello World" message to
-itself. Compare with the corresponding `proton::container` example \ref
-helloworld.cpp.
+__Requires C++11__
*/
-/** \example engine/simple_send.cpp
+/** @example mt/broker.cpp
-`proton::connection_engine` example of sending a fixed number of messages and
-tracking their (asynchronous) acknowledgement. Messages are sent through the
-'examples' node on an intermediary accessible on 127.0.0.1:5672.
+A multi-threaded broker, using the proton::mt extensions. This broker is
+portable over any implementation of the proton::mt API, see @ref
+mt/epoll_controller.cpp for an example.
-*/
-
-/** \example engine/simple_recv.cpp
-
-`proton::connection_engine` example that subscribes to the 'examples' node and prints
- the body of received messages.
+__Requires C++11__
*/
-/** \example engine/direct_send.cpp
+/** @example mt/simple_send.cpp
-`proton::connection_engine` example accepts an incoming connection and then
-sends like `simple_send`. You can connect directly to `direct_send` *without* a
-broker using \ref simple_recv.cpp. Make sure to stop the broker first or use a
-different port for `direct_send`.
+A multi-threaded sender client. Sends messages concurrently to multiple addresses.
-*/
-
-/** \example engine/direct_recv.cpp
-
-`proton::connection_engine` example accepts an incoming connection and then
-receives like `simple_recv`. You can connect directly to `direct_recv`
-*without* a broker using \ref simple_send.cpp. Make sure to stop the broker
-first or use a different port for `direct_recv`.
+__Requires C++11__
*/
-/** \example engine/client.cpp
+/** @example mt/simple_recv.cpp
-`proton::connection_engine` client for request-response example. Sends requests and
-prints out responses. Requires an intermediary that supports the AMQP 1.0
-dynamic nodes on which the responses are received. The requests are sent through
-the 'examples' node.
+A multi-threaded receiver client. Receives messages concurrently from multiple addresses.
-*/
+__Requires C++11__
-/** \example engine/server.cpp
-
-`proton::connection_engine` server for request-response example, that receives
-requests via the examples node, converts the body to uppercase and sends the
-result back to the indicated reply address.
-
-*/
-
-/** \example engine/broker.cpp
-
-A simple, single-threaded broker using the `proton::container`. You can use this
-to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
-broker. This broker creates queues automatically when a client tries to send or
-subscribe.
-
-Uses the broker logic from \ref broker.hpp, the same logic as the
-proton::container` broker example \ref broker.cpp.
-
-*/
+*/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 37839c6..a19997f 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -61,7 +61,7 @@ class broker {
int main(int argc, char **argv) {
proton::url url("0.0.0.0");
- options opts(argc, argv);
+ example::options opts(argc, argv);
opts.add_value(url, 'a', "address", "listen on URL", "URL");
@@ -72,7 +72,7 @@ int main(int argc, char **argv) {
proton::container(b.handler()).run();
return 0;
- } catch (const bad_option& e) {
+ } catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 0c38ac6..494294e 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -80,7 +80,7 @@ class client : public proton::handler {
int main(int argc, char **argv) {
proton::url url("127.0.0.1:5672/examples");
- options opts(argc, argv);
+ example::options opts(argc, argv);
opts.add_value(url, 'a', "address", "connect and send to URL", "URL");
@@ -97,7 +97,7 @@ int main(int argc, char **argv) {
proton::container(c).run();
return 0;
- } catch (const bad_option& e) {
+ } catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_recv.cpp b/examples/cpp/direct_recv.cpp
index f999869..76bbaf9 100644
--- a/examples/cpp/direct_recv.cpp
+++ b/examples/cpp/direct_recv.cpp
@@ -72,7 +72,7 @@ class direct_recv : public proton::handler {
int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
int message_count = 100;
- options opts(argc, argv);
+ example::options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
@@ -84,7 +84,7 @@ int main(int argc, char **argv) {
proton::container(recv).run();
return 0;
- } catch (const bad_option& e) {
+ } catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_send.cpp b/examples/cpp/direct_send.cpp
index 0b63ec5..860acc4 100644
--- a/examples/cpp/direct_send.cpp
+++ b/examples/cpp/direct_send.cpp
@@ -82,7 +82,7 @@ class simple_send : public proton::handler {
int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
int message_count = 100;
- options opts(argc, argv);
+ example::options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -94,7 +94,7 @@ int main(int argc, char **argv) {
proton::container(send).run();
return 0;
- } catch (const bad_option& e) {
+ } catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/CMakeLists.txt b/examples/cpp/engine/CMakeLists.txt
deleted file mode 100644
index bafa20c..0000000
--- a/examples/cpp/engine/CMakeLists.txt
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(ProtonCpp REQUIRED)
-
-include_directories(${ProtonCpp_INCLUDE_DIRS})
-
-foreach(example
- broker
- helloworld
- simple_recv
- simple_send
- direct_recv
- direct_send
- client
- server)
- add_executable(engine-${example} ${example}.cpp ${extra_source})
- target_link_libraries(engine-${example} ${ProtonCpp_LIBRARIES})
- set_source_files_properties(engine-${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
- set_target_properties(engine-${example} PROPERTIES OUTPUT_NAME ${example})
-endforeach()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
deleted file mode 100644
index bfe84fc..0000000
--- a/examples/cpp/engine/broker.cpp
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../options.hpp"
-#include "../broker.hpp"
-
-#include <iostream>
-
-#ifndef WIN32 // TODO aconway 2016-03-23: windows broker example
-#include <proton/io/socket.hpp>
-#include <sys/select.h>
-#include <set>
-
-template <class T> T check(T result, const std::string& msg="io_error: ") {
- if (result < 0)
- throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
- return result;
-}
-
-void fd_set_if(bool on, int fd, fd_set *fds);
-
-class broker {
- typedef std::set<proton::io::socket::engine*> engines;
-
- queues queues_;
- broker_handler handler_;
- proton::io::connection_engine::container container_;
- engines engines_;
- fd_set reading_, writing_;
-
- public:
- broker() : handler_(queues_) {
- FD_ZERO(&reading_);
- FD_ZERO(&writing_);
- }
-
- ~broker() {
- for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
- delete *i;
- }
-
- void run(const proton::url& url) {
- proton::io::socket::listener listener(url.host(), url.port());
- std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
- FD_SET(listener.socket(), &reading_);
- while(true) {
- fd_set readable_set = reading_;
- fd_set writable_set = writing_;
- check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
-
- if (FD_ISSET(listener.socket(), &readable_set)) {
- std::string client_host, client_port;
- int fd = listener.accept(client_host, client_port);
- std::cout << "accepted " << client_host << ":" << client_port
- << " fd=" << fd << std::endl;
- engines_.insert(
- new proton::io::socket::engine(
- fd, handler_, container_.make_options()));
- FD_SET(fd, &reading_);
- FD_SET(fd, &writing_);
- }
-
- for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
- proton::io::socket::engine *eng = *(i++);
- int flags = 0;
- if (FD_ISSET(eng->socket(), &writable_set))
- eng->write();
- if (FD_ISSET(eng->socket(), &readable_set))
- eng->read();
- if (eng->dispatch()) {
- fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
- fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
- } else {
- std::cout << "closed fd=" << eng->socket() << std::endl;
- engines_.erase(eng);
- delete eng;
- }
- }
- }
- }
-};
-
-void fd_set_if(bool on, int fd, fd_set *fds) {
- if (on)
- FD_SET(fd, fds);
- else
- FD_CLR(fd, fds);
-}
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("0.0.0.0");
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "listen on URL", "URL");
- try {
- opts.parse();
- broker().run(address);
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
-#else // WIN32
-
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-#include "../fake_cpp11.hpp"
-
-class broker {
- public:
- broker(const proton::url& url) : handler_(url, queues_) {}
-
- proton::handler& handler() { return handler_; }
-
- private:
-
- class my_handler : public broker_handler {
- public:
- my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
- void on_container_start(proton::container &c) override {
- c.listen(url_);
- std::cout << "broker listening on " << url_ << std::endl;
- }
-
- private:
- const proton::url& url_;
- };
-
- private:
- queues queues_;
- my_handler handler_;
-};
-
-int main(int argc, char **argv) {
- // Command line options
- proton::url url("0.0.0.0");
- options opts(argc, argv);
- opts.add_value(url, 'a', "address", "listen on URL", "URL");
- try {
- opts.parse();
- broker b(url);
- proton::container(b.handler()).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
deleted file mode 100644
index 8e58a38..0000000
--- a/examples/cpp/engine/client.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/source_options.hpp"
-
-#include <iostream>
-#include <vector>
-
-#include "../fake_cpp11.hpp"
-
-using proton::receiver_options;
-using proton::source_options;
-
-class client : public proton::handler {
- private:
- proton::url url;
- std::vector<std::string> requests;
- proton::sender sender;
- proton::receiver receiver;
-
- public:
- client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}
-
- void on_connection_open(proton::connection &c) override {
- sender = c.open_sender(url.path());
- // Create a receiver requesting a dynamically created queue
- // for the message source.
- receiver_options dynamic_addr = receiver_options().source(source_options().dynamic(true));
- receiver = c.open_receiver("", dynamic_addr);
- }
-
- void send_request() {
- proton::message req;
- req.body(requests.front());
- req.reply_to(receiver.source().address());
- sender.send(req);
- }
-
- void on_receiver_open(proton::receiver &) override {
- send_request();
- }
-
- void on_message(proton::delivery &d, proton::message &response) override {
- if (requests.empty()) return; // Spurious extra message!
- std::cout << requests.front() << " => " << response.body() << std::endl;
- requests.erase(requests.begin());
- if (!requests.empty()) {
- send_request();
- } else {
- d.connection().close();
- }
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("127.0.0.1:5672/examples");
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-
- try {
- opts.parse();
-
- std::vector<std::string> requests;
- requests.push_back("Twas brillig, and the slithy toves");
- requests.push_back("Did gire and gymble in the wabe.");
- requests.push_back("All mimsy were the borogroves,");
- requests.push_back("And the mome raths outgrabe.");
- client handler(address, requests);
- proton::io::socket::engine(address, handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
deleted file mode 100644
index 48f4478..0000000
--- a/examples/cpp/engine/direct_recv.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/url.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class direct_recv : public proton::handler {
- private:
- uint64_t expected;
- uint64_t received;
-
- public:
- direct_recv(int c) : expected(c), received(0) {}
-
- void on_message(proton::delivery &d, proton::message &msg) override {
- if (msg.id().get<uint64_t>() < received)
- return; // ignore duplicate
- if (expected == 0 || received < expected) {
- std::cout << msg.body() << std::endl;
- received++;
- }
- if (received == expected) {
- d.receiver().close();
- d.connection().close();
- }
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("127.0.0.1:5672/examples");
- int message_count = 100;
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
- opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
- try {
- opts.parse();
- proton::url url(address);
- proton::io::socket::listener listener(url.host(), url.port());
- std::cout << "direct_recv listening on " << url << std::endl;
- direct_recv handler(message_count);
- proton::io::socket::engine(listener.accept(), handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
deleted file mode 100644
index 2d9acf0..0000000
--- a/examples/cpp/engine/direct_send.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/acceptor.hpp"
-#include "proton/connection.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
- private:
- int sent;
- int confirmed;
- int total;
- public:
- simple_send(int c) : sent(0), confirmed(0), total(c) {}
-
- void on_sendable(proton::sender &sender) override {
- while (sender.credit() && sent < total) {
- proton::message msg;
- msg.id(sent + 1);
- std::map<std::string, int> m;
- m["sequence"] = sent+1;
- msg.body(m);
- sender.send(msg);
- sent++;
- }
- }
-
- void on_tracker_accept(proton::tracker &t) override {
- confirmed++;
- if (confirmed == total) {
- std::cout << "all messages confirmed" << std::endl;
- t.connection().close();
- }
- }
-
- void on_transport_close(proton::transport &) override {
- sent = confirmed;
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("127.0.0.1:5672/examples");
- int message_count = 100;
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
- opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
- try {
- opts.parse();
- proton::url url(address);
- proton::io::socket::listener listener(url.host(), url.port());
- std::cout << "direct_send listening on " << url << std::endl;
- simple_send handler(message_count);
- proton::io::socket::engine(listener.accept(), handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
deleted file mode 100644
index a4f23ef..0000000
--- a/examples/cpp/engine/helloworld.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-#include "proton/io/socket.hpp"
-
-#include <iostream>
-
-#include "../fake_cpp11.hpp"
-
-class hello_world : public proton::handler {
- private:
- std::string address_;
-
- public:
- hello_world(const std::string& address) : address_(address) {}
-
- void on_connection_open(proton::connection &c) override {
- c.open_receiver(address_);
- c.open_sender(address_);
- }
-
- void on_sendable(proton::sender &s) override {
- proton::message m("Hello World!");
- s.send(m);
- s.close();
- }
-
- void on_message(proton::delivery &d, proton::message &m) override {
- std::cout << m.body() << std::endl;
- d.connection().close();
- }
-};
-
-int main(int argc, char **argv) {
- try {
- proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
- hello_world hw(url.path());
- proton::io::socket::engine(url, hw).run();
-
- return 0;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
-
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/options.hpp b/examples/cpp/engine/options.hpp
deleted file mode 100644
index bd477b5..0000000
--- a/examples/cpp/engine/options.hpp
+++ /dev/null
@@ -1,173 +0,0 @@
-#ifndef OPTIONS_HPP
-#define OPTIONS_HPP
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <string>
-#include <sstream>
-#include <ostream>
-#include <vector>
-#include <stdexcept>
-
-/** bad_option is thrown for option parsing errors */
-struct bad_option : public std::runtime_error {
- bad_option(const std::string& s) : std::runtime_error(s) {}
-};
-
-/** Simple command-line option parser for example programs */
-class options {
- public:
-
- options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() {
- size_t slash = prog_.find_last_of("/\\");
- if (slash != std::string::npos)
- prog_ = prog_.substr(slash+1); // Extract prog name from path
- add_flag(help_, 'h', "help", "Print the help message");
- }
-
- ~options() {
- for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
- delete *i;
- }
-
- /** Updates value when parse() is called if option is present with a value. */
- template<class T>
- void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) {
- opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
- }
-
- /** Sets flag when parse() is called if option is present. */
- void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
- opts_.push_back(new option_flag(flag, short_name, long_name, description));
- }
-
- /** Parse the command line, return the index of the first non-option argument.
- *@throws bad_option if there is a parsing error or unknown option.
- */
- int parse() {
- int arg = 1;
- for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
- opts::iterator i = opts_.begin();
- while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
- ++i;
- if (i == opts_.end())
- throw bad_option(std::string("unknown option ") + argv_[arg]);
- }
- if (help_) throw bad_option("");
- return arg;
- }
-
- /** Print a usage message */
- friend std::ostream& operator<<(std::ostream& os, const options& op) {
- os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
- os << std::endl << "options:" << std::endl;
- for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i)
- os << **i << std::endl;
- return os;
- }
-
- private:
- class option {
- public:
- option(char s, const std::string& l, const std::string& d, const std::string v) :
- short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
- virtual ~option() {}
-
- virtual bool parse(int argc, char const * const * argv, int &i) = 0;
- virtual void print_default(std::ostream&) const {};
-
- friend std::ostream& operator<<(std::ostream& os, const option& op) {
- os << " " << op.short_;
- if (!op.var_.empty()) os << " " << op.var_;
- os << ", " << op.long_;
- if (!op.var_.empty()) os << "=" << op.var_;
- os << std::endl << " " << op.desc_;
- op.print_default(os);
- return os;
- }
-
- protected:
- std::string short_, long_, desc_, var_;
- };
-
- template <class T>
- class option_value : public option {
- public:
- option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
- option(s, l, d, v), value_(value) {}
-
- bool parse(int argc, char const * const * argv, int &i) {
- std::string arg(argv[i]);
- if (arg == short_ || arg == long_) {
- if (i < argc-1) {
- set_value(arg, argv[++i]);
- return true;
- } else {
- throw bad_option("missing value for " + arg);
- }
- }
- if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
- set_value(long_, arg.substr(long_.size()+1));
- return true;
- }
- return false;
- }
-
- virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
-
- void set_value(const std::string& opt, const std::string& s) {
- std::istringstream is(s);
- is >> value_;
- if (is.fail() || is.bad())
- throw bad_option("bad value for " + opt + ": " + s);
- }
-
- private:
- T& value_;
- };
-
- class option_flag: public option {
- public:
- option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
- option(s, l, d, ""), flag_(flag)
- { flag_ = false; }
-
- bool parse(int /*argc*/, char const * const * argv, int &i) {
- if (argv[i] == short_ || argv[i] == long_) {
- flag_ = true;
- return true;
- } else {
- return false;
- }
- }
-
- private:
- bool &flag_;
- };
-
- typedef std::vector<option*> opts;
-
- int argc_;
- char const * const * argv_;
- std::string prog_;
- opts opts_;
- bool help_;
-};
-
-#endif // OPTIONS_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
deleted file mode 100644
index 31f3599..0000000
--- a/examples/cpp/engine/server.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/connection.hpp"
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-
-#include <iostream>
-#include <map>
-#include <string>
-#include <cctype>
-
-#include "../fake_cpp11.hpp"
-
-class server : public proton::handler {
- private:
- typedef std::map<std::string, proton::sender> sender_map;
- proton::url url;
- sender_map senders;
-
- public:
-
- server(const std::string &u) : url(u) {}
-
- void on_connection_open(proton::connection &c) override {
- c.open_receiver(url.path());
- std::cout << "server connected to " << url << std::endl;
- }
-
- std::string to_upper(const std::string &s) {
- std::string uc(s);
- size_t l = uc.size();
- for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
- return uc;
- }
-
- void on_message(proton::delivery &d, proton::message &m) override {
- std::cout << "Received " << m.body() << std::endl;
- std::string reply_to = m.reply_to();
- proton::message reply;
- reply.to(reply_to);
- reply.body(to_upper(proton::get<std::string>(m.body())));
- reply.correlation_id(m.correlation_id());
- if (!senders[reply_to])
- senders[reply_to] = d.connection().open_sender(reply_to);
- senders[reply_to].send(reply);
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("amqp://0.0.0.0:5672/examples");
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "listen on URL", "URL");
- try {
- opts.parse();
- server handler(address);
- proton::io::socket::engine(address, handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
deleted file mode 100644
index ffd80f9..0000000
--- a/examples/cpp/engine/simple_recv.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/value.hpp"
-#include "proton/message_id.hpp"
-#include "proton/delivery.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_recv : public proton::handler {
- private:
- proton::url url;
- proton::receiver receiver;
- uint64_t expected;
- uint64_t received;
- public:
-
- simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
-
- void on_connection_open(proton::connection &c) override {
- receiver = c.open_receiver(url.path());
- std::cout << "simple_recv listening on " << url << std::endl;
- }
-
- void on_message(proton::delivery& d, proton::message &msg) override {
- if (msg.id().get<uint64_t>() < received)
- return; // ignore duplicate
- if (expected == 0 || received < expected) {
- std::cout << msg.body() << std::endl;
- received++;
- if (received == expected) {
- d.receiver().close();
- d.connection().close();
- }
- }
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("127.0.0.1:5672/examples");
- int message_count = 100;
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
- opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
-
- try {
- opts.parse();
- simple_recv handler(address, message_count);
- proton::io::socket::engine(address, handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
deleted file mode 100644
index e08f39f..0000000
--- a/examples/cpp/engine/simple_send.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
- private:
- proton::url url;
- int sent;
- int confirmed;
- int total;
- public:
-
- simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
-
- void on_connection_open(proton::connection &c) override {
- c.open_sender(url.path());
- }
-
- void on_sendable(proton::sender &sender) override {
- while (sender.credit() && sent < total) {
- proton::message msg;
- msg.id(sent + 1);
- std::map<std::string, int> m;
- m["sequence"] = sent+1;
- msg.body(m);
- sender.send(msg);
- sent++;
- }
- }
-
- void on_tracker_accept(proton::tracker &t) override {
- confirmed++;
- if (confirmed == total) {
- std::cout << "all messages confirmed" << std::endl;
- t.connection().close();
- }
- }
-
- void on_transport_close(proton::transport &) override {
- sent = confirmed;
- }
-};
-
-int main(int argc, char **argv) {
- // Command line options
- std::string address("127.0.0.1:5672/examples");
- int message_count = 100;
- options opts(argc, argv);
- opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
- opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
- try {
- opts.parse();
- simple_send handler(address, message_count);
- proton::io::socket::engine(address, handler).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example/socket_windows.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/example/socket_windows.cpp b/examples/cpp/example/socket_windows.cpp
new file mode 100644
index 0000000..f312525
--- /dev/null
+++ b/examples/cpp/example/socket_windows.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
+
+std::string error_str() {
+ HRESULT code = WSAGetLastError();
+ char err[1024] = {0};
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+ FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+ return err;
+}
+
+// Construct an io_error carrying the message s, which is forwarded to the base class error.
+io_error::io_error(const std::string& s) : error(s) {}
+
+namespace {
+
+// Pass a WinSock call's result straight through, or throw
+// io_error(msg + error_str()) when the result equals SOCKET_ERROR.
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result != SOCKET_ERROR)
+        return result;
+    throw io_error(msg + error_str());
+}
+
+// Throw io_error(msg + gai_strerror(result)) when result, the status of a
+// getaddrinfo()/getnameinfo() call, is nonzero; do nothing when it is zero.
+void gai_check(int result, const std::string& msg="") {
+    if (result == 0)
+        return;
+    throw io_error(msg + gai_strerror(result));
+}
+
+} // namespace
+
+// Start WinSock 2.2; throws io_error if it cannot be loaded.
+void initialize() {
+    WSADATA unused;
+    // WSAStartup reports failure through its return value (0 on success),
+    // never as SOCKET_ERROR, and WSAGetLastError() must not be used after it,
+    // so neither check() nor error_str() can detect or describe a failure here.
+    const int err = WSAStartup(0x0202, &unused); // Version 2.2
+    if (err != 0) {
+        char text[1024] = {0};
+        FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                       FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, err, 0, text, sizeof(text), NULL);
+        throw io_error(std::string("can't load WinSock: ") + text);
+    }
+}
+
+// Counterpart of initialize(): calls WSACleanup(). Its return value is ignored.
+void finalize() {
+ WSACleanup();
+}
+
+// Switch socket_ to non-blocking mode; throws io_error if ioctlsocket() fails.
+void engine::init() {
+    u_long enable = 1;  // a nonzero FIONBIO argument turns non-blocking mode on
+    check(::ioctlsocket(socket_, FIONBIO, &enable), "ioctlsocket: ");
+}
+
+// Build an engine around an already-open socket descriptor fd.
+// h and opts are passed to the connection_engine base; init() then prepares the socket.
+engine::engine(descriptor fd, handler& h, const connection_options &opts)
+ : connection_engine(h, opts), socket_(fd)
+{
+ init();
+}
+
+// Build an engine on a new outgoing socket obtained from connect(u), which
+// throws io_error on failure. After init() prepares the socket, the
+// connection is opened immediately.
+engine::engine(const url& u, handler& h, const connection_options &opts)
+ : connection_engine(h, opts), socket_(connect(u))
+{
+ init();
+ connection().open();
+}
+
+// Nothing to release here: the socket is closed at the end of run(), not by the destructor.
+engine::~engine() {}
+
+// Make one recv() attempt into the engine's read buffer.
+// Does nothing when the read buffer currently has no space.
+void engine::read() {
+ mutable_buffer rbuf = read_buffer();
+ if (rbuf.size > 0) {
+ int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
+ if (n > 0)
+ read_done(n); // n bytes were received into rbuf
+ else if (n == 0)
+ read_close(); // recv() returning 0 means the peer closed the connection
+ else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+ close(error_condition("io_error", error_str())); // WSAEWOULDBLOCK only means "no data yet" and is ignored
+ }
+}
+
+// Make one send() attempt from the engine's write buffer.
+// Does nothing when there is no pending output.
+void engine::write() {
+ const_buffer wbuf = write_buffer();
+ if (wbuf.size > 0) {
+ int n = ::send(socket_, wbuf.data, wbuf.size, 0);
+ if (n > 0)
+ write_done(n); // n bytes were accepted by the socket
+ else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+ close(error_condition("io_error", error_str())); // WSAEWOULDBLOCK only means "try again later" and is ignored
+ }
+}
+
+// Drive the engine until dispatch() returns false or select() fails, then
+// close the socket. Each pass blocks in select() until the socket is readable
+// (watched only while the read buffer has space) and/or writable (watched
+// only while there is pending output).
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            // Report under the same "io_error" condition name that read() and
+            // write() use; "select: " belongs in the description, not the name.
+            close(error_condition("io_error", "select: " + error_str()));
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::closesocket(socket_);
+}
+
+namespace {
+// Owns the address list filled in by getaddrinfo() and frees it on scope exit.
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    // ptr is still null when getaddrinfo() failed; don't pass that to freeaddrinfo().
+    ~auto_addrinfo() { if (ptr) ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+  private:
+    // Not copyable: a copy would free the same list twice.
+    auto_addrinfo(const auto_addrinfo&);
+    auto_addrinfo& operator=(const auto_addrinfo&);
+};
+
+// Translate the service names "amqp" and "amqps" into numeric port strings,
+// to help older Windows that does not know them. Any other value, including
+// a null pointer, is returned unchanged.
+static const char *amqp_service(const char *port) {
+    if (!port) return port;
+    if (strcmp("amqp", port) == 0) return "5672";
+    if (strcmp("amqps", port) == 0) return "5671";
+    return port;
+}
+}
+
+
+// Open a stream socket connected to the host and port of u and return its descriptor.
+// Throws io_error if address lookup, socket creation or connect() fails; a
+// socket created before the failure is closed before the exception propagates.
+descriptor connect(const proton::url& u) {
+ // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+ std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
+ descriptor fd = INVALID_SOCKET;
+ try{
+ auto_addrinfo addr;
+ // Empty host/port are passed to getaddrinfo() as null pointers.
+ gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+ amqp_service(u.port().empty() ? 0 : u.port().c_str()),
+ 0, &addr.ptr),
+ "connect address invalid: ");
+ // Only the first address returned by getaddrinfo() is tried.
+ fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
+ check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+ return fd;
+ } catch (...) {
+ if (fd != INVALID_SOCKET) ::closesocket(fd);
+ throw;
+ }
+}
+
+// Create a socket bound to host:port and start listening on it (backlog 32).
+// Empty host/port are passed to getaddrinfo() as null pointers.
+// Throws io_error on any failure; a socket opened before the failure is
+// closed before the exception propagates.
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
+    try {
+        auto_addrinfo addr;
+        // Translate "amqp"/"amqps" to numeric ports exactly as connect() does,
+        // so listening on a URL's default port also works on older Windows.
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                amqp_service(port.empty() ? 0 : port.c_str()),
+                                0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
+        // Boolean socket options are documented to take an int-sized BOOL,
+        // not a 1-byte C++ bool.
+        BOOL yes = TRUE;
+        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
+        check(::listen(socket_, 32), "listener listen: ");
+    } catch (...) {
+        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
+        throw;
+    }
+}
+
+// Close the listening socket.
+listener::~listener() { ::closesocket(socket_); }
+
+// Accept one incoming connection and return its descriptor.
+// host_str and port_str receive the remote host and port as reported by
+// getnameinfo(). Throws io_error if accept() or getnameinfo() fails; in the
+// latter case the accepted socket is closed first so it does not leak.
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    // Keep the full-width descriptor type: a SOCKET does not fit in an int
+    // on 64-bit Windows.
+    descriptor fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    try {
+        char host[NI_MAXHOST], port[NI_MAXSERV];
+        // Pass the address length that accept() actually filled in, not the
+        // size of the whole sockaddr_storage.
+        gai_check(getnameinfo((struct sockaddr *) &addr, size,
+                              host, sizeof(host), port, sizeof(port), 0),
+                  "accept invalid remote address: ");
+        host_str = host;
+        port_str = port;
+        return fd;
+    } catch (...) {
+        ::closesocket(fd);
+        throw;
+    }
+}
+
+}}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index d228d67..38a5154 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -131,60 +131,40 @@ class Proc(Popen):
raise ProcError(self, "timeout waiting for exit")
-def count_tests(cls):
- methods = inspect.getmembers(cls, predicate=inspect.ismethod)
- tests = [ i for i,j in methods if i.startswith('test_') ]
- return len(tests)
-
-class CompatSetupClass(object):
- # Roughly provides setUpClass and tearDownClass functionality for older python versions
- # in our test scenarios
- def __init__(self, target):
- self.completed = False
- self.test_count = count_tests(target)
- self.target = target
- self.global_setup = False
-
- def note_setup(self):
- if not self.global_setup:
- self.global_setup = True
- self.target.setup_class()
-
- def note_teardown(self):
- self.test_count -= 1
- if self.test_count == 0:
- self.completed = True
- self.target.teardown_class()
-
-
-class ExampleTestCase(unittest.TestCase):
-
- @classmethod
- def setup_class(cls):
- pass
-
- @classmethod
- def teardown_class(cls):
- pass
-
- def completed(self):
- cls = self.__class__
- return cls.compat_ and cls.compat_.completed
-
+if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'):
+ TestCase = unittest.TestCase
+else:
+ class TestCase(unittest.TestCase):
+ """
+ Roughly provides setUpClass and tearDownClass functionality for older python
+ versions in our test scenarios. If subclasses override setUp or tearDown
+ they *must* call the superclass.
+ """
+ def setUp(self):
+ if not hasattr(type(self), '_setup_class_count'):
+ type(self)._setup_class_count = len(
+ inspect.getmembers(
+ type(self),
+ predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+ type(self).setUpClass()
+
+ def tearDown(self):
+ self.assertTrue(self._setup_class_count > 0)
+ self._setup_class_count -= 1
+ if self._setup_class_count == 0:
+ type(self).tearDownClass()
+
+
+class ExampleTestCase(TestCase):
+ """TestCase that manages started processes"""
def setUp(self):
- cls = self.__class__
- if not hasattr(cls, "compat_"):
- cls.compat_ = CompatSetupClass(cls)
- if cls.compat_.completed:
- # Last test for this class already seen.
- raise Exception("Test sequencing error")
- cls.compat_.note_setup()
+ super(ExampleTestCase, self).setUp()
self.procs = []
def tearDown(self):
for p in self.procs:
p.safe_kill()
- self.__class__.compat_.note_teardown()
+ super(ExampleTestCase, self).tearDown()
def proc(self, *args, **kwargs):
p = Proc(*args, **kwargs)
@@ -194,27 +174,26 @@ class ExampleTestCase(unittest.TestCase):
class BrokerTestCase(ExampleTestCase):
"""
ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+ Subclasses must set `broker_exe` class variable with the name of the broker executable.
"""
- # setUpClass not available until 2.7
@classmethod
- def setup_class(cls):
+ def setUpClass(cls):
cls.addr = pick_addr() + "/examples"
- cls.broker = Proc(["broker", "-a", cls.addr], ready="listening")
+ cls.broker = None # In case Proc throws, create the attribute.
+ cls.broker = Proc([cls.broker_exe, "-a", cls.addr], ready="listening")
cls.broker.wait_ready()
- # tearDownClass not available until 2.7
@classmethod
- def teardown_class(cls):
- cls.broker.safe_kill()
+ def tearDownClass(cls):
+ if cls.broker: cls.broker.safe_kill()
def tearDown(self):
+ b = type(self).broker
+ if b and b.poll() != None: # Broker crashed
+ type(self).setUpClass() # Start another for the next test.
+ raise ProcError(b, "broker crash")
super(BrokerTestCase, self).tearDown()
- if not self.completed():
- b = type(self).broker
- if b.poll() != None: # Broker crashed
- type(self).setUpClass() # Start another for the next test.
- raise ProcError(b, "broker crash")
CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES
@@ -230,6 +209,8 @@ def recv_expect(name, addr):
class ContainerExampleTest(BrokerTestCase):
"""Run the container examples, verify they behave as expected."""
+ broker_exe = "broker"
+
def test_helloworld(self):
self.assertEqual('Hello World!\n', self.proc(["helloworld", self.addr]).wait_exit())
@@ -341,8 +322,8 @@ Hello World!
self.assertEqual(expect_found, True)
-class ConnectionEngineExampleTest(BrokerTestCase):
- """Run the connction_engine examples, verify they behave as expected."""
+class EngineTestCase(BrokerTestCase):
+ """Run selected clients to test a connction_engine broker."""
def test_helloworld(self):
self.assertEqual('Hello World!\n',
@@ -380,5 +361,8 @@ class ConnectionEngineExampleTest(BrokerTestCase):
self.assertEqual(CLIENT_EXPECT,
self.proc(["client", "-a", self.addr]).wait_exit())
+class MtBrokerTest(EngineTestCase):
+ broker_exe = "mt_broker"
+
if __name__ == "__main__":
unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
new file mode 100644
index 0000000..48738c9
--- /dev/null
+++ b/examples/cpp/mt/broker.cpp
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "../options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/controller.hpp>
+#include <proton/delivery.hpp>
+#include <proton/handler.hpp>
+#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <cstdint>
+#include <deque>
+#include <functional>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+// Thread safe queue.
+// Stores messages and notifies waiting subscribers when data arrives.
+//
+// push() and pop() both take lock_ for their whole body, so they may be
+// called concurrently from different connections.
+class queue {
+  public:
+    queue(const std::string& name) : name_(name) {}
+
+    // The name given at construction (returned by value).
+    std::string name() const { return name_; }
+
+    // Push a copy of m onto the back of the queue.
+    // If the queue was empty before this push, call every saved callback with
+    // this queue and then discard them; a subscriber that still wants to be
+    // notified registers again through pop().
+    // Called from receiver's connection.
+    //
+    // NOTE: callbacks run while lock_ (a non-recursive std::mutex) is held, so
+    // a callback must not call push() or pop() on this queue directly.
+    void push(const proton::message &m) {
+        std::lock_guard<std::mutex> g(lock_);
+        messages_.push_back(m);
+        if (messages_.size() == 1) { // Was empty until now, notify subscribers
+            // Iterate by reference: no std::function copy per notification.
+            for (const auto& cb : callbacks_)
+                cb(this);
+            callbacks_.clear();
+        }
+    }
+
+    // If the queue is not empty, move the front message into m and return true.
+    // Otherwise save callback to be called when there are messages and return
+    // false, leaving m untouched.
+    // Called from sender's connection.
+    bool pop(proton::message& m, std::function<void(queue*)> callback) {
+        std::lock_guard<std::mutex> g(lock_);
+        if (messages_.empty()) {
+            // callback is already our private copy, so move it into storage.
+            callbacks_.push_back(std::move(callback));
+            return false;
+        }
+        m = std::move(messages_.front());
+        messages_.pop_front();
+        return true;
+    }
+
+  private:
+    const std::string name_;
+    std::mutex lock_;                                     // Guards messages_ and callbacks_.
+    std::deque<proton::message> messages_;                // Oldest message at the front.
+    std::vector<std::function<void(queue*)> > callbacks_; // Waiting for the next push().
+};
+
+/// Thread safe map of queues.
+///
+/// The map owns every queue it creates (via std::unique_ptr); the pointers
+/// handed out by get() and dynamic() are non-owning.
+class queues {
+  public:
+    queues() : next_id_(0) {}
+
+    // Return the queue called name, creating it on first use.
+    queue* get(const std::string& name) {
+        std::lock_guard<std::mutex> guard(lock_);
+        std::unique_ptr<queue>& slot = queues_[name]; // Null for a brand new entry.
+        if (!slot)
+            slot.reset(new queue(name));
+        return slot.get();
+    }
+
+    // Get or create a queue named "_dynamic_<N>", where N comes from an
+    // atomically incremented counter.
+    queue* dynamic() {
+        const uint64_t id = next_id_++;
+        std::ostringstream label;
+        label << "_dynamic_" << id;
+        return get(label.str());
+    }
+
+  private:
+    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
+
+    std::mutex lock_;               // Guards queues_.
+    queue_map queues_;
+    std::atomic<uint64_t> next_id_; // Use to generate unique queue IDs.
+};
+
+/// Broker connection handler. Things to note:
+///
+/// Each handler manages a single connection. Proton AMQP callbacks and queue
+/// callbacks via proton::work_queue are serialized per-connection, so the
+/// handler does not need a lock. Handlers for different connections can be
+/// called concurrently.
+///
+/// Senders (aka subscriptions) need some cross-thread notification:
+///
+/// - a sender that gets credit calls queue::pop() in `on_sendable()`
+///   - on success it sends the message immediately.
+///   - on queue empty, the sender is added to the `blocked_` set and the queue stores a callback.
+/// - when a receiver thread pushes a message, the queue calls its callbacks.
+/// - the callback causes a serialized call to has_messages() which re-tries all `blocked_` senders.
+///
+/// The handler deletes itself in on_transport_close(), so it must be
+/// allocated with `new`.
+///
+class broker_connection_handler : public proton::handler {
+  public:
+    /// @param qs queue map shared by all handlers; held by reference, so it
+    /// must outlive this handler.
+    broker_connection_handler(queues& qs) : queues_(qs) {}
+
+    void on_connection_open(proton::connection& c) override {
+        // Create the has_messages callback for use with queue subscriptions.
+        //
+        // Note the captured and bound arguments must be thread-safe to copy,
+        // shared_ptr<work_queue>, and plain pointers this and q are all safe.
+        //
+        // The proton::connection object c is not thread-safe to copy.
+        // However when the work_queue calls this->has_messages it will be safe
+        // to use any proton objects associated with c again.
+        auto work = proton::work_queue::get(c);
+        has_messages_callback_ = [this, work](queue* q) {
+            work->push(std::bind(&broker_connection_handler::has_messages, this, q));
+        };
+        c.open();               // Always accept
+    }
+
+    // A sender sends messages from a queue to a subscriber.
+    // Makes sure the source queue exists and logs its name.
+    //
+    // NOTE(review): for a dynamic source the queue is created by
+    // queues_.dynamic(), but sender_queue() later looks the queue up by
+    // sender.source().address(). Confirm these refer to the same queue.
+    void on_sender_open(proton::sender &sender) override {
+        queue *q = sender.source().dynamic() ?
+            queues_.dynamic() : queues_.get(sender.source().address());
+        std::cout << "sending from " << q->name() << std::endl;
+    }
+
+    // We have credit to send a message.
+    void on_sendable(proton::sender &s) override {
+        queue* q = sender_queue(s);
+        if (!do_send(q, s))     // Queue is empty, save ourselves in the blocked set.
+            blocked_.insert(std::make_pair(q, s));
+    }
+
+    // A receiver receives messages from a publisher to a queue.
+    void on_receiver_open(proton::receiver &receiver) override {
+        std::string qname = receiver.target().address();
+        if (qname == "shutdown") {
+            std::cout << "broker shutting down" << std::endl;
+            // Sending to the special "shutdown" queue stops the broker.
+            proton::controller::get(receiver.connection()).stop(
+                proton::error_condition("shutdown", "stop broker"));
+        } else {
+            std::cout << "receiving to " << qname << std::endl;
+        }
+    }
+
+    // A message is received: store it on the queue named by the receiver's target.
+    void on_message(proton::delivery &d, proton::message &m) override {
+        std::string qname = d.receiver().target().address();
+        queues_.get(qname)->push(m);
+    }
+
+    void on_session_close(proton::session &session) override {
+        // Erase all blocked senders that belong to session.
+        auto predicate = [session](const proton::sender& s) {
+            return s.session() == session;
+        };
+        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
+    }
+
+    void on_sender_close(proton::sender &sender) override {
+        // Erase sender from the blocked set.
+        auto range = blocked_.equal_range(sender_queue(sender));
+        auto predicate = [sender](const proton::sender& s) { return s == sender; };
+        erase_sender_if(range.first, range.second, predicate);
+    }
+
+    // The controller calls on_transport_close() last.
+    void on_transport_close(proton::transport&) override {
+        delete this;            // All done.
+    }
+
+  private:
+    // Senders waiting for a message, keyed by the queue they are waiting on.
+    typedef std::multimap<queue*, proton::sender> blocked_map;
+
+    // Get the queue associated with a sender.
+    queue* sender_queue(const proton::sender& s) {
+        return queues_.get(s.source().address()); // Thread safe.
+    }
+
+    // Only called if we have credit. Return true if we sent a message.
+    bool do_send(queue* q, proton::sender &s) {
+        proton::message m;
+        bool popped = q->pop(m, has_messages_callback_);
+        if (popped)
+            s.send(m);
+        /// if !popped the queue has saved the callback for later.
+        return popped;
+    }
+
+    // Called via @ref work_queue when q has messages. Try all the blocked senders.
+    void has_messages(queue* q) {
+        auto range = blocked_.equal_range(q);
+        for (auto i = range.first; i != range.second;) {
+            if (i->second.credit() <= 0 || do_send(q, i->second))
+                i = blocked_.erase(i); // No credit or send was successful, stop blocked.
+            else
+                ++i;            // have credit, didn't send, keep blocked
+        }
+    }
+
+    // Use to erase closed senders from blocked_ set.
+    // Erases every entry in [begin, end) whose sender satisfies p.
+    template <class Predicate>
+    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
+        for (auto i = begin; i != end; ) {
+            if (p(i->second))
+                i = blocked_.erase(i);
+            else
+                ++i;
+        }
+    }
+
+    queues& queues_;
+    blocked_map blocked_;
+    std::function<void(queue*)> has_messages_callback_; // Set in on_connection_open().
+};
+
+
+// The broker: owns the shared queue map and the proton::controller, and
+// creates one broker_connection_handler per accepted connection.
+class broker {
+  public:
+    // Create the controller, set the container id to "mt_broker" and start
+    // listening on addr.
+    broker(const std::string& addr) : controller_(proton::controller::create()) {
+        controller_->options(proton::connection_options().container_id("mt_broker"));
+        std::cout << "broker listening on " << addr << std::endl;
+        controller_->listen(addr, std::bind(&broker::new_handler, this));
+    }
+
+    // Start the worker threads, then block in controller::wait().
+    void run() {
+        // hardware_concurrency() is only a hint and returns 0 when the value
+        // is not computable; always start at least one worker.
+        unsigned workers = std::thread::hardware_concurrency();
+        if (workers == 0)
+            workers = 1;
+        for (unsigned i = 0; i < workers; ++i)
+            std::thread(&proton::controller::run, controller_.get()).detach();
+        controller_->wait();
+    }
+
+  private:
+    // Handler factory passed to listen(). The handler deletes itself in
+    // on_transport_close(), so the raw `new` is intentional.
+    proton::handler* new_handler() {
+        return new broker_connection_handler(queues_);
+    }
+
+    queues queues_;
+    std::unique_ptr<proton::controller> controller_;
+};
+
+// Parse the command line, then run a broker on the requested address.
+// Returns 0 when run() completes, 1 on a bad option or any other exception.
+int main(int argc, char **argv) {
+    // Listen address; overridden by -a / --address.
+    std::string listen_addr("0.0.0.0");
+    example::options cli(argc, argv);
+    cli.add_value(listen_addr, 'a', "address", "listen on URL", "URL");
+    try {
+        cli.parse();
+        broker server(listen_addr);
+        server.run();
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Show the usage text followed by the specific complaint.
+        std::cout << cli << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& failure) {
+        std::cerr << "broker shutdown: " << failure.what() << std::endl;
+    }
+    return 1;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org