You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/03/23 19:58:45 UTC
[2/3] qpid-proton git commit: PROTON-1161 - c++: better interface to
connection_engine.
PROTON-1161 - c++: better interface to connection_engine.
More flexible connection_engine interface to support reactor and proactor patterns.
Provides direct access to proton buffers for minimal copies in either case.
connection_engine is now completely IO- and thread-neutral; it simply handles:
bytes-in -> events as handler function calls -> bytes-out
Moved connection_engine and related classes into the proton::io namespace.
Cleaned up engine implementation, improved the unit tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/222574ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/222574ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/222574ed
Branch: refs/heads/master
Commit: 222574ed95acaa4143ea3e2c2cd9e7abac9e1dd9
Parents: c63b2be
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 17 18:29:13 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 23 13:48:02 2016 -0400
----------------------------------------------------------------------
examples/cpp/engine/broker.cpp | 141 +++++-----
examples/cpp/engine/client.cpp | 4 +-
examples/cpp/engine/direct_recv.cpp | 6 +-
examples/cpp/engine/direct_send.cpp | 6 +-
examples/cpp/engine/helloworld.cpp | 4 +-
examples/cpp/engine/server.cpp | 4 +-
examples/cpp/engine/simple_recv.cpp | 4 +-
examples/cpp/engine/simple_send.cpp | 4 +-
examples/cpp/example_test.py | 2 +-
proton-c/bindings/cpp/CMakeLists.txt | 12 +-
proton-c/bindings/cpp/docs/mainpage.md | 6 +-
proton-c/bindings/cpp/docs/tutorial.dox | 4 +-
.../bindings/cpp/include/proton/connection.hpp | 6 +-
.../cpp/include/proton/connection_engine.hpp | 168 ------------
.../cpp/include/proton/connection_options.hpp | 6 +-
.../bindings/cpp/include/proton/handler.hpp | 6 +-
proton-c/bindings/cpp/include/proton/io.hpp | 134 ----------
.../cpp/include/proton/io/connection_engine.hpp | 182 +++++++++++++
.../bindings/cpp/include/proton/io/socket.hpp | 130 ++++++++++
.../bindings/cpp/include/proton/transport.hpp | 5 +
proton-c/bindings/cpp/src/connection_engine.cpp | 208 ---------------
proton-c/bindings/cpp/src/contexts.hpp | 15 +-
proton-c/bindings/cpp/src/engine_test.cpp | 258 +++++++++++--------
.../bindings/cpp/src/io/connection_engine.cpp | 172 +++++++++++++
proton-c/bindings/cpp/src/io/posix/socket.cpp | 194 ++++++++++++++
proton-c/bindings/cpp/src/io/windows/socket.cpp | 217 ++++++++++++++++
proton-c/bindings/cpp/src/posix/io.cpp | 175 -------------
proton-c/bindings/cpp/src/scalar_test.cpp | 1 -
proton-c/bindings/cpp/src/test_bits.hpp | 29 ++-
proton-c/bindings/cpp/src/value_test.cpp | 4 +-
proton-c/bindings/cpp/src/windows/io.cpp | 197 --------------
31 files changed, 1187 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
index de08991..698d795 100644
--- a/examples/cpp/engine/broker.cpp
+++ b/examples/cpp/engine/broker.cpp
@@ -24,76 +24,25 @@
#include <iostream>
-#ifdef WIN32
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-
-#include "fake_cpp11.hpp"
-
-class broker {
- public:
- broker(const proton::url& url) : handler_(url, queues_) {}
-
- proton::handler& handler() { return handler_; }
-
- private:
-
- class my_handler : public broker_handler {
- public:
- my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
- void on_start(proton::event &e) override {
- e.container().listen(url_);
- std::cout << "broker listening on " << url_ << std::endl;
- }
-
- private:
- const proton::url& url_;
- };
-
- private:
- queues queues_;
- my_handler handler_;
-};
-
-int main(int argc, char **argv) {
- // Command line options
- proton::url url("0.0.0.0");
- options opts(argc, argv);
- opts.add_value(url, 'a', "address", "listen on URL", "URL");
- try {
- opts.parse();
- broker b(url);
- proton::container(b.handler()).run();
- return 0;
- } catch (const bad_option& e) {
- std::cout << opts << std::endl << e.what() << std::endl;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
-#else // WIN32
-#include <proton/io.hpp>
+#ifndef WIN32 // TODO aconway 2016-03-23: windows broker example
+#include <proton/io/socket.hpp>
#include <sys/select.h>
#include <set>
template <class T> T check(T result, const std::string& msg="io_error: ") {
if (result < 0)
- throw proton::connection_engine::io_error(msg + proton::io::error_str());
+ throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
return result;
}
void fd_set_if(bool on, int fd, fd_set *fds);
class broker {
- typedef std::set<proton::io::socket_engine*> engines;
+ typedef std::set<proton::io::socket::engine*> engines;
queues queues_;
broker_handler handler_;
- proton::connection_engine::container container_;
+ proton::io::connection_engine::container container_;
engines engines_;
fd_set reading_, writing_;
@@ -109,7 +58,7 @@ class broker {
}
void run(const proton::url& url) {
- proton::io::listener listener(url.host(), url.port());
+ proton::io::socket::listener listener(url.host(), url.port());
std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
FD_SET(listener.socket(), &reading_);
while(true) {
@@ -122,27 +71,26 @@ class broker {
int fd = listener.accept(client_host, client_port);
std::cout << "accepted " << client_host << ":" << client_port
<< " fd=" << fd << std::endl;
- engines_.insert(new proton::io::socket_engine(fd, handler_, container_.make_options()));
+ engines_.insert(
+ new proton::io::socket::engine(
+ fd, handler_, container_.make_options()));
FD_SET(fd, &reading_);
FD_SET(fd, &writing_);
}
for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
- engines::iterator j = i++; // Save iterator in case we need to erase it.
- proton::io::socket_engine *eng = *j;
+ proton::io::socket::engine *eng = *(i++);
int flags = 0;
- if (FD_ISSET(eng->socket(), &readable_set))
- flags |= proton::io::socket_engine::READ;
if (FD_ISSET(eng->socket(), &writable_set))
- flags |= proton::io::socket_engine::WRITE;
- if (flags) eng->process(flags);
- // Set reading/writing bits for next time around
- fd_set_if(eng->can_read(), eng->socket(), &reading_);
- fd_set_if(eng->can_write(), eng->socket(), &writing_);
-
- if (eng->closed()) {
+ eng->write();
+ if (FD_ISSET(eng->socket(), &readable_set))
+ eng->read();
+ if (eng->dispatch()) {
+ fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
+ fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
+ } else {
std::cout << "closed fd=" << eng->socket() << std::endl;
- engines_.erase(j);
+ engines_.erase(eng);
delete eng;
}
}
@@ -173,4 +121,57 @@ int main(int argc, char **argv) {
}
return 1;
}
+#else // WIN32
+
+#include "proton/acceptor.hpp"
+#include "proton/container.hpp"
+#include "proton/value.hpp"
+
+
+#include "fake_cpp11.hpp"
+
+class broker {
+ public:
+ broker(const proton::url& url) : handler_(url, queues_) {}
+
+ proton::handler& handler() { return handler_; }
+
+ private:
+
+ class my_handler : public broker_handler {
+ public:
+ my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
+
+ void on_start(proton::event &e) override {
+ e.container().listen(url_);
+ std::cout << "broker listening on " << url_ << std::endl;
+ }
+
+ private:
+ const proton::url& url_;
+ };
+
+ private:
+ queues queues_;
+ my_handler handler_;
+};
+
+int main(int argc, char **argv) {
+ // Command line options
+ proton::url url("0.0.0.0");
+ options opts(argc, argv);
+ opts.add_value(url, 'a', "address", "listen on URL", "URL");
+ try {
+ opts.parse();
+ broker b(url);
+ proton::container(b.handler()).run();
+ return 0;
+ } catch (const bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+ return 1;
+}
+
#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
index d2d37c0..941ca75 100644
--- a/examples/cpp/engine/client.cpp
+++ b/examples/cpp/engine/client.cpp
@@ -20,7 +20,7 @@
*/
#include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/url.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
requests.push_back("All mimsy were the borogroves,");
requests.push_back("And the mome raths outgrabe.");
client handler(address, requests);
- proton::io::socket_engine(address, handler).run();
+ proton::io::socket::engine(address, handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
index 3fcc28e..51f6572 100644
--- a/examples/cpp/engine/direct_recv.cpp
+++ b/examples/cpp/engine/direct_recv.cpp
@@ -21,7 +21,7 @@
#include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
#include "proton/link.hpp"
@@ -70,10 +70,10 @@ int main(int argc, char **argv) {
try {
opts.parse();
proton::url url(address);
- proton::io::listener listener(url.host(), url.port());
+ proton::io::socket::listener listener(url.host(), url.port());
std::cout << "direct_recv listening on " << url << std::endl;
direct_recv handler(message_count);
- proton::io::socket_engine(listener.accept(), handler).run();
+ proton::io::socket::engine(listener.accept(), handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
index 28ea845..4d7be72 100644
--- a/examples/cpp/engine/direct_send.cpp
+++ b/examples/cpp/engine/direct_send.cpp
@@ -23,7 +23,7 @@
#include "proton/acceptor.hpp"
#include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/url.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
@@ -82,10 +82,10 @@ int main(int argc, char **argv) {
try {
opts.parse();
proton::url url(address);
- proton::io::listener listener(url.host(), url.port());
+ proton::io::socket::listener listener(url.host(), url.port());
std::cout << "direct_send listening on " << url << std::endl;
simple_send handler(message_count);
- proton::io::socket_engine(listener.accept(), handler).run();
+ proton::io::socket::engine(listener.accept(), handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
index d5a9f44..4bb0ed3 100644
--- a/examples/cpp/engine/helloworld.cpp
+++ b/examples/cpp/engine/helloworld.cpp
@@ -22,7 +22,7 @@
#include "proton/event.hpp"
#include "proton/handler.hpp"
#include "proton/url.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include <iostream>
@@ -57,7 +57,7 @@ int main(int argc, char **argv) {
try {
proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
hello_world hw(url.path());
- proton::io::socket_engine(url, hw).run();
+ proton::io::socket::engine(url, hw).run();
return 0;
} catch (const std::exception& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
index 794d27e..bdd1a73 100644
--- a/examples/cpp/engine/server.cpp
+++ b/examples/cpp/engine/server.cpp
@@ -22,7 +22,7 @@
#include "options.hpp"
#include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/url.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
try {
opts.parse();
server handler(address);
- proton::io::socket_engine(address, handler).run();
+ proton::io::socket::engine(address, handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
index 5b6cf21..a081227 100644
--- a/examples/cpp/engine/simple_recv.cpp
+++ b/examples/cpp/engine/simple_recv.cpp
@@ -21,7 +21,7 @@
#include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/url.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
@@ -76,7 +76,7 @@ int main(int argc, char **argv) {
try {
opts.parse();
simple_recv handler(address, message_count);
- proton::io::socket_engine(address, handler).run();
+ proton::io::socket::engine(address, handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
index 39d8939..f6c0318 100644
--- a/examples/cpp/engine/simple_send.cpp
+++ b/examples/cpp/engine/simple_send.cpp
@@ -21,7 +21,7 @@
#include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
#include "proton/url.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
@@ -85,7 +85,7 @@ int main(int argc, char **argv) {
try {
opts.parse();
simple_send handler(address, message_count);
- proton::io::socket_engine(address, handler).run();
+ proton::io::socket::engine(address, handler).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 6cfc632..d228d67 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -90,7 +90,7 @@ class Proc(Popen):
self.ready_set = True
self.ready.set()
if self.wait() != 0:
- self.error = ProcError(self)
+ raise ProcError(self)
except Exception, e:
self.error = sys.exc_info()
finally:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 26a7c94..9254a3a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -28,7 +28,6 @@ set(qpid-proton-cpp-source
src/acceptor.cpp
src/binary.cpp
src/byte_array.cpp
- src/scalar_base.cpp
src/condition.cpp
src/connection.cpp
src/connection_options.cpp
@@ -37,21 +36,21 @@ set(qpid-proton-cpp-source
src/container_impl.cpp
src/contexts.cpp
src/data.cpp
- src/decoder.cpp
src/decimal.cpp
+ src/decoder.cpp
src/delivery.cpp
src/duration.cpp
src/encoder.cpp
src/endpoint.cpp
- src/connection_engine.cpp
src/error.cpp
+ src/handler.cpp
src/id_generator.cpp
+ src/io/connection_engine.cpp
src/link.cpp
src/link_options.cpp
src/message.cpp
src/messaging_adapter.cpp
src/messaging_event.cpp
- src/handler.cpp
src/object.cpp
src/proton_bits.cpp
src/proton_event.cpp
@@ -60,6 +59,7 @@ set(qpid-proton-cpp-source
src/receiver.cpp
src/reconnect_timer.cpp
src/sasl.cpp
+ src/scalar_base.cpp
src/sender.cpp
src/session.cpp
src/ssl.cpp
@@ -75,9 +75,9 @@ set(qpid-proton-cpp-source
)
if(MSVC)
- list(APPEND qpid-proton-cpp-source src/windows/io.cpp)
+ list(APPEND qpid-proton-cpp-source src/io/windows/socket.cpp)
else(MSVC)
- list(APPEND qpid-proton-cpp-source src/posix/io.cpp)
+ list(APPEND qpid-proton-cpp-source src/io/posix/socket.cpp)
endif(MSVC)
set_source_files_properties (
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index f20d957..8ad34cb 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -103,10 +103,10 @@ from your handler methods.
### %proton::connection_engine
`proton::connection_engine` dispatches events for a *single
-connection*. The subclass `proton::io::socket_engine` does
+connection*. The subclass `proton::io::socket::engine` does
socket-based IO. An application with a single connection is just like
using `proton::container` except you attach your handler to a
-`proton::io::socket_engine` instead. You can compare examples, such as
+`proton::io::socket::engine` instead. You can compare examples, such as
\ref helloworld.cpp and \ref engine/helloworld.cpp.
Now consider multiple connections. `proton::container` is easy to use
@@ -124,7 +124,7 @@ platforms. The example \ref engine/broker.cpp shows a broker using
sockets and poll, but you can see how the code could be adapted.
`proton::connection_engine` also does not dictate the IO mechanism,
-but it is an abstract class. `proton::socket_engine` provides
+but it is an abstract class. `proton::socket::engine` provides
ready-made socket-based IO, but you can write your own subclass with
any IO code. Just override the `io_read`, `io_write` and `io_close`
methods. For example, the proton test suite implements an in-memory
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/tutorial.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/tutorial.dox b/proton-c/bindings/cpp/docs/tutorial.dox
index dcfbe05..e40a3e7 100644
--- a/proton-c/bindings/cpp/docs/tutorial.dox
+++ b/proton-c/bindings/cpp/docs/tutorial.dox
@@ -405,7 +405,7 @@ applications or applications with unusual IO requirements.
We'll look at the \ref engine/helloworld.cpp example step-by-step to see how it differs
from the container \ref helloworld.cpp version.
-First we include the `proton::io::socket_engine` class, which is a `proton::connection_engine`
+First we include the `proton::io::socket::engine` class, which is a `proton::connection_engine`
that uses socket IO.
\skipline proton/io.hpp
@@ -417,7 +417,7 @@ engine's' connection:
\skip on_start
\until }
-Our `main` function only differs in that it creates and runs a `socket_engine`
+Our `main` function only differs in that it creates and runs a `socket::engine`
instead of a `container`.
\skip main
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index da7f806..eb4e598 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -37,6 +37,10 @@ namespace proton {
class handler;
+namespace io {
+class connection_engine;
+}
+
/// A connection to a remote AMQP peer.
class
PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, public endpoint {
@@ -123,7 +127,7 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
void host(const std::string& h);
friend class connection_context;
- friend class connection_engine;
+ friend class io::connection_engine;
friend class connection_options;
friend class connector;
friend class container_impl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/connection_engine.hpp
deleted file mode 100644
index 0b1a947..0000000
--- a/proton-c/bindings/cpp/include/proton/connection_engine.hpp
+++ /dev/null
@@ -1,168 +0,0 @@
-#ifndef CONNECTION_ENGINE_HPP
-#define CONNECTION_ENGINE_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection.hpp"
-#include "proton/connection_options.hpp"
-#include "proton/error.hpp"
-#include "proton/export.hpp"
-#include "proton/pn_unique_ptr.hpp"
-#include "proton/types.hpp"
-
-#include <cstddef>
-#include <utility>
-#include <string>
-
-namespace proton {
-
-class connection_engine_context;
-class handler;
-class connection;
-
-// TODO aconway 2016-01-23: doc contrast with container.
-
-/// An interface for connection-oriented IO integration. A
-/// connection_engine manages a single AMQP connection. It is useful
-/// for integrating AMQP into an existing IO framework.
-///
-/// The engine provides a simple "bytes-in/bytes-out" interface. Incoming AMQP
-/// bytes from any kind of data connection are fed into the engine and processed
-/// to dispatch events to a proton::handler. The resulting AMQP output data is
-/// available from the engine and can sent back over the connection.
-///
-/// The engine does no IO of its own. It assumes a two-way flow of bytes over
-/// some externally-managed "connection". The "connection" could be a socket
-/// managed by select, poll, epoll or some other mechanism, or it could be
-/// something else such as an RDMA connection, a shared-memory buffer or a Unix
-/// pipe.
-///
-/// The application is coded the same way for engine or container: you implement
-/// proton::handler. Handlers attached to an engine will receive transport,
-/// connection, session, link and message events. They will not receive reactor,
-/// selectable or timer events, the engine assumes those are managed externally.
-///
-/// THREAD SAFETY: A single engine instance cannot be called concurrently, but
-/// different engine instances can be processed concurrently in separate threads.
-class
-PN_CPP_CLASS_EXTERN connection_engine {
- public:
- class container {
- public:
- /// Create a container with id. Default to random UUID if id
- /// == "".
- PN_CPP_EXTERN container(const std::string &id = "");
- PN_CPP_EXTERN ~container();
-
- /// Return the container-id
- PN_CPP_EXTERN std::string id() const;
-
- /// Make options to configure a new engine, using the default options.
- ///
- /// Call this once for each new engine as the options include a generated unique link_prefix.
- /// You can modify the configuration before creating the engine but you should not
- /// modify the container_id or link_prefix.
- PN_CPP_EXTERN connection_options make_options();
-
- /// Set the default options to be used for connection engines.
- /// The container will set the container_id and link_prefix when make_options is called.
- PN_CPP_EXTERN void options(const connection_options&);
-
- private:
- class impl;
- internal::pn_unique_ptr<impl> impl_;
- };
- /// Create a connection engine that dispatches to handler.
- PN_CPP_EXTERN connection_engine(handler&, const connection_options& = no_opts);
-
- PN_CPP_EXTERN virtual ~connection_engine();
-
- /// Return the number of bytes that the engine is currently ready to read.
- PN_CPP_EXTERN size_t can_read() const;
-
- /// Return the number of bytes that the engine is currently ready to write.
- PN_CPP_EXTERN size_t can_write() const;
-
- /// Combine these flags with | to indicate read, write, both or neither
- enum io_flag {
- READ = 1,
- WRITE = 2
- };
-
- /// Read, write and dispatch events.
- ///
- /// io_flags indicates whether to read, write, both or neither.
- /// Dispatches all events generated by reading or writing.
- /// Use closed() to check if the engine is closed after processing.
- ///
- /// @throw exceptions thrown by the engines handler or the IO adapter.
- PN_CPP_EXTERN void process(int io_flags=READ|WRITE);
-
- /// True if the engine is closed, meaning there are no further
- /// events to process and close_io has been called. Call
- /// error_str() to get an error description.
- PN_CPP_EXTERN bool closed() const;
-
- /// Get the AMQP connection associated with this connection_engine.
- PN_CPP_EXTERN class connection connection() const;
-
- /// Thrown by io_read and io_write functions to indicate an error.
- struct PN_CPP_CLASS_EXTERN io_error : public error {
- PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
- };
-
- protected:
- /// Do a non-blocking read on the IO stream.
- ///
- ///@return pair(size, true) if size bytes were read.
- /// size==0 means no data could be read without blocking, the stream is still open.
- /// Returns pair(0, false) if the stream closed.
- ///
- ///@throw proton::connection_engine::io_error if there is a read error.
- virtual std::pair<size_t, bool> io_read(char* buf, size_t max) = 0;
-
- /// Do a non-blocking write of up to max bytes from buf.
- ///
- /// Return the number of byes written , 0 if no data could be written
- /// without blocking.
- ///
- ///throw proton::connection_engine::io_error if there is a write error.
- virtual size_t io_write(const char*, size_t) = 0;
-
- /// Close the io, no more _io methods will be called after this is called.
- virtual void io_close() = 0;
-
- PN_CPP_EXTERN static const connection_options no_opts;
-
- private:
- connection_engine(const connection_engine&);
- connection_engine& operator=(const connection_engine&);
-
- void dispatch();
- void try_read();
- void try_write();
-
- class connection connection_;
- connection_engine_context* ctx_;
-};
-
-}
-
-#endif // CONNECTION_ENGINE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 1a22b73..d2efab9 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,6 +40,10 @@ namespace proton {
class proton_handler;
class connection;
+namespace io {
+class connection_engine;
+}
+
/// Options for creating a connection.
///
/// Options can be "chained" like this:
@@ -149,7 +153,7 @@ class connection_options {
friend class container_impl;
friend class connector;
- friend class connection_engine;
+ friend class io::connection_engine;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 6ea11d4..3086037 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -32,6 +32,10 @@ class condition;
class event;
class messaging_adapter;
+namespace io {
+class connection_engine;
+}
+
/// Callback functions for handling proton events.
///
/// Subclass and override event-handling member functions.
@@ -131,7 +135,7 @@ PN_CPP_CLASS_EXTERN handler
internal::pn_unique_ptr<messaging_adapter> messaging_adapter_;
friend class container;
- friend class connection_engine;
+ friend class io::connection_engine;
friend class connection_options;
friend class link_options;
/// @endcond
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io.hpp b/proton-c/bindings/cpp/include/proton/io.hpp
deleted file mode 100644
index 9c63edb..0000000
--- a/proton-c/bindings/cpp/include/proton/io.hpp
+++ /dev/null
@@ -1,134 +0,0 @@
-#ifndef SOCKET_IO_HPP
-#define SOCKET_IO_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/connection_engine.hpp>
-#include <proton/url.hpp>
-
-namespace proton {
-
-///@details
-/// IO using sockets, file descriptors, or handles, for use with
-/// proton::connection_engine.
-///
-/// Note that you can use proton::connection_engine to communicate using AMQP
-/// over your own IO implementation or to integrate an existing IO framework of
-/// your choice, this implementation is provided as a convenience if sockets is
-/// sufficient for your needs.
-namespace io {
-
-/// @name Setup and teardown
-///
-/// Call proton::io::initialize before using any functions in the
-/// proton::io namespace. Call proton::io::finalize when you are
-/// done.
-///
-/// You can call initialize/finalize more than once as long as they
-/// are in matching pairs. Use proton::io::guard to call
-/// initialize/finalize around a scope.
-///
-/// Note that on POSIX systems these are no-ops, but they are required
-/// for Windows.
-///
-/// @{
-
-/// Initialize the proton::io subsystem.
-PN_CPP_EXTERN void initialize();
-
-/// Finalize the proton::io subsystem.
-PN_CPP_EXTERN void finalize(); // nothrow
-
-/// Use to call io::initialize and io::finalize around a scope.
-struct guard {
- guard() { initialize(); }
- ~guard() { finalize(); }
-};
-
-/// @}
-
-/// An IO resource.
-typedef int64_t descriptor;
-
-/// An invalid descriptor.
-PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
-
-/// Return a string describing the most recent IO error.
-PN_CPP_EXTERN std::string error_str();
-
-/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
-PN_CPP_EXTERN descriptor connect(const proton::url&);
-
-/// Listening socket.
-class listener {
- public:
- /// Listen on host/port. Empty host means listen on all interfaces.
- /// port can be a service name or number
- PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
- PN_CPP_EXTERN ~listener();
-
- /// Accept a connection. Return the descriptor, set host, port to the remote address.
- /// port can be a service name or number.
- PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
-
- /// Accept a connection, does not provide address info.
- descriptor accept() { std::string dummy; return accept(dummy, dummy); }
-
- /// Convert to descriptor
- descriptor socket() const { return socket_; }
-
- private:
- guard guard_;
- listener(const listener&);
- listener& operator=(const listener&);
- descriptor socket_;
-};
-
-/// A connection_engine for socket-based IO.
-class socket_engine : public connection_engine {
- public:
- /// Wrap an open socket. Sets non-blocking mode.
- PN_CPP_EXTERN socket_engine(descriptor socket_, handler&, const connection_options& = no_opts);
-
- /// Create socket engine connected to url.
- PN_CPP_EXTERN socket_engine(const url&, handler&, const connection_options& = no_opts);
-
- PN_CPP_EXTERN ~socket_engine();
-
- /// Get the socket descriptor.
- descriptor socket() const { return socket_; }
-
- /// Start the engine.
- PN_CPP_EXTERN void run();
-
- protected:
- PN_CPP_EXTERN std::pair<size_t, bool> io_read(char* buf, size_t max);
- PN_CPP_EXTERN size_t io_write(const char*, size_t);
- PN_CPP_EXTERN void io_close();
-
- private:
- void init();
- guard guard_;
- descriptor socket_;
-};
-
-}} // proton::io
-
-#endif // SOCKET_IO_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
new file mode 100644
index 0000000..3192d44
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -0,0 +1,182 @@
+#ifndef CONNECTION_ENGINE_HPP
+#define CONNECTION_ENGINE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/condition.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/error.hpp"
+#include "proton/export.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/transport.hpp"
+#include "proton/types.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <string>
+
+struct pn_collector_t;
+
+namespace proton {
+
+class handler;
+
+/// Contains classes to integrate proton into different IO and threading environments.
+namespace io {
+
+///@cond INTERNAL
+class connection_engine_context;
+/// @endcond
+
+/// Pointer to a mutable memory region with a size.
+struct mutable_buffer {
+ char* data;
+ size_t size;
+
+ mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// Pointer to a const memory region with a size.
+struct const_buffer {
+ const char* data;
+ size_t size;
+
+ const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// A protocol engine to integrate AMQP into any IO or concurrency framework.
+///
+/// io::connection_engine manages a single proton::connection and dispatches
+/// events to a proton::handler. It does no IO of its own, but allows you to
+/// integrate AMQP protocol handling into any IO or concurrency framework.
+///
+/// The application is coded the same way as for the proton::container. The
+/// application implements a proton::handler to respond to transport,
+/// connection, session, link and message events. With a little care, the same
+/// handler classes can be used for both container and connection_engine, the
+/// \ref broker.cpp example illustrates this.
+///
+/// You need to write the IO code to read AMQP data to the read_buffer(). The
+/// engine parses the AMQP frames. dispatch() calls the appropriate functions on
+/// the application's proton::handler. You write output data from the engine's
+/// write_buffer() to your IO.
+///
+/// The engine is not safe for concurrent use, but you can process different
+/// engines concurrently. A common pattern for high-performance servers is to
+/// serialize read/write activity per-connection and dispatch in a fixed-size
+/// thread pool.
+///
+/// The engine is designed to work with a classic reactor (e.g. select, poll,
+/// epoll) or an async-request driven proactor (e.g. windows completion ports,
+/// boost.asio, libuv etc.)
+///
+class
+PN_CPP_CLASS_EXTERN connection_engine {
+ public:
+ // TODO aconway 2016-03-18: this will change
+ class container {
+ public:
+ /// Create a container with id. Default to random UUID.
+ PN_CPP_EXTERN container(const std::string &id = "");
+ PN_CPP_EXTERN ~container();
+
+ /// Return the container-id
+ PN_CPP_EXTERN std::string id() const;
+
+ /// Make options to configure a new engine, using the default options.
+ ///
+ /// Call this once for each new engine as the options include a generated unique link_prefix.
+ /// You can modify the configuration before creating the engine but you should not
+ /// modify the container_id or link_prefix.
+ PN_CPP_EXTERN connection_options make_options();
+
+ /// Set the default options to be used for connection engines.
+ /// The container will set the container_id and link_prefix when make_options is called.
+ PN_CPP_EXTERN void options(const connection_options&);
+
+ private:
+ class impl;
+ internal::pn_unique_ptr<impl> impl_;
+ };
+
+ /// Create a connection engine that dispatches to handler.
+ PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
+
+ PN_CPP_EXTERN virtual ~connection_engine();
+
+ /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
+ /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
+ /// Calling dispatch() may open up more buffer space.
+ PN_CPP_EXTERN mutable_buffer read_buffer();
+
+ /// Indicate that the first n bytes of read_buffer() have valid data.
+ /// This changes the buffer, call read_buffer() to get the updated buffer.
+ PN_CPP_EXTERN void read_done(size_t n);
+
+ /// Indicate that the read side of the transport is closed and no more data will be read.
+ PN_CPP_EXTERN void read_close();
+
+ /// The engine's write buffer. Write data from this buffer then call write_done().
+ /// Returns const_buffer(0, 0) if the engine has nothing to write.
+ /// Calling dispatch() may generate more data in the write buffer.
+ PN_CPP_EXTERN const_buffer write_buffer() const;
+
+ /// Indicate that the first n bytes of write_buffer() have been written successfully.
+ /// This changes the buffer, call write_buffer() to get the updated buffer.
+ PN_CPP_EXTERN void write_done(size_t n);
+
+ /// Indicate that the write side of the transport has closed and no more data will be written.
+ PN_CPP_EXTERN void write_close();
+
+ /// Indicate that the transport has closed with an error condition.
+ /// This calls both read_close() and write_close().
+ /// The error condition will be passed to handler::on_transport_error()
+ PN_CPP_EXTERN void close(const std::string& name, const std::string& description);
+
+ /// Dispatch all available events and call the corresponding \ref handler methods.
+ ///
+ /// Returns true if the engine is still active, false if it is finished and
+ /// can be destroyed. The engine is finished when either of the following is
+ /// true:
+ ///
+ /// - both read_close() and write_close() have been called, no more IO is possible.
+ /// - The AMQP connection() is closed AND write_buffer() is empty.
+ ///
+ /// May expand the read_buffer() and/or the write_buffer().
+ ///
+ /// @throw any exceptions thrown by the \ref handler.
+ PN_CPP_EXTERN bool dispatch();
+
+ /// Get the AMQP connection associated with this connection_engine.
+ PN_CPP_EXTERN proton::connection connection() const;
+
+ private:
+ connection_engine(const connection_engine&);
+ connection_engine& operator=(const connection_engine&);
+
+ proton::handler& handler_;
+ proton::connection connection_;
+ proton::transport transport_;
+ proton::internal::pn_ptr<pn_collector_t> collector_;
+};
+}}
+
+#endif // CONNECTION_ENGINE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
new file mode 100644
index 0000000..a43c0ae
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/socket.hpp
@@ -0,0 +1,130 @@
+#ifndef PROTON_IO_IO_HPP
+#define PROTON_IO_IO_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/io/connection_engine.hpp>
+#include <proton/url.hpp>
+
+
+namespace proton {
+namespace io {
+namespace socket {
+
+struct
+PN_CPP_CLASS_EXTERN io_error : public proton::error {
+ PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
+};
+
+/// @name Setup and teardown
+///
+/// Call initialize() before using any functions in the proton::io::socket
+/// namespace. Call finalize() when you are done.
+///
+/// You can call initialize/finalize more than once as long as they are in
+/// matching pairs. Use \ref guard to call initialize/finalize around a scope.
+///
+/// Note that on POSIX systems these are no-ops, but they are required
+/// for Windows.
+///
+/// @{
+
+/// Initialize the proton::io::socket subsystem.
+PN_CPP_EXTERN void initialize();
+
+/// Finalize the proton::io::socket subsystem.
+PN_CPP_EXTERN void finalize(); // nothrow
+
+/// Use to call initialize() and finalize() around a scope.
+struct guard {
+ guard() { initialize(); }
+ ~guard() { finalize(); }
+};
+
+/// @}
+
+/// An IO resource.
+typedef int64_t descriptor;
+
+/// An invalid descriptor.
+PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
+
+/// Return a string describing the most recent IO error.
+PN_CPP_EXTERN std::string error_str();
+
+/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
+PN_CPP_EXTERN descriptor connect(const proton::url&);
+
+/// Listening socket.
+class listener {
+ public:
+ /// Listen on host/port. Empty host means listen on all interfaces.
+ /// port can be a service name or number
+ PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
+ PN_CPP_EXTERN ~listener();
+
+ /// Accept a connection. Return the descriptor, set host, port to the remote address.
+ /// port can be a service name or number.
+ PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
+
+ /// Accept a connection, does not provide address info.
+ descriptor accept() { std::string dummy; return accept(dummy, dummy); }
+
+ /// Convert to descriptor
+ descriptor socket() const { return socket_; }
+
+ private:
+ guard guard_;
+ listener(const listener&);
+ listener& operator=(const listener&);
+ descriptor socket_;
+};
+
+/// A \ref connection_engine with non-blocking socket IO.
+class engine : public connection_engine {
+ public:
+ /// Wrap an open socket. Sets non-blocking mode.
+ PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
+
+ /// Create socket engine connected to url.
+ PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
+
+ PN_CPP_EXTERN ~engine();
+
+ /// Run the engine until it closes
+ PN_CPP_EXTERN void run();
+
+ /// Non-blocking read from socket to engine
+ PN_CPP_EXTERN void read();
+
+ /// Non-blocking write from engine to socket
+ PN_CPP_EXTERN void write();
+
+ descriptor socket() const { return socket_; }
+
+ private:
+ void init();
+ guard guard_;
+ descriptor socket_;
+};
+
+}}}
+
+#endif /*!PROTON_IO_IO_HPP*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index 9e32ac5..bebc974 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -34,6 +34,10 @@ class connection;
class condition;
class sasl;
+namespace io {
+class connection_engine;
+}
+
class transport : public internal::object<pn_transport_t> {
/// @cond INTERNAL
transport(pn_transport_t* t) : internal::object<pn_transport_t>(t) {}
@@ -71,6 +75,7 @@ class transport : public internal::object<pn_transport_t> {
friend class connection_options;
friend class connector;
friend class proton_event;
+ friend class io::connection_engine;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_engine.cpp b/proton-c/bindings/cpp/src/connection_engine.cpp
deleted file mode 100644
index be9efeb..0000000
--- a/proton-c/bindings/cpp/src/connection_engine.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection_engine.hpp"
-#include "proton/error.hpp"
-#include "proton/handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "id_generator.hpp"
-#include "messaging_adapter.hpp"
-#include "messaging_event.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_bits.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-#include <iosfwd>
-
-namespace proton {
-
-namespace {
-void set_error(connection_engine_context *ctx_, const std::string& reason) {
- pn_condition_t *c = pn_transport_condition(ctx_->transport);
- pn_condition_set_name(c, "io_error");
- pn_condition_set_description(c, reason.c_str());
-}
-
-void close_transport(connection_engine_context *ctx_) {
- if (pn_transport_pending(ctx_->transport) >= 0)
- pn_transport_close_head(ctx_->transport);
- if (pn_transport_capacity(ctx_->transport) >= 0)
- pn_transport_close_tail(ctx_->transport);
-}
-
-std::string make_id(const std::string s="") { return s.empty() ? uuid::random().str() : s; }
-}
-
-connection_engine::io_error::io_error(const std::string& msg) : error(msg) {}
-
-class connection_engine::container::impl {
- public:
- impl(const std::string s="") : id_(make_id(s)) {}
-
- const std::string id_;
- id_generator id_gen_;
- connection_options options_;
-};
-
-connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
-
-connection_engine::container::~container() {}
-
-std::string connection_engine::container::id() const { return impl_->id_; }
-
-connection_options connection_engine::container::make_options() {
- connection_options opts = impl_->options_;
- opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
- return opts;
-}
-
-void connection_engine::container::options(const connection_options &opts) {
- impl_->options_ = opts;
-}
-
-connection_engine::connection_engine(class handler &h, const connection_options& opts) {
- connection_ = proton::connection(internal::take_ownership(pn_connection()).get());
- internal::pn_ptr<pn_transport_t> transport = internal::take_ownership(pn_transport());
- internal::pn_ptr<pn_collector_t> collector = internal::take_ownership(pn_collector());
- if (!connection_ || !transport || !collector)
- throw proton::error("engine create");
- int err = pn_transport_bind(transport.get(), connection_.pn_object());
- if (err)
- throw error(msg() << "transport bind:" << pn_code(err));
- pn_connection_collect(connection_.pn_object(), collector.get());
-
- ctx_ = &connection_engine_context::get(connection_); // Creates context
- ctx_->engine_handler = &h;
- ctx_->transport = transport.release();
- ctx_->collector = collector.release();
- opts.apply(connection_);
- // Provide defaults for connection_id and link_prefix if not set.
- std::string cid = connection_.container_id();
- if (cid.empty()) {
- cid = make_id();
- pn_connection_set_container(connection_.pn_object(), cid.c_str());
- }
- id_generator &link_gen = connection_context::get(connection_).link_gen;
- if (link_gen.prefix().empty()) {
- link_gen.prefix(make_id()+"/");
- }
-}
-
-connection_engine::~connection_engine() {
- pn_transport_unbind(ctx_->transport);
- pn_transport_free(ctx_->transport);
- internal::pn_ptr<pn_connection_t> c(connection_.pn_object());
- connection_ = proton::connection();
- pn_connection_free(c.release());
- pn_collector_free(ctx_->collector);
-}
-
-void connection_engine::process(int flags) {
- if (closed()) return;
- if (flags & WRITE) try_write();
- dispatch();
- if (flags & READ) try_read();
- dispatch();
-
- if (connection_.closed() && !closed()) {
- dispatch();
- while (can_write()) {
- try_write(); // Flush final data.
- }
- // no transport errors.
- close_transport(ctx_);
- }
- if (closed()) {
- pn_transport_unbind(ctx_->transport);
- dispatch();
- try { io_close(); } catch(const io_error&) {} // Tell the IO to close.
- }
-}
-
-void connection_engine::dispatch() {
- proton_handler& h = *ctx_->engine_handler->messaging_adapter_;
- pn_collector_t* c = ctx_->collector;
- for (pn_event_t *e = pn_collector_peek(c); e; e = pn_collector_peek(c)) {
- if (pn_event_type(e) == PN_CONNECTION_INIT) {
- // Make the messaging_adapter issue a START event.
- proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
- }
- proton_event(e, pn_event_type(e), 0).dispatch(h);
- pn_collector_pop(c);
- }
-}
-
-size_t connection_engine::can_read() const {
- return std::max(ssize_t(0), pn_transport_capacity(ctx_->transport));
-}
-
-void connection_engine::try_read() {
- size_t max = can_read();
- if (max == 0) return;
- try {
- std::pair<size_t, bool> r = io_read(pn_transport_tail(ctx_->transport), max);
- if (r.second) {
- if (r.first > max)
- throw io_error(msg() << "read invalid size: " << r.first << ">" << max);
- pn_transport_process(ctx_->transport, r.first);
- } else {
- pn_transport_close_tail(ctx_->transport);
- }
- } catch (const io_error& e) {
- set_error(ctx_, e.what());
- pn_transport_close_tail(ctx_->transport);
- }
-}
-
-size_t connection_engine::can_write() const {
- return std::max(ssize_t(0), pn_transport_pending(ctx_->transport));
-}
-
-void connection_engine::try_write() {
- size_t max = can_write();
- if (max == 0) return;
- try {
- size_t n = io_write(pn_transport_head(ctx_->transport), max);
- if (n > max) {
- throw io_error(msg() << "write invalid size: " << n << " > " << max);
- }
- pn_transport_pop(ctx_->transport, n);
- } catch (const io_error& e) {
- set_error(ctx_, e.what());
- pn_transport_close_head(ctx_->transport);
- }
-}
-
-bool connection_engine::closed() const {
- return pn_transport_closed(ctx_->transport);
-}
-
-connection connection_engine::connection() const { return connection_.pn_object(); }
-
-const connection_options connection_engine::no_opts;
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 03271a1..a60c1fa 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -26,7 +26,7 @@
#include "proton/message.hpp"
#include "proton/connection.hpp"
#include "proton/container.hpp"
-#include "proton/connection_engine.hpp"
+#include "proton/io/connection_engine.hpp"
#include "id_generator.hpp"
#include "proton_handler.hpp"
@@ -98,19 +98,6 @@ class connection_context : public context {
static context::id id(const connection& c) { return id(c.pn_object()); }
};
-// Connection context with information used by the connection_engine.
-class connection_engine_context : public connection_context {
- public:
- connection_engine_context() : engine_handler(0), transport(0), collector(0) {}
-
- class handler *engine_handler;
- pn_transport_t *transport;
- pn_collector_t *collector;
- static connection_engine_context& get(const connection &c) {
- return ref<connection_engine_context>(id(c));
- }
-};
-
void container_context(const reactor&, container&);
class container_context {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 74a7a6a..afae6c2 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -20,7 +20,7 @@
#include "test_bits.hpp"
#include <proton/uuid.hpp>
-#include <proton/connection_engine.hpp>
+#include <proton/io/connection_engine.hpp>
#include <proton/handler.hpp>
#include <proton/event.hpp>
#include <proton/types_fwd.hpp>
@@ -32,167 +32,209 @@
#define override
#endif
+using namespace proton::io;
using namespace proton;
using namespace test;
+using namespace std;
-// One end of an in-memory connection
-struct mem_pipe {
- mem_pipe(std::deque<char>& r, std::deque<char>& w) : read(r), write(w) {}
- std::deque<char> &read, &write;
-};
+typedef std::deque<char> byte_stream;
-struct mem_queues : public std::pair<std::deque<char>, std::deque<char> > {
- mem_pipe a() { return mem_pipe(first, second); }
- mem_pipe b() { return mem_pipe(second, first); }
-};
+/// In-memory connection_engine that reads from and writes to byte_streams.
+struct in_memory_engine : public connection_engine {
-// In memory connection_engine
-struct mem_engine : public connection_engine {
- mem_pipe socket;
- std::string read_error;
- std::string write_error;
-
- mem_engine(mem_pipe s, handler &h, const connection_options &opts)
- : connection_engine(h, opts), socket(s) {}
-
- std::pair<size_t, bool> io_read(char* buf, size_t size) override {
- if (!read_error.empty()) throw io_error(read_error);
- size = std::min(socket.read.size(), size);
- copy(socket.read.begin(), socket.read.begin()+size, buf);
- socket.read.erase(socket.read.begin(), socket.read.begin()+size);
- return std::make_pair(size, true);
- }
+ byte_stream& reads;
+ byte_stream& writes;
- size_t io_write(const char* buf, size_t size) override {
- if (!write_error.empty()) throw io_error(write_error);
- socket.write.insert(socket.write.begin(), buf, buf+size);
- return size;
- }
+ in_memory_engine(byte_stream& rd, byte_stream& wr, handler &h,
+ const connection_options &opts = connection_options()) :
+ connection_engine(h, opts), reads(rd), writes(wr) {}
- void io_close() override {
- read_error = write_error = "closed";
+ void do_read() {
+ mutable_buffer rbuf = read_buffer();
+ size_t size = std::min(reads.size(), rbuf.size);
+ if (size) {
+ copy(reads.begin(), reads.begin()+size, static_cast<char*>(rbuf.data));
+ read_done(size);
+ reads.erase(reads.begin(), reads.begin()+size);
+ }
}
-};
-struct debug_handler : handler {
- void on_unhandled(event& e) override {
- std::cout << e.name() << std::endl;
+ void do_write() {
+ const_buffer wbuf = write_buffer();
+ if (wbuf.size) {
+ writes.insert(writes.end(), // Append (FIFO): do_read() consumes from the front, so new bytes must follow unread ones.
+ static_cast<const char*>(wbuf.data),
+ static_cast<const char*>(wbuf.data) + wbuf.size);
+ write_done(wbuf.size);
+ }
}
-};
-struct record_handler : handler {
- std::deque<std::string> events;
- void on_unhandled(event& e) override {
- events.push_back(e.name());
- }
+ void process() { do_read(); do_write(); dispatch(); }
};
-template <class HA=record_handler, class HB=record_handler> struct engine_pair {
+/// A pair of engines that talk to each other in-memory.
+struct engine_pair {
+ byte_stream ab, ba;
connection_engine::container cont;
- mem_queues queues;
- HA ha;
- HB hb;
- mem_engine a, b;
- engine_pair() : a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options()) {}
- engine_pair(const std::string& id)
- : cont(id), a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options())
- {}
- engine_pair(const connection_options &aopts, connection_options &bopts)
- : a(queues.a(), ha, aopts), b(queues.b(), hb, bopts)
- {}
- void process() { a.process(); b.process(); }
-};
+ in_memory_engine a, b;
-void test_process_amqp() {
- engine_pair<> e;
+ engine_pair(handler& ha, handler& hb,
+ const connection_options& ca = connection_options(),
+ const connection_options& cb = connection_options()) :
+ a(ba, ab, ha, ca), b(ab, ba, hb, cb) {}
- e.a.process(connection_engine::READ); // Don't write unlesss writable
- ASSERT(e.a.socket.write.empty());
- e.a.process(connection_engine::WRITE);
-
- std::string wrote(e.a.socket.write.begin(), e.a.socket.write.end());
- e.a.process(connection_engine::WRITE);
- ASSERT_EQUAL(8, wrote.size());
- ASSERT_EQUAL("AMQP", wrote.substr(0,4));
+ void process() { a.process(); b.process(); }
+};
- e.b.process(); // Read and write AMQP
- ASSERT_EQUAL("AMQP", std::string(e.b.socket.write.begin(), e.b.socket.write.begin()+4));
- ASSERT(e.b.socket.read.empty());
- ASSERT(e.a.socket.write.empty());
- ASSERT_EQUAL(many<std::string>() + "START", e.ha.events);
+template <class S> typename S::value_type quick_pop(S& s) {
+ ASSERT(!s.empty());
+ typename S::value_type x = s.front();
+ s.pop_front();
+ return x;
}
-
-struct link_handler : public record_handler {
+/// A handler that records incoming endpoints, errors etc.
+struct record_handler : public handler {
std::deque<proton::link> links;
+ std::deque<proton::session> sessions;
+ std::deque<std::string> errors;
+
void on_link_open(event& e) override {
links.push_back(e.link());
}
- proton::link pop() {
- proton::link l;
- if (!links.empty()) {
- l = links.front();
- links.pop_front();
- }
- return l;
+ void on_session_open(event& e) {
+ sessions.push_back(e.session());
+ }
+
+ void on_unhandled_error(event& e, const condition& c) {
+ errors.push_back(e.name() + "/" + c.what());
}
};
void test_engine_prefix() {
// Set container ID and prefix explicitly
- engine_pair<link_handler, link_handler> e(
- connection_options().container_id("a").link_prefix("x/"),
- connection_options().container_id("b").link_prefix("y/"));
+ record_handler ha, hb;
+ engine_pair e(ha, hb,
+ connection_options().container_id("a").link_prefix("x/"),
+ connection_options().container_id("b").link_prefix("y/"));
e.a.connection().open();
ASSERT_EQUAL("a", e.a.connection().container_id());
e.b.connection().open();
ASSERT_EQUAL("b", e.b.connection().container_id());
- e.a.connection().open_sender("");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("x/1", e.ha.pop().name());
- ASSERT_EQUAL("x/1", e.hb.pop().name());
+ e.a.connection().open_sender("x");
+ while (ha.links.empty() || hb.links.empty()) e.process();
+ ASSERT_EQUAL("x/1", quick_pop(ha.links).name());
+ ASSERT_EQUAL("x/1", quick_pop(hb.links).name());
e.a.connection().open_receiver("");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("x/2", e.ha.pop().name());
- ASSERT_EQUAL("x/2", e.hb.pop().name());
+ while (ha.links.empty() || hb.links.empty()) e.process();
+ ASSERT_EQUAL("x/2", quick_pop(ha.links).name());
+ ASSERT_EQUAL("x/2", quick_pop(hb.links).name());
e.b.connection().open_receiver("");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("y/1", e.ha.pop().name());
- ASSERT_EQUAL("y/1", e.hb.pop().name());
+ while (ha.links.empty() || hb.links.empty()) e.process();
+ ASSERT_EQUAL("y/1", quick_pop(ha.links).name());
+ ASSERT_EQUAL("y/1", quick_pop(hb.links).name());
}
void test_container_prefix() {
/// Let the container set the options: every make_options() call carries the container's id
/// and a fresh link prefix, so links on the first connection are "1/<n>" and on the second "2/<n>".
- engine_pair<link_handler, link_handler> e;
+ record_handler ha, hb;
+ connection_engine::container ca("a"), cb("b");
+ engine_pair e(ha, hb, ca.make_options(), cb.make_options());
+
+ ASSERT_EQUAL("a", e.a.connection().container_id());
+ ASSERT_EQUAL("b", e.b.connection().container_id());
+
e.a.connection().open();
+ sender s = e.a.connection().open_sender("x");
+ ASSERT_EQUAL("1/1", s.name());
- e.a.connection().open_sender("x");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("1/1", e.ha.pop().name());
- ASSERT_EQUAL("1/1", e.hb.pop().name());
+ while (ha.links.empty() || hb.links.empty()) e.process();
+
+ ASSERT_EQUAL("1/1", quick_pop(ha.links).name());
+ ASSERT_EQUAL("1/1", quick_pop(hb.links).name());
e.a.connection().open_receiver("y");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("1/2", e.ha.pop().name());
- ASSERT_EQUAL("1/2", e.hb.pop().name());
+ while (ha.links.empty() || hb.links.empty()) e.process();
+ ASSERT_EQUAL("1/2", quick_pop(ha.links).name());
+ ASSERT_EQUAL("1/2", quick_pop(hb.links).name());
- e.b.connection().open_receiver("z");
- while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
- ASSERT_EQUAL("2/1", e.ha.pop().name());
- ASSERT_EQUAL("2/1", e.hb.pop().name());
+ // Open a second connection in each container, make sure links have different IDs.
+ record_handler ha2, hb2;
+ engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options());
- // TODO aconway 2016-01-22: check we respect name set in linkn-options.
+ ASSERT_EQUAL("a", e2.a.connection().container_id());
+ ASSERT_EQUAL("b", e2.b.connection().container_id());
+
+ e2.b.connection().open();
+ receiver r = e2.b.connection().open_receiver("z");
+ ASSERT_EQUAL("2/1", r.name());
+
+ while (ha2.links.empty() || hb2.links.empty()) e2.process();
+
+ ASSERT_EQUAL("2/1", quick_pop(ha2.links).name());
+ ASSERT_EQUAL("2/1", quick_pop(hb2.links).name());
};
+void test_endpoint_close() {
+ // Intended check: conditions given when closing a link or connection are sent to the remote end.
+
+ // FIXME aconway 2016-03-22: re-enable these tests when we can set error conditions.
+ // Until then every statement below is commented out, so this function currently does nothing.
+
+ // record_handler ha, hb;
+ // engine_pair e(ha, hb);
+ // e.a.connection().open();
+ // e.a.connection().open_sender("x");
+ // e.a.connection().open_receiver("y");
+ // while (ha.links.size() < 2 || hb.links.size() < 2) e.process();
+ // link ax = quick_pop(ha.links), ay = quick_pop(ha.links);
+ // link bx = quick_pop(hb.links), by = quick_pop(hb.links);
+
+ // // Close a link
+ // ax.close(condition("err", "foo bar"));
+ // while (!(bx.state() & endpoint::REMOTE_CLOSED)) e.process();
+ // condition c = bx.remote_condition();
+ // ASSERT_EQUAL("err", c.name());
+ // ASSERT_EQUAL("foo bar", c.description());
+ // ASSERT_EQUAL("err: foo bar", ax.local_condition().what());
+
+ // // Close a link with an empty condition
+ // ay.close(condition());
+ // while (!(by.state() & endpoint::REMOTE_CLOSED)) e.process();
+ // ASSERT(by.remote_condition().empty());
+
+ // // Close a connection
+ // connection ca = e.a.connection(), cb = e.b.connection();
+ // ca.close(condition("conn", "bad connection"));
+ // while (!cb.closed()) e.process();
+ // ASSERT_EQUAL("conn: bad connection", cb.remote_condition().what());
+}
+
+void test_transport_close() {
+    // Make sure an error condition set when closing one engine's transport is reported to the remote end.
+    record_handler ha, hb;
+    engine_pair e(ha, hb);
+    e.a.connection().open();
+    // Wait for the REMOTE_ACTIVE flag. The parentheses are required: `!state() & FLAG`
+    // negates state() first and then masks the resulting bool, which is not a test of the flag.
+    while (!(e.a.connection().state() & endpoint::REMOTE_ACTIVE)) e.process();
+
+    e.a.close("oops", "engine failure");
+    // Closed but we still have output data to flush so a.dispatch() is true.
+    ASSERT(e.a.dispatch());
+    while (!e.b.connection().closed()) e.process();
+    ASSERT_EQUAL(1, hb.errors.size());
+    // NOTE(review): "trasport_error" looks like a misspelling of "transport_error"; the prefix comes
+    // from event::name(), so confirm the real event name before changing this expectation.
+    ASSERT_EQUAL("trasport_error/oops: engine failure", hb.errors.front());
+    ASSERT_EQUAL("oops", e.b.connection().remote_condition().name());
+    ASSERT_EQUAL("engine failure", e.b.connection().remote_condition().description());
+}
+
int main(int, char**) {
int failed = 0;
- RUN_TEST(failed, test_process_amqp());
RUN_TEST(failed, test_engine_prefix());
RUN_TEST(failed, test_container_prefix());
+ RUN_TEST(failed, test_endpoint_close()); // NOTE(review): test_transport_close() is defined above but never registered here - confirm whether that is intentional.
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
new file mode 100644
index 0000000..5d8e5cc
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_engine.hpp"
+#include "proton/error.hpp"
+#include "proton/handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "id_generator.hpp"
+#include "messaging_adapter.hpp"
+#include "messaging_event.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_bits.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+#include <iosfwd>
+
+#include <assert.h>
+
+namespace proton {
+namespace io {
+
+namespace {
+// Return s unchanged, or the string form of a freshly generated random UUID when s is empty.
+// Takes the argument by const reference so callers' strings are not copied just to be inspected.
+std::string make_id(const std::string& s = std::string()) {
+    return s.empty() ? uuid::random().str() : s;
+}
+}
+
+/// Private state behind connection_engine::container.
+class connection_engine::container::impl {
+  public:
+    /// @param s container id; an empty string is replaced by a generated id (see make_id()).
+    /// explicit: a bare string should never silently convert into an impl.
+    explicit impl(const std::string& s = std::string()) : id_(make_id(s)) {}
+
+    const std::string id_;        ///< Container id, fixed at construction.
+    id_generator id_gen_;         ///< Source of the link prefixes handed out by make_options().
+    connection_options options_;  ///< Base options that make_options() copies and extends.
+};
+
+/// Create a container with id s; the impl substitutes a generated id when s is empty.
+connection_engine::container::container(const std::string& s)
+    : impl_(new impl(s))
+{}
+
+/// Out-of-line destructor; the body is intentionally empty.
+connection_engine::container::~container()
+{
+}
+
+/// The container id fixed at construction.
+std::string connection_engine::container::id() const {
+    return impl_->id_;
+}
+
+/// Build options for a new connection in this container: a copy of the base options
+/// with this container's id and the next link prefix ("<next id>/") applied on top.
+connection_options connection_engine::container::make_options() {
+    connection_options result = impl_->options_;
+    result.container_id(id());
+    result.link_prefix(impl_->id_gen_.next()+"/");
+    return result;
+}
+
+/// Replace the base options that subsequent make_options() calls start from.
+void connection_engine::container::options(const connection_options &opts)
+{
+    impl_->options_ = opts;
+}
+
+/// Create the connection, transport and collector, wire them together and apply opts.
+/// Throws proton::error if any of the three objects could not be created.
+connection_engine::connection_engine(class handler &h, const connection_options& opts) :
+    handler_(h),
+    connection_(internal::take_ownership(pn_connection()).get()),
+    transport_(internal::take_ownership(pn_transport()).get()),
+    collector_(internal::take_ownership(pn_collector()).get())
+{
+    if (!connection_ || !transport_ || !collector_) {
+        throw proton::error("engine create");
+    }
+    transport_.bind(connection_);
+    pn_connection_collect(connection_.pn_object(), collector_.get());
+    opts.apply(connection_);
+
+    // Fall back to generated values when opts supplied no container id or no link prefix.
+    if (connection_.container_id().empty()) {
+        const std::string generated = make_id();
+        pn_connection_set_container(connection_.pn_object(), generated.c_str());
+    }
+    id_generator &links = connection_context::get(connection_).link_gen;
+    if (links.prefix().empty())
+        links.prefix(make_id()+"/");
+}
+
+/// Unbind the transport, then free the collector.
+connection_engine::~connection_engine()
+{
+    transport_.unbind();
+    // The collector is freed by hand to break its cycle with connection_.
+    pn_collector_free(collector_.release());
+}
+
+/// Deliver every event currently in the collector to the handler's messaging adapter.
+///
+/// @return false once the transport is closed, or once the connection is closed and
+/// there is no output left to write; true otherwise.
+bool connection_engine::dispatch() {
+    proton_handler& adapter = *handler_.messaging_adapter_;
+    while (pn_event_t *ev = pn_collector_peek(collector_.get())) {
+        if (pn_event_type(ev) == PN_CONNECTION_INIT) {
+            // FIXME aconway 2016-03-21: don't use START in connection handlers
+            // reserve it for containers.
+            proton_event(ev, PN_REACTOR_INIT, 0).dispatch(adapter);
+        }
+        // The event is dispatched under its own type and only then removed from the collector.
+        proton_event(ev, pn_event_type(ev), 0).dispatch(adapter);
+        pn_collector_pop(collector_.get());
+    }
+    if (pn_transport_closed(transport_.pn_object()))
+        return false;
+    return !(connection().closed() && write_buffer().size == 0);
+}
+
+/// The region of the transport's input buffer that incoming bytes may be written into.
+/// Returns an empty buffer (0, 0) when the transport reports no capacity.
+mutable_buffer connection_engine::read_buffer() {
+    const ssize_t room = pn_transport_capacity(transport_.pn_object());
+    if (room <= 0)
+        return mutable_buffer(0, 0);
+    return mutable_buffer(pn_transport_tail(transport_.pn_object()), room);
+}
+
+/// Tell the transport that n bytes have been placed in the buffer returned by read_buffer().
+void connection_engine::read_done(size_t n)
+{
+    pn_transport_process(transport_.pn_object(), n);
+}
+
+/// Close the input (tail) side of the transport.
+void connection_engine::read_close()
+{
+    pn_transport_close_tail(transport_.pn_object());
+}
+
+/// The bytes the transport currently has pending for output.
+/// Returns an empty buffer (0, 0) when nothing is pending.
+const_buffer connection_engine::write_buffer() const {
+    const ssize_t waiting = pn_transport_pending(transport_.pn_object());
+    if (waiting <= 0)
+        return const_buffer(0, 0);
+    return const_buffer(pn_transport_head(transport_.pn_object()), waiting);
+}
+
+/// Tell the transport that the first n bytes of write_buffer() have been written out.
+void connection_engine::write_done(size_t n)
+{
+    pn_transport_pop(transport_.pn_object(), n);
+}
+
+/// Close the output (head) side of the transport.
+void connection_engine::write_close()
+{
+    pn_transport_close_head(transport_.pn_object());
+}
+
+/// Record (name, description) as the transport's condition, then close both
+/// the input and the output side of the transport.
+void connection_engine::close(const std::string& name, const std::string& description) {
+    pn_condition_t* cond = pn_transport_condition(transport_.pn_object());
+    pn_condition_set_name(cond, name.c_str());
+    pn_condition_set_description(cond, description.c_str());
+
+    read_close();
+    write_close();
+}
+
+/// The connection driven by this engine.
+proton::connection connection_engine::connection() const
+{
+    return connection_;
+}
+
+}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
new file mode 100644
index 0000000..656a837
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/posix/socket.cpp
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+/// Construct an io_error carrying the message s.
+io_error::io_error(const std::string& s)
+    : error(s)
+{}
+
+const descriptor INVALID_DESCRIPTOR = -1; // Placeholder used by connect() and listener until a real descriptor has been obtained.
+
+namespace {
+// strerror_r exists in two flavours, told apart here by overloading on its return type
+// rather than on _GNU_SOURCE (which a compiler may define even when the C library only
+// provides the POSIX flavour). `inline` keeps the unused overload from triggering
+// unused-function warnings.
+
+// POSIX flavour: returns an int status and writes the message into the caller's buffer.
+inline std::string strerror_result(int, const char* buf) { return buf; }
+
+// GNU flavour: returns a pointer to the message, which need not be the caller's buffer.
+inline std::string strerror_result(const char* msg, const char*) { return msg; }
+}
+
+/// Describe the current value of errno as text.
+///
+/// @return the system's message for errno; the buffer starts out as "Unknown error", which
+/// is what is returned if the POSIX flavour of strerror_r leaves the buffer untouched.
+std::string error_str() {
+    char buf[512] = "Unknown error";
+    return strerror_result(::strerror_r(errno, buf, sizeof(buf)), buf);
+}
+
+namespace {
+
+// Pass through the result of a system call, or throw io_error(msg + error_str())
+// when the result is negative.
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (!(result < 0))
+        return result;
+    throw io_error(msg + error_str());
+}
+
+// Throw io_error(msg + gai_strerror(result)) when a getaddrinfo-family call
+// returned a non-zero result; do nothing on zero.
+void gai_check(int result, const std::string& msg="") {
+    if (result == 0)
+        return;
+    throw io_error(msg + gai_strerror(result));
+}
+
+}
+
+/// Switch the socket to non-blocking mode, keeping its other status flags.
+/// Throws io_error ("set nonblock: ...") if the flags cannot be set.
+void engine::init() {
+    const int flags = fcntl(socket_, F_GETFL, 0);
+    check(fcntl(socket_, F_SETFL, flags | O_NONBLOCK), "set nonblock: ");
+}
+
+/// Build an engine around an existing descriptor and make that descriptor non-blocking.
+engine::engine(descriptor fd, handler& h, const connection_options &opts) :
+    connection_engine(h, opts),
+    socket_(fd)
+{
+    init();
+}
+
+/// Connect to the address in u, build an engine around the new socket and make it non-blocking.
+/// Throws io_error if the connection or the switch to non-blocking mode fails.
+engine::engine(const url& u, handler& h, const connection_options& opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    try {
+        init();
+    } catch (...) {
+        // connect() opened this descriptor and nothing else owns it yet. The destructor does not
+        // run for an object whose constructor throws, so close it here before propagating.
+        ::close(socket_);
+        throw;
+    }
+}
+
+/// Empty destructor: the socket is closed at the end of run(), not here.
+engine::~engine()
+{
+}
+
+/// Read once from the socket into the engine's read buffer, if it has room.
+///
+/// Bytes received are handed to read_done(); end of file closes the read side; an error
+/// other than EAGAIN/EWOULDBLOCK closes the engine with an "io_error" condition.
+void engine::read() {
+    mutable_buffer rbuf = read_buffer();
+    if (!(rbuf.size > 0))
+        return;
+
+    const ssize_t got = ::read(socket_, rbuf.data, rbuf.size);
+    if (got > 0) {
+        read_done(got);
+        return;
+    }
+    if (got == 0) {
+        read_close();
+        return;
+    }
+    // got < 0: "would block" simply means there is nothing to read yet.
+    if (errno != EAGAIN && errno != EWOULDBLOCK)
+        close("io_error", error_str());
+}
+
+/// Write once to the socket from the engine's write buffer, if anything is pending.
+///
+/// Bytes sent are reported via write_done(); an error other than EAGAIN/EWOULDBLOCK
+/// closes the engine with an "io_error" condition. A zero-byte write is ignored.
+void engine::write() {
+    const_buffer wbuf = write_buffer();
+    if (!(wbuf.size > 0))
+        return;
+
+    const ssize_t sent = ::write(socket_, wbuf.data, wbuf.size);
+    if (sent > 0)
+        write_done(sent);
+    else if (sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+        close("io_error", error_str());
+}
+
+/// Drive the connection until dispatch() returns false, then close the socket.
+///
+/// Each pass dispatches pending events, waits in select() for the socket to become readable
+/// (only if the engine has read-buffer space) and/or writable (only if output is pending),
+/// and then performs the corresponding read() and write().
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        // select() examines descriptors below nfds, so socket_ + 1 covers the only one in use.
+        int n = ::select(socket_ + 1, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            // A wait interrupted by a signal is not an I/O failure: go round again.
+            if (errno == EINTR)
+                continue;
+            // Report under the same "io_error" condition name that read() and write() use.
+            close("io_error", "select: " + error_str());
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::close(socket_);
+}
+
+namespace {
+// Holds the result list filled in by getaddrinfo() and frees it on scope exit.
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    // ptr is still null if getaddrinfo() failed; freeaddrinfo() is not specified to accept
+    // a null pointer (some C libraries crash on it), so only free a real list.
+    ~auto_addrinfo() { if (ptr) ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+  private:
+    // Not copyable: a copy would free the same list a second time.
+    auto_addrinfo(const auto_addrinfo&);
+    auto_addrinfo& operator=(const auto_addrinfo&);
+};
+}
+
+/// Resolve the host and port of u and open a connected stream socket to the first address returned.
+///
+/// @return the connected descriptor.
+/// Throws io_error if resolution, socket creation or connection fails; a socket
+/// that was already created is closed before the exception propagates.
+descriptor connect(const proton::url& u) {
+    descriptor fd = INVALID_DESCRIPTOR;
+    try {
+        auto_addrinfo info;
+        // An empty host or port is passed to getaddrinfo() as a null pointer.
+        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
+                                u.port().empty() ? 0 : u.port().c_str(),
+                                0, &info.ptr),
+                  u.str()+": ");
+        fd = check(::socket(info->ai_family, SOCK_STREAM, 0), "connect: ");
+        check(::connect(fd, info->ai_addr, info->ai_addrlen), "connect: ");
+    } catch (...) {
+        if (fd >= 0) close(fd);
+        throw;
+    }
+    return fd;
+}
+
+/// Create a listening stream socket on host:port with SO_REUSEADDR set and a backlog of 32.
+///
+/// An empty host or port is passed to getaddrinfo() as a null pointer.
+/// Throws io_error on any failure; a socket that was already created is closed first.
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
+    try {
+        auto_addrinfo info;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(),
+                                0, &info.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(info->ai_family, SOCK_STREAM, 0), "listen: ");
+
+        int reuse = 1;
+        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)), "setsockopt: ");
+
+        check(::bind(socket_, info->ai_addr, info->ai_addrlen), "bind: ");
+        check(::listen(socket_, 32), "listen: ");
+    } catch (...) {
+        if (socket_ >= 0) close(socket_);
+        throw;
+    }
+}
+
+/// Close the listening socket.
+listener::~listener()
+{
+    ::close(socket_);
+}
+
+/// Accept one incoming connection and report the peer's address.
+///
+/// @param host_str set to the peer's host as returned by getnameinfo().
+/// @param port_str set to the peer's port/service as returned by getnameinfo().
+/// @return the descriptor of the accepted connection.
+/// Throws io_error if accept() or the address lookup fails; in the latter case the
+/// accepted descriptor is closed first so it does not leak.
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    try {
+        char host[NI_MAXHOST], port[NI_MAXSERV];
+        // Pass the length accept() actually filled in, not the size of the storage:
+        // some platforms reject a length that does not match the address family.
+        gai_check(getnameinfo((struct sockaddr *) &addr, size,
+                              host, sizeof(host), port, sizeof(port), 0),
+                  "accept invalid remote address: ");
+        host_str = host;
+        port_str = port;
+    } catch (...) {
+        ::close(fd);
+        throw;
+    }
+    return fd;
+}
+
+// No-op on POSIX; these hooks only need a real implementation on Windows.
+void initialize()
+{
+}
+
+void finalize()
+{
+}
+
+}}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org