You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/03/23 19:58:44 UTC

[1/3] qpid-proton git commit: PROTON-1161 - c++: better interface to connection_engine.

Repository: qpid-proton
Updated Branches:
  refs/heads/master c63b2bea6 -> 38a71ffe5


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/windows/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/windows/socket.cpp b/proton-c/bindings/cpp/src/io/windows/socket.cpp
new file mode 100644
index 0000000..afd3b56
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/windows/socket.cpp
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
+
+std::string error_str() {
+    HRESULT code = WSAGetLastError();
+    char err[1024] = {0};
+    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+    return err;
+}
+
+io_error::io_error(const std::string& s) : error(s) {}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result == SOCKET_ERROR)
+        throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result)
+        throw io_error(msg + gai_strerror(result));
+}
+
+} // namespace
+
+void initialize() {
+    WSADATA unused;
+    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
+}
+
+void finalize() {
+    WSACleanup();
+}
+
+void engine::init() {
+    u_long nonblock = 1;
+    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
+}
+
+engine::engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+engine::engine(const url& u, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+}
+
+engine::~engine() {}
+
+void engine::read() {
+    mutable_buffer rbuf = read_buffer();
+    if (rbuf.size > 0) {
+        int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
+        if (n > 0)
+            read_done(n);
+        else if (n == 0)
+            read_close();
+        else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+            close("io_error", error_str());
+    }
+}
+
+void engine::write() {
+    const_buffer wbuf = write_buffer();
+    if (wbuf.size > 0) {
+    int n = ::send(socket_, wbuf.data, wbuf.size, 0);
+    if (n > 0)
+        write_done(n);
+    else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+        close("io_error", error_str());
+    }
+}
+
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            close("io_error", error_str());
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::closesocket(socket_);
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+
+static const char *amqp_service(const char *port) {
+  // Help older Windows to know about amqp[s] ports
+  if (port) {
+    if (!strcmp("amqp", port)) return "5672";
+    if (!strcmp("amqps", port)) return "5671";
+  }
+  return port;
+}
+}
+
+
+descriptor connect(const proton::url& u) {
+    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
+    descriptor fd = INVALID_SOCKET;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
+                                0, &addr.ptr),
+                  "connect address invalid: ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd != INVALID_SOCKET) ::closesocket(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
+        bool yes = true;
+        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
+        check(::listen(socket_, 32), "listener listen: ");
+    } catch (...) {
+        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::closesocket(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/posix/io.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/posix/io.cpp b/proton-c/bindings/cpp/src/posix/io.cpp
deleted file mode 100644
index be9db44..0000000
--- a/proton-c/bindings/cpp/src/posix/io.cpp
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/io.hpp>
-#include <proton/url.hpp>
-
-#include <errno.h>
-#include <string.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-namespace proton {
-namespace io {
-
-const descriptor INVALID_DESCRIPTOR = -1;
-
-std::string error_str() {
-    char buf[512] = "Unknown error";
-#ifdef _GNU_SOURCE
-    // GNU strerror_r returns the message
-    return ::strerror_r(errno, buf, sizeof(buf));
-#else
-    // POSIX strerror_r doesn't return the buffer
-    ::strerror_r(errno, buf, sizeof(buf));
-    return std::string(buf)
-#endif
-}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result < 0) throw connection_engine::io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result) throw connection_engine::io_error(msg + gai_strerror(result));
-}
-
-}
-
-void socket_engine::init() {
-    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
-}
-
-socket_engine::socket_engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-socket_engine::socket_engine(const url& u, handler& h, const connection_options& opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-}
-
-socket_engine::~socket_engine() {}
-
-std::pair<size_t, bool> socket_engine::io_read(char *buf, size_t size) {
-    ssize_t n = ::read(socket_, buf, size);
-    if (n > 0) return std::make_pair(n, true);
-    if (n == 0) return std::make_pair(0, false);
-    if (errno == EAGAIN || errno == EWOULDBLOCK)
-        return std::make_pair(0, true);
-    throw io_error("read: " + error_str());
-}
-
-size_t socket_engine::io_write(const char *buf, size_t size) {
-    ssize_t n = ::write(socket_, buf, size);
-    if (n == EAGAIN || n == EWOULDBLOCK) return 0;
-    if (n < 0) check(n, "write: ");
-    return n;
-}
-
-void socket_engine::io_close() { ::close(socket_); }
-
-void socket_engine::run() {
-    fd_set self;
-    FD_ZERO(&self);
-    FD_SET(socket_, &self);
-    while (!closed()) {
-        process();
-        if (!closed()) {
-            int n = select(FD_SETSIZE,
-                           can_read() ? &self : NULL,
-                           can_write() ? &self : NULL,
-                           NULL, NULL);
-            check(n, "select: ");
-        }
-    }
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-}
-
-descriptor connect(const proton::url& u) {
-    descriptor fd = INVALID_DESCRIPTOR;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
-                                u.port().empty() ? 0 : u.port().c_str(),
-                                0, &addr.ptr), u.str()+": ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd >= 0) close(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
-        int yes = 1;
-        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
-        check(::listen(socket_, 32), "listen: ");
-    } catch (...) {
-        if (socket_ >= 0) close(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::close(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-// Empty stubs, only needed on windows.
-void initialize() {}
-void finalize() {}
-}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/scalar_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/scalar_test.cpp b/proton-c/bindings/cpp/src/scalar_test.cpp
index b5955d5..cd00d31 100644
--- a/proton-c/bindings/cpp/src/scalar_test.cpp
+++ b/proton-c/bindings/cpp/src/scalar_test.cpp
@@ -62,7 +62,6 @@ template <class T> void type_test(T x, type_id tid, T y) {
         FAIL("expected conversion_error: " #EXPR);                      \
     } catch (const conversion_error& e) {}
 
-// FIXME aconway 2016-03-15: new coerce stuff.
 void coerce_test() {
     scalar a;
     ASSERT_EQUAL(NULL_TYPE, a.type());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/test_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/test_bits.hpp b/proton-c/bindings/cpp/src/test_bits.hpp
index 38cd7a8..18298cb 100644
--- a/proton-c/bindings/cpp/src/test_bits.hpp
+++ b/proton-c/bindings/cpp/src/test_bits.hpp
@@ -30,18 +30,29 @@
 
 namespace test {
 
-struct fail : public std::logic_error { fail(const std::string& what) : logic_error(what) {} };
+struct fail : public std::logic_error {
+    fail(const std::string& what) : logic_error(what) {}
+};
+
+template <class T, class U>
+void assert_equal(const T& want, const U& got, const std::string& what) {
+    if (!(want == got))
+        throw fail(MSG(what << " " << want << " != " << got));
+}
 
-bool close(double want, double got, double delta) {
-    return fabs(want-got) <= delta;
+void assert_equalish(double want, double got, double delta, const std::string& what)
+{
+    if (!(fabs(want-got) <= delta))
+        throw fail(MSG(what << " " << want << " !=~ " << got));
 }
 
-#define FAIL(WHAT) throw fail(MSG(__FILE__ << ":" << __LINE__ << ": " << WHAT))
-#define ASSERT(TEST) do { if (!(TEST)) FAIL("assert failed: " << #TEST); } while(false)
-#define ASSERT_EQUAL(WANT, GOT) if (!((WANT) == (GOT))) \
-        FAIL(#WANT << " !=  " << #GOT << ": " << (WANT) << " != " << (GOT))
-#define ASSERT_CLOSE(WANT, GOT, DELTA) if (!close((WANT), (GOT), (DELTA))) \
-        FAIL(#WANT << " != " << #GOT << ": " << (WANT) << " != " << (GOT))
+#define FAIL_MSG(WHAT) (MSG(__FILE__ << ":" << __LINE__ << ": " << WHAT).str())
+#define FAIL(WHAT) throw fail(FAIL_MSG(WHAT))
+#define ASSERT(TEST) do { if (!(TEST)) FAIL("failed ASSERT(" #TEST ")"); } while(false)
+#define ASSERT_EQUAL(WANT, GOT) \
+    assert_equal((WANT), (GOT), FAIL_MSG("failed ASSERT_EQUAL(" #WANT ", " #GOT ")"))
+#define ASSERT_EQUALISH(WANT, GOT, DELTA) \
+    assert_equalish((WANT), (GOT), (DELTA), FAIL_MSG("failed ASSERT_EQUALISH(" #WANT ", " #GOT ")"))
 
 #define RUN_TEST(BAD_COUNT, TEST)                                       \
     do {                                                                \

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/value_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/value_test.cpp b/proton-c/bindings/cpp/src/value_test.cpp
index d636ffa..7b4940a 100644
--- a/proton-c/bindings/cpp/src/value_test.cpp
+++ b/proton-c/bindings/cpp/src/value_test.cpp
@@ -132,8 +132,8 @@ void get_coerce_test() {
     ASSERT_EQUAL(4, coerce<uint64_t>(value(uint32_t(4))));
     ASSERT_EQUAL(-4, coerce<int64_t>(value(int32_t(-4))));
 
-    ASSERT_CLOSE(1.2, coerce<float>(value(double(1.2))), 0.001);
-    ASSERT_CLOSE(3.4, coerce<double>(value(float(3.4))), 0.001);
+    ASSERT_EQUALISH(1.2, coerce<float>(value(double(1.2))), 0.001);
+    ASSERT_EQUALISH(3.4, coerce<double>(value(float(3.4))), 0.001);
 
     ASSERT_EQUAL(std::string("foo"), coerce<std::string>(value(symbol("foo"))));
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/windows/io.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/windows/io.cpp b/proton-c/bindings/cpp/src/windows/io.cpp
deleted file mode 100644
index 52f5fc0..0000000
--- a/proton-c/bindings/cpp/src/windows/io.cpp
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-#include <proton/io.hpp>
-#include <proton/url.hpp>
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include <ctype.h>
-#include <errno.h>
-#include <stdio.h>
-#include <assert.h>
-
-namespace proton {
-namespace io {
-
-const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
-
-std::string error_str() {
-    HRESULT code = WSAGetLastError();
-    char err[1024] = {0};
-    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
-                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
-    return err;
-}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result == SOCKET_ERROR)
-        throw connection_engine::io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result)
-        throw connection_engine::io_error(msg + gai_strerror(result));
-}
-} // namespace
-
-void initialize() {
-    WSADATA unused;
-    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
-}
-
-void finalize() {
-    WSACleanup();
-}
-
-void socket_engine::init() {
-    u_long nonblock = 1;
-    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
-}
-
-socket_engine::socket_engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-socket_engine::socket_engine(const url& u, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-}
-
-socket_engine::~socket_engine() {}
-
-std::pair<size_t, bool> socket_engine::io_read(char *buf, size_t size) {
-    int n = ::recv(socket_, buf, size, 0);
-    if (n > 0) return std::make_pair(n, true);
-    if (n == 0) return std::make_pair(0, false);
-    if (n == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
-        return std::make_pair(0, true);
-    throw connection_engine::io_error("read: " + error_str());
-}
-
-size_t socket_engine::io_write(const char *buf, size_t size) {
-    int n = ::send(socket_, buf, size, 0);
-    if (n == SOCKET_ERROR && n == WSAEWOULDBLOCK) return 0;
-    return check(n, "write: ");
-}
-
-void socket_engine::io_close() { ::closesocket(socket_); }
-
-void socket_engine::run() {
-    fd_set self;
-    FD_ZERO(&self);
-    FD_SET(socket_, &self);
-    while (!closed()) {
-        process();
-        if (!closed()) {
-            int n = ::select(FD_SETSIZE,
-                           can_read() ? &self : NULL,
-                           can_write() ? &self : NULL,
-                           NULL, NULL);
-            check(n, "select: ");
-        }
-    }
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-
-static const char *amqp_service(const char *port) {
-  // Help older Windows to know about amqp[s] ports
-  if (port) {
-    if (!strcmp("amqp", port)) return "5672";
-    if (!strcmp("amqps", port)) return "5671";
-  }
-  return port;
-}
-}
-
-descriptor connect(const proton::url& u) {
-    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
-    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
-    descriptor fd = INVALID_SOCKET;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
-                                0, &addr.ptr),
-                  "connect address invalid: ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd != INVALID_SOCKET) ::closesocket(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
-        bool yes = true;
-        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
-        check(::listen(socket_, 32), "listener listen: ");
-    } catch (...) {
-        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::closesocket(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-}}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1161 - c++: better interface to connection_engine.

Posted by ac...@apache.org.
PROTON-1161 - c++: better interface to connection_engine.

More flexible connection_engine interface to support reactor and proactor patterns.
Provides direct access to proton buffers for minimal copies in either case.

connection_engine is now completely IO and thread neutral, it simply handles:

    bytes-in -> events as handler function calls -> bytes-out

Moved connection_engine and related classes into the proton::io namespace.

Cleaned up engine implementation, improved the unit tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/222574ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/222574ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/222574ed

Branch: refs/heads/master
Commit: 222574ed95acaa4143ea3e2c2cd9e7abac9e1dd9
Parents: c63b2be
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 17 18:29:13 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 23 13:48:02 2016 -0400

----------------------------------------------------------------------
 examples/cpp/engine/broker.cpp                  | 141 +++++-----
 examples/cpp/engine/client.cpp                  |   4 +-
 examples/cpp/engine/direct_recv.cpp             |   6 +-
 examples/cpp/engine/direct_send.cpp             |   6 +-
 examples/cpp/engine/helloworld.cpp              |   4 +-
 examples/cpp/engine/server.cpp                  |   4 +-
 examples/cpp/engine/simple_recv.cpp             |   4 +-
 examples/cpp/engine/simple_send.cpp             |   4 +-
 examples/cpp/example_test.py                    |   2 +-
 proton-c/bindings/cpp/CMakeLists.txt            |  12 +-
 proton-c/bindings/cpp/docs/mainpage.md          |   6 +-
 proton-c/bindings/cpp/docs/tutorial.dox         |   4 +-
 .../bindings/cpp/include/proton/connection.hpp  |   6 +-
 .../cpp/include/proton/connection_engine.hpp    | 168 ------------
 .../cpp/include/proton/connection_options.hpp   |   6 +-
 .../bindings/cpp/include/proton/handler.hpp     |   6 +-
 proton-c/bindings/cpp/include/proton/io.hpp     | 134 ----------
 .../cpp/include/proton/io/connection_engine.hpp | 182 +++++++++++++
 .../bindings/cpp/include/proton/io/socket.hpp   | 130 ++++++++++
 .../bindings/cpp/include/proton/transport.hpp   |   5 +
 proton-c/bindings/cpp/src/connection_engine.cpp | 208 ---------------
 proton-c/bindings/cpp/src/contexts.hpp          |  15 +-
 proton-c/bindings/cpp/src/engine_test.cpp       | 258 +++++++++++--------
 .../bindings/cpp/src/io/connection_engine.cpp   | 172 +++++++++++++
 proton-c/bindings/cpp/src/io/posix/socket.cpp   | 194 ++++++++++++++
 proton-c/bindings/cpp/src/io/windows/socket.cpp | 217 ++++++++++++++++
 proton-c/bindings/cpp/src/posix/io.cpp          | 175 -------------
 proton-c/bindings/cpp/src/scalar_test.cpp       |   1 -
 proton-c/bindings/cpp/src/test_bits.hpp         |  29 ++-
 proton-c/bindings/cpp/src/value_test.cpp        |   4 +-
 proton-c/bindings/cpp/src/windows/io.cpp        | 197 --------------
 31 files changed, 1187 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
index de08991..698d795 100644
--- a/examples/cpp/engine/broker.cpp
+++ b/examples/cpp/engine/broker.cpp
@@ -24,76 +24,25 @@
 
 #include <iostream>
 
-#ifdef WIN32
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-
-#include "fake_cpp11.hpp"
-
-class broker {
-  public:
-    broker(const proton::url& url) : handler_(url, queues_) {}
-
-    proton::handler& handler() { return handler_; }
-
-  private:
-
-    class my_handler : public broker_handler {
-      public:
-        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
-        void on_start(proton::event &e) override {
-            e.container().listen(url_);
-            std::cout << "broker listening on " << url_ << std::endl;
-        }
-
-      private:
-        const proton::url& url_;
-    };
-
-  private:
-    queues queues_;
-    my_handler handler_;
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    proton::url url("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker b(url);
-        proton::container(b.handler()).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-#else // WIN32
-#include <proton/io.hpp>
+#ifndef WIN32                   // TODO aconway 2016-03-23: windows broker example
+#include <proton/io/socket.hpp>
 #include <sys/select.h>
 #include <set>
 
 template <class T> T check(T result, const std::string& msg="io_error: ") {
     if (result < 0)
-        throw proton::connection_engine::io_error(msg + proton::io::error_str());
+        throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
     return result;
 }
 
 void fd_set_if(bool on, int fd, fd_set *fds);
 
 class broker {
-    typedef std::set<proton::io::socket_engine*> engines;
+    typedef std::set<proton::io::socket::engine*> engines;
 
     queues queues_;
     broker_handler handler_;
-    proton::connection_engine::container container_;
+    proton::io::connection_engine::container container_;
     engines engines_;
     fd_set reading_, writing_;
 
@@ -109,7 +58,7 @@ class broker {
     }
 
     void run(const proton::url& url) {
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
         FD_SET(listener.socket(), &reading_);
         while(true) {
@@ -122,27 +71,26 @@ class broker {
                 int fd = listener.accept(client_host, client_port);
                 std::cout << "accepted " << client_host << ":" << client_port
                           << " fd=" << fd << std::endl;
-                engines_.insert(new proton::io::socket_engine(fd, handler_, container_.make_options()));
+                engines_.insert(
+                    new proton::io::socket::engine(
+                        fd, handler_, container_.make_options()));
                 FD_SET(fd, &reading_);
                 FD_SET(fd, &writing_);
             }
 
             for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
-                engines::iterator j = i++;        // Save iterator in case we need to erase it.
-                proton::io::socket_engine *eng = *j;
+                proton::io::socket::engine *eng = *(i++);
                 int flags = 0;
-                if (FD_ISSET(eng->socket(), &readable_set))
-                    flags |= proton::io::socket_engine::READ;
                 if (FD_ISSET(eng->socket(), &writable_set))
-                    flags |= proton::io::socket_engine::WRITE;
-                if (flags) eng->process(flags);
-                // Set reading/writing bits for next time around
-                fd_set_if(eng->can_read(), eng->socket(), &reading_);
-                fd_set_if(eng->can_write(), eng->socket(), &writing_);
-
-                if (eng->closed()) {
+                    eng->write();
+                if (FD_ISSET(eng->socket(), &readable_set))
+                    eng->read();
+                if (eng->dispatch()) {
+                    fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
+                    fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
+                } else {
                     std::cout << "closed fd=" << eng->socket() << std::endl;
-                    engines_.erase(j);
+                    engines_.erase(eng);
                     delete eng;
                 }
             }
@@ -173,4 +121,57 @@ int main(int argc, char **argv) {
     }
     return 1;
 }
+#else // WIN32
+
+#include "proton/acceptor.hpp"
+#include "proton/container.hpp"
+#include "proton/value.hpp"
+
+
+#include "fake_cpp11.hpp"
+
+class broker {
+  public:
+    broker(const proton::url& url) : handler_(url, queues_) {}
+
+    proton::handler& handler() { return handler_; }
+
+  private:
+
+    class my_handler : public broker_handler {
+      public:
+        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
+
+        void on_start(proton::event &e) override {
+            e.container().listen(url_);
+            std::cout << "broker listening on " << url_ << std::endl;
+        }
+
+      private:
+        const proton::url& url_;
+    };
+
+  private:
+    queues queues_;
+    my_handler handler_;
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("0.0.0.0");
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker b(url);
+        proton::container(b.handler()).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
index d2d37c0..941ca75 100644
--- a/examples/cpp/engine/client.cpp
+++ b/examples/cpp/engine/client.cpp
@@ -20,7 +20,7 @@
  */
 
 #include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
         requests.push_back("All mimsy were the borogroves,");
         requests.push_back("And the mome raths outgrabe.");
         client handler(address, requests);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
index 3fcc28e..51f6572 100644
--- a/examples/cpp/engine/direct_recv.cpp
+++ b/examples/cpp/engine/direct_recv.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
 #include "proton/link.hpp"
@@ -70,10 +70,10 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         proton::url url(address);
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "direct_recv listening on " << url << std::endl;
         direct_recv handler(message_count);
-        proton::io::socket_engine(listener.accept(), handler).run();
+        proton::io::socket::engine(listener.accept(), handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
index 28ea845..4d7be72 100644
--- a/examples/cpp/engine/direct_send.cpp
+++ b/examples/cpp/engine/direct_send.cpp
@@ -23,7 +23,7 @@
 
 #include "proton/acceptor.hpp"
 #include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -82,10 +82,10 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         proton::url url(address);
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "direct_send listening on " << url << std::endl;
         simple_send handler(message_count);
-        proton::io::socket_engine(listener.accept(), handler).run();
+        proton::io::socket::engine(listener.accept(), handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
index d5a9f44..4bb0ed3 100644
--- a/examples/cpp/engine/helloworld.cpp
+++ b/examples/cpp/engine/helloworld.cpp
@@ -22,7 +22,7 @@
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
 #include "proton/url.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 
 #include <iostream>
 
@@ -57,7 +57,7 @@ int main(int argc, char **argv) {
     try {
         proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
         hello_world hw(url.path());
-        proton::io::socket_engine(url, hw).run();
+        proton::io::socket::engine(url, hw).run();
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
index 794d27e..bdd1a73 100644
--- a/examples/cpp/engine/server.cpp
+++ b/examples/cpp/engine/server.cpp
@@ -22,7 +22,7 @@
 #include "options.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         server handler(address);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
index 5b6cf21..a081227 100644
--- a/examples/cpp/engine/simple_recv.cpp
+++ b/examples/cpp/engine/simple_recv.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -76,7 +76,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         simple_recv handler(address, message_count);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
index 39d8939..f6c0318 100644
--- a/examples/cpp/engine/simple_send.cpp
+++ b/examples/cpp/engine/simple_send.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -85,7 +85,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         simple_send handler(address, message_count);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 6cfc632..d228d67 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -90,7 +90,7 @@ class Proc(Popen):
                         self.ready_set = True
                         self.ready.set()
             if self.wait() != 0:
-                self.error = ProcError(self)
+                raise ProcError(self)
         except Exception, e:
             self.error = sys.exc_info()
         finally:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 26a7c94..9254a3a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -28,7 +28,6 @@ set(qpid-proton-cpp-source
   src/acceptor.cpp
   src/binary.cpp
   src/byte_array.cpp
-  src/scalar_base.cpp
   src/condition.cpp
   src/connection.cpp
   src/connection_options.cpp
@@ -37,21 +36,21 @@ set(qpid-proton-cpp-source
   src/container_impl.cpp
   src/contexts.cpp
   src/data.cpp
-  src/decoder.cpp
   src/decimal.cpp
+  src/decoder.cpp
   src/delivery.cpp
   src/duration.cpp
   src/encoder.cpp
   src/endpoint.cpp
-  src/connection_engine.cpp
   src/error.cpp
+  src/handler.cpp
   src/id_generator.cpp
+  src/io/connection_engine.cpp
   src/link.cpp
   src/link_options.cpp
   src/message.cpp
   src/messaging_adapter.cpp
   src/messaging_event.cpp
-  src/handler.cpp
   src/object.cpp
   src/proton_bits.cpp
   src/proton_event.cpp
@@ -60,6 +59,7 @@ set(qpid-proton-cpp-source
   src/receiver.cpp
   src/reconnect_timer.cpp
   src/sasl.cpp
+  src/scalar_base.cpp
   src/sender.cpp
   src/session.cpp
   src/ssl.cpp
@@ -75,9 +75,9 @@ set(qpid-proton-cpp-source
   )
 
 if(MSVC)
-  list(APPEND qpid-proton-cpp-source src/windows/io.cpp)
+  list(APPEND qpid-proton-cpp-source src/io/windows/socket.cpp)
 else(MSVC)
-  list(APPEND qpid-proton-cpp-source src/posix/io.cpp)
+  list(APPEND qpid-proton-cpp-source src/io/posix/socket.cpp)
 endif(MSVC)
 
 set_source_files_properties (

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index f20d957..8ad34cb 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -103,10 +103,10 @@ from your handler methods.
 ### %proton::connection_engine
 
 `proton::connection_engine` dispatches events for a *single
-connection*. The subclass `proton::io::socket_engine` does
+connection*. The subclass `proton::io::socket::engine` does
 socket-based IO. An application with a single connection is just like
 using `proton::container` except you attach your handler to a
-`proton::io::socket_engine` instead. You can compare examples, such as
+`proton::io::socket::engine` instead. You can compare examples, such as
 \ref helloworld.cpp and \ref engine/helloworld.cpp.
 
 Now consider multiple connections. `proton::container` is easy to use
@@ -124,7 +124,7 @@ platforms. The example \ref engine/broker.cpp shows a broker using
 sockets and poll, but you can see how the code could be adapted.
 
 `proton::connection_engine` also does not dictate the IO mechanism,
-but it is an abstract class. `proton::socket_engine` provides
+but it is an abstract class. `proton::socket::engine` provides
 ready-made socket-based IO, but you can write your own subclass with
 any IO code. Just override the `io_read`, `io_write` and `io_close`
 methods. For example, the proton test suite implements an in-memory

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/tutorial.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/tutorial.dox b/proton-c/bindings/cpp/docs/tutorial.dox
index dcfbe05..e40a3e7 100644
--- a/proton-c/bindings/cpp/docs/tutorial.dox
+++ b/proton-c/bindings/cpp/docs/tutorial.dox
@@ -405,7 +405,7 @@ applications or applications with unusual IO requirements.
 We'll look at the \ref engine/helloworld.cpp example step-by-step to see how it differs
 from the container \ref helloworld.cpp version.
 
-First we include the `proton::io::socket_engine` class, which is a `proton::connection_engine`
+First we include the `proton::io::socket::engine` class, which is a `proton::connection_engine`
 that uses socket IO.
 
 \skipline proton/io.hpp
@@ -417,7 +417,7 @@ engine's' connection:
 \skip on_start
 \until }
 
-Our `main` function only differs in that it creates and runs a `socket_engine`
+Our `main` function only differs in that it creates and runs a `socket::engine`
 instead of a `container`.
 
 \skip main

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index da7f806..eb4e598 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -37,6 +37,10 @@ namespace proton {
 
 class handler;
 
+namespace io {
+class connection_engine;
+}
+
 /// A connection to a remote AMQP peer.
 class
 PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, public endpoint {
@@ -123,7 +127,7 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     void host(const std::string& h);
 
     friend class connection_context;
-    friend class connection_engine;
+    friend class io::connection_engine;
     friend class connection_options;
     friend class connector;
     friend class container_impl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/connection_engine.hpp
deleted file mode 100644
index 0b1a947..0000000
--- a/proton-c/bindings/cpp/include/proton/connection_engine.hpp
+++ /dev/null
@@ -1,168 +0,0 @@
-#ifndef CONNECTION_ENGINE_HPP
-#define CONNECTION_ENGINE_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection.hpp"
-#include "proton/connection_options.hpp"
-#include "proton/error.hpp"
-#include "proton/export.hpp"
-#include "proton/pn_unique_ptr.hpp"
-#include "proton/types.hpp"
-
-#include <cstddef>
-#include <utility>
-#include <string>
-
-namespace proton {
-
-class connection_engine_context;
-class handler;
-class connection;
-
-// TODO aconway 2016-01-23: doc contrast with container.
-
-/// An interface for connection-oriented IO integration.  A
-/// connection_engine manages a single AMQP connection.  It is useful
-/// for integrating AMQP into an existing IO framework.
-///
-/// The engine provides a simple "bytes-in/bytes-out" interface. Incoming AMQP
-/// bytes from any kind of data connection are fed into the engine and processed
-/// to dispatch events to a proton::handler.  The resulting AMQP output data is
-/// available from the engine and can sent back over the connection.
-///
-/// The engine does no IO of its own. It assumes a two-way flow of bytes over
-/// some externally-managed "connection". The "connection" could be a socket
-/// managed by select, poll, epoll or some other mechanism, or it could be
-/// something else such as an RDMA connection, a shared-memory buffer or a Unix
-/// pipe.
-///
-/// The application is coded the same way for engine or container: you implement
-/// proton::handler. Handlers attached to an engine will receive transport,
-/// connection, session, link and message events. They will not receive reactor,
-/// selectable or timer events, the engine assumes those are managed externally.
-///
-/// THREAD SAFETY: A single engine instance cannot be called concurrently, but
-/// different engine instances can be processed concurrently in separate threads.
-class
-PN_CPP_CLASS_EXTERN connection_engine {
-  public:
-    class container {
-      public:
-        /// Create a container with id.  Default to random UUID if id
-        /// == "".
-        PN_CPP_EXTERN container(const std::string &id = "");
-        PN_CPP_EXTERN ~container();
-
-        /// Return the container-id
-        PN_CPP_EXTERN std::string id() const;
-
-        /// Make options to configure a new engine, using the default options.
-        ///
-        /// Call this once for each new engine as the options include a generated unique link_prefix.
-        /// You can modify the configuration before creating the engine but you should not
-        /// modify the container_id or link_prefix.
-        PN_CPP_EXTERN connection_options make_options();
-
-        /// Set the default options to be used for connection engines.
-        /// The container will set the container_id and link_prefix when make_options is called.
-        PN_CPP_EXTERN void options(const connection_options&);
-
-      private:
-        class impl;
-        internal::pn_unique_ptr<impl> impl_;
-    };
-    /// Create a connection engine that dispatches to handler.
-    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = no_opts);
-
-    PN_CPP_EXTERN virtual ~connection_engine();
-
-    /// Return the number of bytes that the engine is currently ready to read.
-    PN_CPP_EXTERN size_t can_read() const;
-
-    /// Return the number of bytes that the engine is currently ready to write.
-    PN_CPP_EXTERN size_t can_write() const;
-
-    /// Combine these flags with | to indicate read, write, both or neither
-    enum io_flag {
-        READ = 1,
-        WRITE = 2
-    };
-
-    /// Read, write and dispatch events.
-    ///
-    /// io_flags indicates whether to read, write, both or neither.
-    /// Dispatches all events generated by reading or writing.
-    /// Use closed() to check if the engine is closed after processing.
-    ///
-    /// @throw exceptions thrown by the engines handler or the IO adapter.
-    PN_CPP_EXTERN void process(int io_flags=READ|WRITE);
-
-    /// True if the engine is closed, meaning there are no further
-    /// events to process and close_io has been called.  Call
-    /// error_str() to get an error description.
-    PN_CPP_EXTERN bool closed() const;
-
-    /// Get the AMQP connection associated with this connection_engine.
-    PN_CPP_EXTERN class connection connection() const;
-
-    /// Thrown by io_read and io_write functions to indicate an error.
-    struct PN_CPP_CLASS_EXTERN io_error : public error {
-        PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
-    };
-
-  protected:
-    /// Do a non-blocking read on the IO stream.
-    ///
-    ///@return pair(size, true) if size bytes were read.
-    /// size==0 means no data could be read without blocking, the stream is still open.
-    /// Returns pair(0, false) if the stream closed.
-    ///
-    ///@throw proton::connection_engine::io_error if there is a read error.
-    virtual std::pair<size_t, bool> io_read(char* buf, size_t max) = 0;
-
-    /// Do a non-blocking write of up to max bytes from buf.
-    ///
-    /// Return the number of byes written , 0 if no data could be written
-    /// without blocking.
-    ///
-    ///throw proton::connection_engine::io_error if there is a write error.
-    virtual size_t io_write(const char*, size_t) = 0;
-
-    /// Close the io, no more _io methods will be called after this is called.
-    virtual void io_close() = 0;
-
-    PN_CPP_EXTERN static const connection_options no_opts;
-
-  private:
-    connection_engine(const connection_engine&);
-    connection_engine& operator=(const connection_engine&);
-
-    void dispatch();
-    void try_read();
-    void try_write();
-
-    class connection connection_;
-    connection_engine_context* ctx_;
-};
-
-}
-
-#endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 1a22b73..d2efab9 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,6 +40,10 @@ namespace proton {
 class proton_handler;
 class connection;
 
+namespace io {
+class connection_engine;
+}
+
 /// Options for creating a connection.
 ///
 /// Options can be "chained" like this:
@@ -149,7 +153,7 @@ class connection_options {
 
     friend class container_impl;
     friend class connector;
-    friend class connection_engine;
+    friend class io::connection_engine;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 6ea11d4..3086037 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -32,6 +32,10 @@ class condition;
 class event;
 class messaging_adapter;
 
+namespace io {
+class connection_engine;
+}
+
 /// Callback functions for handling proton events.
 ///
 /// Subclass and override event-handling member functions.
@@ -131,7 +135,7 @@ PN_CPP_CLASS_EXTERN handler
     internal::pn_unique_ptr<messaging_adapter> messaging_adapter_;
 
     friend class container;
-    friend class connection_engine;
+    friend class io::connection_engine;
     friend class connection_options;
     friend class link_options;
     /// @endcond

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io.hpp b/proton-c/bindings/cpp/include/proton/io.hpp
deleted file mode 100644
index 9c63edb..0000000
--- a/proton-c/bindings/cpp/include/proton/io.hpp
+++ /dev/null
@@ -1,134 +0,0 @@
-#ifndef SOCKET_IO_HPP
-#define SOCKET_IO_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/connection_engine.hpp>
-#include <proton/url.hpp>
-
-namespace proton {
-
-///@details
-/// IO using sockets, file descriptors, or handles, for use with
-/// proton::connection_engine.
-///
-/// Note that you can use proton::connection_engine to communicate using AMQP
-/// over your own IO implementation or to integrate an existing IO framework of
-/// your choice, this implementation is provided as a convenience if sockets is
-/// sufficient for your needs.
-namespace io {
-
-/// @name Setup and teardown
-///
-/// Call proton::io::initialize before using any functions in the
-/// proton::io namespace.  Call proton::io::finalize when you are
-/// done.
-///
-/// You can call initialize/finalize more than once as long as they
-/// are in matching pairs. Use proton::io::guard to call
-/// initialize/finalize around a scope.
-///
-/// Note that on POSIX systems these are no-ops, but they are required
-/// for Windows.
-///
-/// @{
-
-/// Initialize the proton::io subsystem.
-PN_CPP_EXTERN void initialize();
-
-/// Finalize the proton::io subsystem.
-PN_CPP_EXTERN void finalize(); // nothrow
-
-/// Use to call io::initialize and io::finalize around a scope.
-struct guard {
-    guard() { initialize(); }
-    ~guard() { finalize(); }
-};
-
-/// @}
-
-/// An IO resource.
-typedef int64_t descriptor;
-
-/// An invalid descriptor.
-PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
-
-/// Return a string describing the most recent IO error.
-PN_CPP_EXTERN std::string error_str();
-
-/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
-PN_CPP_EXTERN descriptor connect(const proton::url&);
-
-/// Listening socket.
-class listener {
-  public:
-    /// Listen on host/port. Empty host means listen on all interfaces.
-    /// port can be a service name or number
-    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
-    PN_CPP_EXTERN ~listener();
-
-    /// Accept a connection. Return the descriptor, set host, port to the remote address.
-    /// port can be a service name or number.
-    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
-
-    /// Accept a connection, does not provide address info.
-    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
-
-    /// Convert to descriptor
-    descriptor socket() const { return socket_; }
-
-  private:
-    guard guard_;
-    listener(const listener&);
-    listener& operator=(const listener&);
-    descriptor socket_;
-};
-
-/// A connection_engine for socket-based IO.
-class socket_engine : public connection_engine {
-  public:
-    /// Wrap an open socket. Sets non-blocking mode.
-    PN_CPP_EXTERN socket_engine(descriptor socket_, handler&, const connection_options& = no_opts);
-
-    /// Create socket engine connected to url.
-    PN_CPP_EXTERN socket_engine(const url&, handler&, const connection_options& = no_opts);
-
-    PN_CPP_EXTERN ~socket_engine();
-
-    /// Get the socket descriptor.
-    descriptor socket() const { return socket_; }
-
-    /// Start the engine.
-    PN_CPP_EXTERN void run();
-
-  protected:
-    PN_CPP_EXTERN std::pair<size_t, bool> io_read(char* buf, size_t max);
-    PN_CPP_EXTERN size_t io_write(const char*, size_t);
-    PN_CPP_EXTERN void io_close();
-
-  private:
-    void init();
-    guard guard_;
-    descriptor socket_;
-};
-
-}} // proton::io
-
-#endif // SOCKET_IO_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
new file mode 100644
index 0000000..3192d44
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -0,0 +1,182 @@
+#ifndef CONNECTION_ENGINE_HPP
+#define CONNECTION_ENGINE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/condition.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/error.hpp"
+#include "proton/export.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/transport.hpp"
+#include "proton/types.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <string>
+
+struct pn_collector_t;
+
+namespace proton {
+
+class handler;
+
+/// Contains classes to integrate proton into different IO and threading environments.
+namespace io {
+
+///@cond INTERNAL
+class connection_engine_context;
+///
+
+/// Pointer to a mutable memory region with a size.
+struct mutable_buffer {
+    char* data;
+    size_t size;
+
+    mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// Pointer to a const memory region with a size.
+struct const_buffer {
+    const char* data;
+    size_t size;
+
+    const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// A protocol engine to integrate AMQP into any IO or concurrency framework.
+///
+/// io::connection_engine manages a single proton::connection and dispatches
+/// events to a proton::handler. It does no IO of its own, but allows you to
+/// integrate AMQP protocol handling into any IO or concurrency framework.
+///
+/// The application is coded the same way as for the proton::container. The
+/// application implements a proton::handler to respond to transport,
+/// connection, session, link and message events. With a little care, the same
+/// handler classes can be used for both container and connection_engine, the
+/// \ref broker.cpp example illustrates this.
+///
+/// You need to write the IO code to read AMQP data to the read_buffer(). The
+/// engine parses the AMQP frames. dispatch() calls the appropriate functions on
+/// the applications proton::handler. You write output data from the engines
+/// write_buffer() to your IO.
+///
+/// The engine is not safe for concurrent use, but you can process different
+/// engines concurrently. A common pattern for high-performance servers is to
+/// serialize read/write activity per-connection and dispatch in a fixed-size
+/// thread pool.
+///
+/// The engine is designed to work with a classic reactor (e.g. select, poll,
+/// epoll) or an async-request driven proactor (e.g. windows completion ports,
+/// boost.asio, libuv etc.)
+///
+class
+PN_CPP_CLASS_EXTERN connection_engine {
+  public:
+    // TODO aconway 2016-03-18: this will change
+    class container {
+      public:
+        /// Create a container with id.  Default to random UUID.
+        PN_CPP_EXTERN container(const std::string &id = "");
+        PN_CPP_EXTERN ~container();
+
+        /// Return the container-id
+        PN_CPP_EXTERN std::string id() const;
+
+        /// Make options to configure a new engine, using the default options.
+        ///
+        /// Call this once for each new engine as the options include a generated unique link_prefix.
+        /// You can modify the configuration before creating the engine but you should not
+        /// modify the container_id or link_prefix.
+        PN_CPP_EXTERN connection_options make_options();
+
+        /// Set the default options to be used for connection engines.
+        /// The container will set the container_id and link_prefix when make_options is called.
+        PN_CPP_EXTERN void options(const connection_options&);
+
+      private:
+        class impl;
+        internal::pn_unique_ptr<impl> impl_;
+    };
+
+    /// Create a connection engine that dispatches to handler.
+    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
+
+    PN_CPP_EXTERN virtual ~connection_engine();
+
+    /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
+    /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
+    /// Calling dispatch() may open up more buffer space.
+    PN_CPP_EXTERN mutable_buffer read_buffer();
+
+    /// Indicate that the first n bytes of read_buffer() have valid data.
+    /// This changes the buffer, call read_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void read_done(size_t n);
+
+    /// Indicate that the read side of the transport is closed and no more data will be read.
+    PN_CPP_EXTERN void read_close();
+
+    /// The engine's write buffer. Write data from this buffer then call write_done()
+    /// Returns const_buffer(0, 0) if the engine has nothing to write.
+    /// Calling dispatch() may generate more data in the write buffer.
+    PN_CPP_EXTERN const_buffer write_buffer() const;
+
+    /// Indicate that the first n bytes of write_buffer() have been written successfully.
+    /// This changes the buffer, call write_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void write_done(size_t n);
+
+    /// Indicate that the write side of the transport has closed and no more data will be written.
+    PN_CPP_EXTERN void write_close();
+
+    /// Indicate that the transport has closed with an error condition.
+    /// This calls both read_close() and write_close().
+    /// The error condition will be passed to handler::on_transport_error()
+    PN_CPP_EXTERN void close(const std::string& name, const std::string& description);
+
+    /// Dispatch all available events and call the corresponding \ref handler methods.
+    ///
+    /// Returns true if the engine is still active, false if it is finished and
+    /// can be destroyed. The engine is finished when either of the following is
+    /// true:
+    ///
+    /// - both read_close() and write_close() have been called, no more IO is possible.
+    /// - The AMQP connection() is closed AND write_buffer() is empty.
+    ///
+    /// May expand the read_buffer() and/or the write_buffer().
+    ///
+    /// @throw any exceptions thrown by the \ref handler.
+    PN_CPP_EXTERN bool dispatch();
+
+    /// Get the AMQP connection associated with this connection_engine.
+    PN_CPP_EXTERN proton::connection connection() const;
+
+  private:
+    connection_engine(const connection_engine&);
+    connection_engine& operator=(const connection_engine&);
+
+    proton::handler& handler_;
+    proton::connection connection_;
+    proton::transport transport_;
+    proton::internal::pn_ptr<pn_collector_t> collector_;
+};
+}}
+
+#endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
new file mode 100644
index 0000000..a43c0ae
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/socket.hpp
@@ -0,0 +1,130 @@
+#ifndef PROTON_IO_IO_HPP
+#define PROTON_IO_IO_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/io/connection_engine.hpp>
+#include <proton/url.hpp>
+
+
+namespace proton {
+namespace io {
+namespace socket {
+
+struct
+PN_CPP_CLASS_EXTERN io_error : public proton::error {
+    PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
+};
+
+/// @name Setup and teardown
+///
+/// Call initialize() before using any functions in the proton::io::socket
+/// namespace.  Call finalize() when you are done.
+///
+/// You can call initialize/finalize more than once as long as they are in
+/// matching pairs. Use \ref guard to call initialize/finalize around a scope.
+///
+/// Note that on POSIX systems these are no-ops, but they are required
+/// for Windows.
+///
+/// @{
+
+/// Initialize the proton::io subsystem.
+PN_CPP_EXTERN void initialize();
+
+/// Finalize the proton::io subsystem.
+PN_CPP_EXTERN void finalize(); // nothrow
+
+/// Use to call io::initialize and io::finalize around a scope.
+struct guard {
+    guard() { initialize(); }
+    ~guard() { finalize(); }
+};
+
+/// @}
+
+/// An IO resource.
+typedef int64_t descriptor;
+
+/// An invalid descriptor.
+PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
+
+/// Return a string describing the most recent IO error.
+PN_CPP_EXTERN std::string error_str();
+
+/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
+PN_CPP_EXTERN descriptor connect(const proton::url&);
+
+/// Listening socket.
+class listener {
+  public:
+    /// Listen on host/port. Empty host means listen on all interfaces.
+    /// port can be a service name or number
+    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
+    PN_CPP_EXTERN ~listener();
+
+    /// Accept a connection. Return the descriptor, set host, port to the remote address.
+    /// port can be a service name or number.
+    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
+
+    /// Accept a connection, does not provide address info.
+    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
+
+    /// Convert to descriptor
+    descriptor socket() const { return socket_; }
+
+  private:
+    guard guard_;
+    listener(const listener&);
+    listener& operator=(const listener&);
+    descriptor socket_;
+};
+
+/// A \ref connection_engine with non-blocking socket IO.
+class engine : public connection_engine {
+  public:
+    /// Wrap an open socket. Sets non-blocking mode.
+    PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
+
+    /// Create socket engine connected to url.
+    PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
+
+    PN_CPP_EXTERN ~engine();
+
+    /// Run the engine until it closes
+    PN_CPP_EXTERN void run();
+
+    /// Non-blocking read from socket to engine
+    PN_CPP_EXTERN void read();
+
+    /// Non-blocking write from engine to socket
+    PN_CPP_EXTERN void write();
+
+    descriptor socket() const { return socket_; }
+
+  private:
+    void init();
+    guard guard_;
+    descriptor socket_;
+};
+
+}}}
+
+#endif  /*!PROTON_IO_IO_HPP*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index 9e32ac5..bebc974 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -34,6 +34,10 @@ class connection;
 class condition;
 class sasl;
 
+namespace io {
+class connection_engine;
+}
+
 class transport : public internal::object<pn_transport_t> {
     /// @cond INTERNAL
     transport(pn_transport_t* t) : internal::object<pn_transport_t>(t) {}
@@ -71,6 +75,7 @@ class transport : public internal::object<pn_transport_t> {
     friend class connection_options;
     friend class connector;
     friend class proton_event;
+    friend class io::connection_engine;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_engine.cpp b/proton-c/bindings/cpp/src/connection_engine.cpp
deleted file mode 100644
index be9efeb..0000000
--- a/proton-c/bindings/cpp/src/connection_engine.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection_engine.hpp"
-#include "proton/error.hpp"
-#include "proton/handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "id_generator.hpp"
-#include "messaging_adapter.hpp"
-#include "messaging_event.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_bits.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-#include <iosfwd>
-
-namespace proton {
-
-namespace {
-void set_error(connection_engine_context *ctx_, const std::string& reason) {
-    pn_condition_t *c = pn_transport_condition(ctx_->transport);
-    pn_condition_set_name(c, "io_error");
-    pn_condition_set_description(c, reason.c_str());
-}
-
-void close_transport(connection_engine_context *ctx_) {
-    if (pn_transport_pending(ctx_->transport) >= 0)
-        pn_transport_close_head(ctx_->transport);
-    if (pn_transport_capacity(ctx_->transport) >= 0)
-        pn_transport_close_tail(ctx_->transport);
-}
-
-std::string  make_id(const std::string s="") { return s.empty() ? uuid::random().str() : s; }
-}
-
-connection_engine::io_error::io_error(const std::string& msg) : error(msg) {}
-
-class connection_engine::container::impl {
-  public:
-    impl(const std::string s="") : id_(make_id(s)) {}
-
-    const std::string id_;
-    id_generator id_gen_;
-    connection_options options_;
-};
-
-connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
-
-connection_engine::container::~container() {}
-
-std::string connection_engine::container::id() const { return impl_->id_; }
-
-connection_options connection_engine::container::make_options() {
-    connection_options opts = impl_->options_;
-    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
-    return opts;
-}
-
-void connection_engine::container::options(const connection_options &opts) {
-    impl_->options_ = opts;
-}
-
-connection_engine::connection_engine(class handler &h, const connection_options& opts) {
-    connection_ = proton::connection(internal::take_ownership(pn_connection()).get());
-    internal::pn_ptr<pn_transport_t> transport = internal::take_ownership(pn_transport());
-    internal::pn_ptr<pn_collector_t> collector = internal::take_ownership(pn_collector());
-    if (!connection_ || !transport || !collector)
-        throw proton::error("engine create");
-    int err = pn_transport_bind(transport.get(), connection_.pn_object());
-    if (err)
-        throw error(msg() << "transport bind:" << pn_code(err));
-    pn_connection_collect(connection_.pn_object(), collector.get());
-
-    ctx_ = &connection_engine_context::get(connection_); // Creates context
-    ctx_->engine_handler = &h;
-    ctx_->transport = transport.release();
-    ctx_->collector = collector.release();
-    opts.apply(connection_);
-    // Provide defaults for connection_id and link_prefix if not set.
-    std::string cid = connection_.container_id();
-    if (cid.empty()) {
-        cid = make_id();
-        pn_connection_set_container(connection_.pn_object(), cid.c_str());
-    }
-    id_generator &link_gen = connection_context::get(connection_).link_gen;
-    if (link_gen.prefix().empty()) {
-        link_gen.prefix(make_id()+"/");
-    }
-}
-
-connection_engine::~connection_engine() {
-    pn_transport_unbind(ctx_->transport);
-    pn_transport_free(ctx_->transport);
-    internal::pn_ptr<pn_connection_t> c(connection_.pn_object());
-    connection_ = proton::connection();
-    pn_connection_free(c.release());
-    pn_collector_free(ctx_->collector);
-}
-
-void connection_engine::process(int flags) {
-    if (closed()) return;
-    if (flags & WRITE) try_write();
-    dispatch();
-    if (flags & READ) try_read();
-    dispatch();
-
-    if (connection_.closed() && !closed()) {
-        dispatch();
-        while (can_write()) {
-            try_write(); // Flush final data.
-        }
-        // no transport errors.
-        close_transport(ctx_);
-    }
-    if (closed()) {
-        pn_transport_unbind(ctx_->transport);
-        dispatch();
-        try { io_close(); } catch(const io_error&) {} // Tell the IO to close.
-    }
-}
-
-void connection_engine::dispatch() {
-    proton_handler& h = *ctx_->engine_handler->messaging_adapter_;
-    pn_collector_t* c = ctx_->collector;
-    for (pn_event_t *e = pn_collector_peek(c); e; e = pn_collector_peek(c)) {
-        if (pn_event_type(e) == PN_CONNECTION_INIT) {
-            // Make the messaging_adapter issue a START event.
-            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
-        }
-        proton_event(e, pn_event_type(e), 0).dispatch(h);
-        pn_collector_pop(c);
-    }
-}
-
-size_t connection_engine::can_read() const {
-    return std::max(ssize_t(0), pn_transport_capacity(ctx_->transport));
-}
-
-void connection_engine::try_read() {
-    size_t max = can_read();
-    if (max == 0) return;
-    try {
-        std::pair<size_t, bool> r = io_read(pn_transport_tail(ctx_->transport), max);
-        if (r.second) {
-            if (r.first > max)
-                throw io_error(msg() << "read invalid size: " << r.first << ">" << max);
-            pn_transport_process(ctx_->transport, r.first);
-        } else {
-            pn_transport_close_tail(ctx_->transport);
-        }
-    } catch (const io_error& e) {
-        set_error(ctx_, e.what());
-        pn_transport_close_tail(ctx_->transport);
-    }
-}
-
-size_t connection_engine::can_write() const {
-    return std::max(ssize_t(0), pn_transport_pending(ctx_->transport));
-}
-
-void connection_engine::try_write() {
-    size_t max = can_write();
-    if (max == 0) return;
-    try {
-        size_t n = io_write(pn_transport_head(ctx_->transport), max);
-        if (n > max) {
-            throw io_error(msg() << "write invalid size: " << n << " > " << max);
-        }
-        pn_transport_pop(ctx_->transport, n);
-    } catch (const io_error& e) {
-        set_error(ctx_, e.what());
-        pn_transport_close_head(ctx_->transport);
-    }
-}
-
-bool connection_engine::closed() const {
-    return pn_transport_closed(ctx_->transport);
-}
-
-connection connection_engine::connection() const { return connection_.pn_object(); }
-
-const connection_options connection_engine::no_opts;
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 03271a1..a60c1fa 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -26,7 +26,7 @@
 #include "proton/message.hpp"
 #include "proton/connection.hpp"
 #include "proton/container.hpp"
-#include "proton/connection_engine.hpp"
+#include "proton/io/connection_engine.hpp"
 
 #include "id_generator.hpp"
 #include "proton_handler.hpp"
@@ -98,19 +98,6 @@ class connection_context : public context {
     static context::id id(const connection& c) { return id(c.pn_object()); }
 };
 
-// Connection context with information used by the connection_engine.
-class connection_engine_context : public connection_context {
-  public:
-    connection_engine_context() :  engine_handler(0), transport(0), collector(0) {}
-
-    class handler *engine_handler;
-    pn_transport_t  *transport;
-    pn_collector_t  *collector;
-    static connection_engine_context& get(const connection &c) {
-        return ref<connection_engine_context>(id(c));
-    }
-};
-
 void container_context(const reactor&, container&);
 
 class container_context {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 74a7a6a..afae6c2 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -20,7 +20,7 @@
 
 #include "test_bits.hpp"
 #include <proton/uuid.hpp>
-#include <proton/connection_engine.hpp>
+#include <proton/io/connection_engine.hpp>
 #include <proton/handler.hpp>
 #include <proton/event.hpp>
 #include <proton/types_fwd.hpp>
@@ -32,167 +32,209 @@
 #define override
 #endif
 
+using namespace proton::io;
 using namespace proton;
 using namespace test;
+using namespace std;
 
-// One end of an in-memory connection
-struct mem_pipe {
-    mem_pipe(std::deque<char>& r, std::deque<char>& w) : read(r), write(w) {}
-    std::deque<char>  &read, &write;
-};
+typedef std::deque<char> byte_stream;
 
-struct mem_queues : public std::pair<std::deque<char>, std::deque<char> > {
-    mem_pipe a() { return mem_pipe(first, second); }
-    mem_pipe b() { return mem_pipe(second, first); }
-};
+/// In memory connection_engine that reads and writes from byte_streams
+struct in_memory_engine : public connection_engine {
 
-// In memory connection_engine
-struct mem_engine : public connection_engine {
-    mem_pipe socket;
-    std::string read_error;
-    std::string write_error;
-
-    mem_engine(mem_pipe s, handler &h, const connection_options &opts)
-        : connection_engine(h, opts), socket(s) {}
-
-    std::pair<size_t, bool> io_read(char* buf, size_t size) override {
-        if (!read_error.empty()) throw io_error(read_error);
-        size = std::min(socket.read.size(), size);
-        copy(socket.read.begin(), socket.read.begin()+size, buf);
-        socket.read.erase(socket.read.begin(), socket.read.begin()+size);
-        return std::make_pair(size, true);
-    }
+    byte_stream& reads;
+    byte_stream& writes;
 
-    size_t io_write(const char* buf, size_t size) override {
-        if (!write_error.empty()) throw io_error(write_error);
-        socket.write.insert(socket.write.begin(), buf, buf+size);
-        return size;
-    }
+    in_memory_engine(byte_stream& rd, byte_stream& wr, handler &h,
+                     const connection_options &opts = connection_options()) :
+        connection_engine(h, opts), reads(rd), writes(wr) {}
 
-    void io_close() override {
-        read_error = write_error = "closed";
+    void do_read() {
+        mutable_buffer rbuf = read_buffer();
+        size_t size = std::min(reads.size(), rbuf.size);
+        if (size) {
+            copy(reads.begin(), reads.begin()+size, static_cast<char*>(rbuf.data));
+            read_done(size);
+            reads.erase(reads.begin(), reads.begin()+size);
+        }
     }
-};
 
-struct debug_handler : handler {
-    void on_unhandled(event& e) override {
-        std::cout << e.name() << std::endl;
+    void do_write() {
+        const_buffer wbuf = write_buffer();
+        if (wbuf.size) {
+            writes.insert(writes.begin(),
+                          static_cast<const char*>(wbuf.data),
+                          static_cast<const char*>(wbuf.data) + wbuf.size);
+            write_done(wbuf.size);
+        }
     }
-};
 
-struct record_handler : handler {
-    std::deque<std::string> events;
-    void on_unhandled(event& e) override {
-        events.push_back(e.name());
-    }
+    void process() { do_read(); do_write(); dispatch(); }
 };
 
-template <class HA=record_handler, class HB=record_handler> struct engine_pair {
+/// A pair of engines that talk to each other in-memory.
+struct engine_pair {
+    byte_stream ab, ba;
     connection_engine::container cont;
-    mem_queues queues;
-    HA ha;
-    HB hb;
-    mem_engine a, b;
-    engine_pair() : a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options()) {}
-    engine_pair(const std::string& id)
-        : cont(id), a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options())
-    {}
-    engine_pair(const connection_options &aopts, connection_options &bopts)
-        : a(queues.a(), ha, aopts), b(queues.b(), hb, bopts)
-    {}
 
-    void process() { a.process(); b.process(); }
-};
+    in_memory_engine a, b;
 
-void test_process_amqp() {
-    engine_pair<> e;
+    engine_pair(handler& ha, handler& hb,
+                const connection_options& ca = connection_options(),
+                const connection_options& cb = connection_options()) :
+        a(ba, ab, ha, ca), b(ab, ba, hb, cb) {}
 
-    e.a.process(connection_engine::READ); // Don't write unlesss writable
-    ASSERT(e.a.socket.write.empty());
-    e.a.process(connection_engine::WRITE);
-
-    std::string wrote(e.a.socket.write.begin(), e.a.socket.write.end());
-    e.a.process(connection_engine::WRITE);
-    ASSERT_EQUAL(8, wrote.size());
-    ASSERT_EQUAL("AMQP", wrote.substr(0,4));
+    void process() { a.process(); b.process(); }
+};
 
-    e.b.process();              // Read and write AMQP
-    ASSERT_EQUAL("AMQP", std::string(e.b.socket.write.begin(), e.b.socket.write.begin()+4));
-    ASSERT(e.b.socket.read.empty());
-    ASSERT(e.a.socket.write.empty());
-    ASSERT_EQUAL(many<std::string>() + "START", e.ha.events);
+template <class S> typename S::value_type quick_pop(S& s) {
+    ASSERT(!s.empty());
+    typename S::value_type x = s.front();
+    s.pop_front();
+    return x;
 }
 
-
-struct link_handler : public record_handler {
+/// A handler that records incoming endpoints, errors etc.
+struct record_handler : public handler {
     std::deque<proton::link> links;
+    std::deque<proton::session> sessions;
+    std::deque<std::string> errors;
+
     void on_link_open(event& e) override {
         links.push_back(e.link());
     }
 
-    proton::link pop() {
-        proton::link l;
-        if (!links.empty()) {
-            l = links.front();
-            links.pop_front();
-        }
-        return l;
+    void on_session_open(event& e) {
+        sessions.push_back(e.session());
+    }
+
+    void on_unhandled_error(event& e, const condition& c) {
+        errors.push_back(e.name() + "/" + c.what());
     }
 };
 
 void test_engine_prefix() {
     // Set container ID and prefix explicitly
-    engine_pair<link_handler, link_handler> e(
-        connection_options().container_id("a").link_prefix("x/"),
-        connection_options().container_id("b").link_prefix("y/"));
+    record_handler ha, hb;
+    engine_pair e(ha, hb,
+                  connection_options().container_id("a").link_prefix("x/"),
+                  connection_options().container_id("b").link_prefix("y/"));
     e.a.connection().open();
     ASSERT_EQUAL("a", e.a.connection().container_id());
     e.b.connection().open();
     ASSERT_EQUAL("b", e.b.connection().container_id());
 
-    e.a.connection().open_sender("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("x/1", e.ha.pop().name());
-    ASSERT_EQUAL("x/1", e.hb.pop().name());
+    e.a.connection().open_sender("x");
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("x/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("x/1", quick_pop(hb.links).name());
 
     e.a.connection().open_receiver("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("x/2", e.ha.pop().name());
-    ASSERT_EQUAL("x/2", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("x/2", quick_pop(ha.links).name());
+    ASSERT_EQUAL("x/2", quick_pop(hb.links).name());
 
     e.b.connection().open_receiver("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("y/1", e.ha.pop().name());
-    ASSERT_EQUAL("y/1", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("y/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("y/1", quick_pop(hb.links).name());
 }
 
 void test_container_prefix() {
     /// Let the container set the options.
-    engine_pair<link_handler, link_handler> e;
+    record_handler ha, hb;
+    connection_engine::container ca("a"), cb("b");
+    engine_pair e(ha, hb, ca.make_options(), cb.make_options());
+
+    ASSERT_EQUAL("a", e.a.connection().container_id());
+    ASSERT_EQUAL("b", e.b.connection().container_id());
+
     e.a.connection().open();
+    sender s = e.a.connection().open_sender("x");
+    ASSERT_EQUAL("1/1", s.name());
 
-    e.a.connection().open_sender("x");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("1/1", e.ha.pop().name());
-    ASSERT_EQUAL("1/1", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+
+    ASSERT_EQUAL("1/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("1/1", quick_pop(hb.links).name());
 
     e.a.connection().open_receiver("y");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("1/2", e.ha.pop().name());
-    ASSERT_EQUAL("1/2", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("1/2", quick_pop(ha.links).name());
+    ASSERT_EQUAL("1/2", quick_pop(hb.links).name());
 
-    e.b.connection().open_receiver("z");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("2/1", e.ha.pop().name());
-    ASSERT_EQUAL("2/1", e.hb.pop().name());
+    // Open a second connection in each container, make sure links have different IDs.
+    record_handler ha2, hb2;
+    engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options());
 
-    // TODO aconway 2016-01-22: check we respect name set in linkn-options.
+    ASSERT_EQUAL("a", e2.a.connection().container_id());
+    ASSERT_EQUAL("b", e2.b.connection().container_id());
+
+    e2.b.connection().open();
+    receiver r = e2.b.connection().open_receiver("z");
+    ASSERT_EQUAL("2/1", r.name());
+
+    while (ha2.links.empty() || hb2.links.empty()) e2.process();
+
+    ASSERT_EQUAL("2/1", quick_pop(ha2.links).name());
+    ASSERT_EQUAL("2/1", quick_pop(hb2.links).name());
 };
 
+void test_endpoint_close() {
+    // Make sure conditions are sent to the remote end.
+
+    // FIXME aconway 2016-03-22: re-enable these tests when we can set error conditions.
+
+    // record_handler ha, hb;
+    // engine_pair e(ha, hb);
+    // e.a.connection().open();
+    // e.a.connection().open_sender("x");
+    // e.a.connection().open_receiver("y");
+    // while (ha.links.size() < 2 || hb.links.size() < 2) e.process();
+    // link ax = quick_pop(ha.links), ay = quick_pop(ha.links);
+    // link bx = quick_pop(hb.links), by = quick_pop(hb.links);
+
+    // // Close a link
+    // ax.close(condition("err", "foo bar"));
+    // while (!(bx.state() & endpoint::REMOTE_CLOSED)) e.process();
+    // condition c = bx.remote_condition();
+    // ASSERT_EQUAL("err", c.name());
+    // ASSERT_EQUAL("foo bar", c.description());
+    // ASSERT_EQUAL("err: foo bar", ax.local_condition().what());
+
+    // // Close a link with an empty condition
+    // ay.close(condition());
+    // while (!(by.state() & endpoint::REMOTE_CLOSED)) e.process();
+    // ASSERT(by.remote_condition().empty());
+
+    // // Close a connection
+    // connection ca = e.a.connection(), cb = e.b.connection();
+    // ca.close(condition("conn", "bad connection"));
+    // while (!cb.closed()) e.process();
+    // ASSERT_EQUAL("conn: bad connection", cb.remote_condition().what());
+}
+
+void test_transport_close() {
+    // Make sure conditions are sent to the remote end.
+    record_handler ha, hb;
+    engine_pair e(ha, hb);
+    e.a.connection().open();
+    while (!e.a.connection().state() & endpoint::REMOTE_ACTIVE) e.process();
+
+    e.a.close("oops", "engine failure");
+    // Closed but we still have output data to flush so a.dispatch() is true.
+    ASSERT(e.a.dispatch());
+    while (!e.b.connection().closed()) e.process();
+    ASSERT_EQUAL(1, hb.errors.size());
+    ASSERT_EQUAL("trasport_error/oops: engine failure", hb.errors.front());
+    ASSERT_EQUAL("oops", e.b.connection().remote_condition().name());
+    ASSERT_EQUAL("engine failure", e.b.connection().remote_condition().description());
+}
+
 int main(int, char**) {
     int failed = 0;
-    RUN_TEST(failed, test_process_amqp());
     RUN_TEST(failed, test_engine_prefix());
     RUN_TEST(failed, test_container_prefix());
+    RUN_TEST(failed, test_endpoint_close());
     return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
new file mode 100644
index 0000000..5d8e5cc
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_engine.hpp"
+#include "proton/error.hpp"
+#include "proton/handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "id_generator.hpp"
+#include "messaging_adapter.hpp"
+#include "messaging_event.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_bits.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+#include <iosfwd>
+
+#include <assert.h>
+
+namespace proton {
+namespace io {
+
+namespace {
+std::string  make_id(const std::string s="") {
+    return s.empty() ? uuid::random().str() : s;
+}
+}
+
+class connection_engine::container::impl {
+  public:
+    impl(const std::string s="") : id_(make_id(s)) {}
+
+    const std::string id_;
+    id_generator id_gen_;
+    connection_options options_;
+};
+
+connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
+
+connection_engine::container::~container() {}
+
+std::string connection_engine::container::id() const { return impl_->id_; }
+
+connection_options connection_engine::container::make_options() {
+    connection_options opts = impl_->options_;
+    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
+    return opts;
+}
+
+void connection_engine::container::options(const connection_options &opts) {
+    impl_->options_ = opts;
+}
+
+connection_engine::connection_engine(class handler &h, const connection_options& opts) :
+    handler_(h),
+    connection_(internal::take_ownership(pn_connection()).get()),
+    transport_(internal::take_ownership(pn_transport()).get()),
+    collector_(internal::take_ownership(pn_collector()).get())
+{
+    if (!connection_ || !transport_ || !collector_)
+        throw proton::error("engine create");
+    transport_.bind(connection_);
+    pn_connection_collect(connection_.pn_object(), collector_.get());
+    opts.apply(connection_);
+
+    // Provide defaults for connection_id and link_prefix if not set.
+    std::string cid = connection_.container_id();
+    if (cid.empty()) {
+        cid = make_id();
+        pn_connection_set_container(connection_.pn_object(), cid.c_str());
+    }
+    id_generator &link_gen = connection_context::get(connection_).link_gen;
+    if (link_gen.prefix().empty()) {
+        link_gen.prefix(make_id()+"/");
+    }
+}
+
+connection_engine::~connection_engine() {
+    transport_.unbind();
+    pn_collector_free(collector_.release()); // Break cycle with connection_
+}
+
+bool connection_engine::dispatch() {
+    proton_handler& h = *handler_.messaging_adapter_;
+    for (pn_event_t *e = pn_collector_peek(collector_.get());
+         e;
+         e = pn_collector_peek(collector_.get()))
+    {
+        switch (pn_event_type(e)) {
+          case PN_CONNECTION_INIT:
+            // FIXME aconway 2016-03-21: don't use START in connection handlers
+            // reserve it for containers.
+            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
+            break;
+          default:
+            break;
+        }
+        proton_event(e, pn_event_type(e), 0).dispatch(h);
+        pn_collector_pop(collector_.get());
+    }
+    return !(pn_transport_closed(transport_.pn_object()) ||
+          (connection().closed() && write_buffer().size == 0));
+}
+
+mutable_buffer connection_engine::read_buffer() {
+    ssize_t cap = pn_transport_capacity(transport_.pn_object());
+    if (cap > 0)
+        return mutable_buffer(pn_transport_tail(transport_.pn_object()), cap);
+    else
+        return mutable_buffer(0, 0);
+}
+
+void connection_engine::read_done(size_t n) {
+    pn_transport_process(transport_.pn_object(), n);
+}
+
+void connection_engine::read_close() {
+    pn_transport_close_tail(transport_.pn_object());
+}
+
+const_buffer connection_engine::write_buffer() const {
+    ssize_t pending = pn_transport_pending(transport_.pn_object());
+    if (pending > 0)
+        return const_buffer(pn_transport_head(transport_.pn_object()), pending);
+    else
+        return const_buffer(0, 0);
+}
+
+void connection_engine::write_done(size_t n) {
+    pn_transport_pop(transport_.pn_object(), n);
+}
+
+void connection_engine::write_close() {
+    pn_transport_close_head(transport_.pn_object());
+}
+
+void connection_engine::close(const std::string& name, const std::string& description) {
+    pn_condition_t* c = pn_transport_condition(transport_.pn_object());
+    pn_condition_set_name(c, name.c_str());
+    pn_condition_set_description(c, description.c_str());
+    read_close();
+    write_close();
+}
+
+proton::connection connection_engine::connection() const {
+    return connection_;
+}
+
+}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
new file mode 100644
index 0000000..656a837
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/posix/socket.cpp
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+io_error::io_error(const std::string& s) : error(s) {}
+
+const descriptor INVALID_DESCRIPTOR = -1;
+
+std::string error_str() {
+    char buf[512] = "Unknown error";
+#ifdef _GNU_SOURCE
+    // GNU strerror_r returns the message
+    return ::strerror_r(errno, buf, sizeof(buf));
+#else
+    // POSIX strerror_r doesn't return the buffer
+    ::strerror_r(errno, buf, sizeof(buf));
+    return std::string(buf)
+#endif
+}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result < 0) throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result) throw io_error(msg + gai_strerror(result));
+}
+
+}
+
+void engine::init() {
+    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
+}
+
+engine::engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+engine::engine(const url& u, handler& h, const connection_options& opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+}
+
+engine::~engine() {}
+
+void engine::read() {
+    mutable_buffer rbuf = read_buffer();
+    if (rbuf.size > 0) {
+        ssize_t n = ::read(socket_, rbuf.data, rbuf.size);
+        if (n > 0)
+            read_done(n);
+        else if (n == 0)
+            read_close();
+        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+            close("io_error", error_str());
+    }
+}
+
+void engine::write() {
+    const_buffer wbuf = write_buffer();
+    if (wbuf.size > 0) {
+        ssize_t n = ::write(socket_, wbuf.data, wbuf.size);
+        if (n > 0)
+            write_done(n);
+        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+            close("io_error", error_str());
+        }
+    }
+}
+
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            close("select: ", error_str());
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::close(socket_);
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+}
+
+descriptor connect(const proton::url& u) {
+    descriptor fd = INVALID_DESCRIPTOR;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
+                                u.port().empty() ? 0 : u.port().c_str(),
+                                0, &addr.ptr), u.str()+": ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd >= 0) close(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
+        int yes = 1;
+        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
+        check(::listen(socket_, 32), "listen: ");
+    } catch (...) {
+        if (socket_ >= 0) close(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::close(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+// Empty stubs, only needed on windows.
+void initialize() {}
+void finalize() {}
+
+}}}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1161: c++: drop use of on_start by connection_engine.

Posted by ac...@apache.org.
PROTON-1161: c++: drop use of on_start by connection_engine.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38a71ffe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38a71ffe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38a71ffe

Branch: refs/heads/master
Commit: 38a71ffe5fd34de17b14564d7f121bd9594c5e94
Parents: 222574e
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 23 13:58:23 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 23 14:20:07 2016 -0400

----------------------------------------------------------------------
 examples/cpp/engine/broker.cpp                     | 3 +--
 examples/cpp/engine/client.cpp                     | 3 +--
 examples/cpp/engine/direct_recv.cpp                | 4 ----
 examples/cpp/engine/direct_send.cpp                | 4 ----
 examples/cpp/engine/helloworld.cpp                 | 3 +--
 examples/cpp/engine/server.cpp                     | 3 +--
 examples/cpp/engine/simple_recv.cpp                | 3 +--
 examples/cpp/engine/simple_send.cpp                | 3 +--
 proton-c/bindings/cpp/include/proton/io/socket.hpp | 4 ++--
 proton-c/bindings/cpp/src/io/connection_engine.cpp | 9 ---------
 proton-c/bindings/cpp/src/io/posix/socket.cpp      | 1 +
 proton-c/bindings/cpp/src/io/windows/socket.cpp    | 1 +
 12 files changed, 10 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
index 698d795..78bcbb4 100644
--- a/examples/cpp/engine/broker.cpp
+++ b/examples/cpp/engine/broker.cpp
@@ -127,8 +127,7 @@ int main(int argc, char **argv) {
 #include "proton/container.hpp"
 #include "proton/value.hpp"
 
-
-#include "fake_cpp11.hpp"
+#include "../fake_cpp11.hpp"
 
 class broker {
   public:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
index 941ca75..ac3ec08 100644
--- a/examples/cpp/engine/client.cpp
+++ b/examples/cpp/engine/client.cpp
@@ -41,8 +41,7 @@ class client : public proton::handler {
   public:
     client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
+    void on_connection_open(proton::event &e) override {
         sender = e.connection().open_sender(url.path());
         receiver = e.connection().open_receiver("", proton::link_options().dynamic_address(true));
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
index 51f6572..1a2f138 100644
--- a/examples/cpp/engine/direct_recv.cpp
+++ b/examples/cpp/engine/direct_recv.cpp
@@ -41,10 +41,6 @@ class direct_recv : public proton::handler {
   public:
     direct_recv(int c) : expected(c), received(0) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
-    }
-
     void on_message(proton::event &e) override {
         proton::message& msg = e.message();
         if (msg.id().get<uint64_t>() < received)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
index 4d7be72..a79136f 100644
--- a/examples/cpp/engine/direct_send.cpp
+++ b/examples/cpp/engine/direct_send.cpp
@@ -42,10 +42,6 @@ class simple_send : public proton::handler {
   public:
     simple_send(int c) : sent(0), confirmed(0), total(c) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
-    }
-
     void on_sendable(proton::event &e) override {
         proton::sender sender = e.sender();
         while (sender.credit() && sent < total) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
index 4bb0ed3..7d9f507 100644
--- a/examples/cpp/engine/helloworld.cpp
+++ b/examples/cpp/engine/helloworld.cpp
@@ -35,8 +35,7 @@ class hello_world : public proton::handler {
   public:
     hello_world(const std::string& address) : address_(address) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
+    void on_connection_open(proton::event &e) override {
         e.connection().open_receiver(address_);
         e.connection().open_sender(address_);
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
index bdd1a73..9bb896f 100644
--- a/examples/cpp/engine/server.cpp
+++ b/examples/cpp/engine/server.cpp
@@ -45,8 +45,7 @@ class server : public proton::handler {
 
     server(const std::string &u) : url(u) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
+    void on_connection_open(proton::event &e) override {
         e.connection().open_receiver(url.path());
         std::cout << "server connected to " << url << std::endl;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
index a081227..d5327bf 100644
--- a/examples/cpp/engine/simple_recv.cpp
+++ b/examples/cpp/engine/simple_recv.cpp
@@ -44,8 +44,7 @@ class simple_recv : public proton::handler {
 
     simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
+    void on_connection_open(proton::event &e) override {
         receiver = e.connection().open_receiver(url.path());
         std::cout << "simple_recv listening on " << url << std::endl;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
index f6c0318..5161852 100644
--- a/examples/cpp/engine/simple_send.cpp
+++ b/examples/cpp/engine/simple_send.cpp
@@ -44,8 +44,7 @@ class simple_send : public proton::handler {
 
     simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
 
-    void on_start(proton::event &e) override {
-        e.connection().open();
+    void on_connection_open(proton::event &e) override {
         sender = e.connection().open_sender(url.path());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
index a43c0ae..bcbcecf 100644
--- a/proton-c/bindings/cpp/include/proton/io/socket.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/socket.hpp
@@ -100,10 +100,10 @@ class listener {
 /// A \ref connection_engine with non-blocking socket IO.
 class engine : public connection_engine {
   public:
-    /// Wrap an open socket. Sets non-blocking mode.
+    /// Wrap an open socket. Does not automatically open the connection.
     PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
 
-    /// Create socket engine connected to url.
+    /// Create socket engine connected to url, open the connection as a client.
     PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
 
     PN_CPP_EXTERN ~engine();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 5d8e5cc..9513110 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -109,15 +109,6 @@ bool connection_engine::dispatch() {
          e;
          e = pn_collector_peek(collector_.get()))
     {
-        switch (pn_event_type(e)) {
-          case PN_CONNECTION_INIT:
-            // FIXME aconway 2016-03-21: don't use START in connection handlers
-            // reserve it for containers.
-            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
-            break;
-          default:
-            break;
-        }
         proton_event(e, pn_event_type(e), 0).dispatch(h);
         pn_collector_pop(collector_.get());
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
index 656a837..e275288 100644
--- a/proton-c/bindings/cpp/src/io/posix/socket.cpp
+++ b/proton-c/bindings/cpp/src/io/posix/socket.cpp
@@ -78,6 +78,7 @@ engine::engine(const url& u, handler& h, const connection_options& opts)
     : connection_engine(h, opts), socket_(connect(u))
 {
     init();
+    connection().open();
 }
 
 engine::~engine() {}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38a71ffe/proton-c/bindings/cpp/src/io/windows/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/windows/socket.cpp b/proton-c/bindings/cpp/src/io/windows/socket.cpp
index afd3b56..27660d8 100644
--- a/proton-c/bindings/cpp/src/io/windows/socket.cpp
+++ b/proton-c/bindings/cpp/src/io/windows/socket.cpp
@@ -93,6 +93,7 @@ engine::engine(const url& u, handler& h, const connection_options &opts)
     : connection_engine(h, opts), socket_(connect(u))
 {
     init();
+    connection().open();
 }
 
 engine::~engine() {}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org