You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/09/29 06:37:18 UTC
qpid-proton git commit: PROTON-1349: Windows iocp proactor additional
changes for 0.18 Beta
Repository: qpid-proton
Updated Branches:
refs/heads/master db3ee8284 -> 5105b6418
PROTON-1349: Windows iocp proactor additional changes for 0.18 Beta
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5105b641
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5105b641
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5105b641
Branch: refs/heads/master
Commit: 5105b641894bfe4dedd87d80d501a5c4601e720a
Parents: db3ee82
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu Sep 28 23:37:23 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu Sep 28 23:37:23 2017 -0700
----------------------------------------------------------------------
examples/c/CMakeLists.txt | 17 +++++-----
examples/c/thread.h | 53 +++++++++++++++++++++---------
examples/cpp/CMakeLists.txt | 4 ++-
proton-c/CMakeLists.txt | 1 -
proton-c/bindings/cpp/CMakeLists.txt | 5 ++-
proton-c/src/proactor/win_iocp.c | 54 ++++++++++++++++++++++---------
proton-c/src/tests/CMakeLists.txt | 6 +---
proton-c/src/tests/proactor.c | 9 ++++++
8 files changed, 101 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 9447ade..3bd5c6b 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -34,16 +34,15 @@ else()
set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
endif()
-if(WIN32)
- message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
-else()
- foreach (name broker send receive direct send-abort)
- add_executable(c-${name} ${name}.c)
- target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
- set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
- endforeach()
-endif()
+foreach (name broker send receive direct send-abort)
+ add_executable(c-${name} ${name}.c)
+ target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+ set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
+endforeach()
set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+# windows exclusion only for 0.18 beta
+if(NOT WIN32)
add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)
+endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/thread.h b/examples/c/thread.h
index 1bd5595..d96104a 100644
--- a/examples/c/thread.h
+++ b/examples/c/thread.h
@@ -22,23 +22,46 @@
/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
#ifdef _WIN32
-
#include <windows.h>
-#include <time.h>
-#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
#include <process.h>
-#include <windows.h>
-#define pthread_function DWORD WINAPI
-#define pthread_function_return DWORD
-#define pthread_t HANDLE
-#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
-#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
-#define pthread_mutex_T HANDLE
-#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
-#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
-#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
-#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+typedef struct {
+ HANDLE handle;
+ void *(*func)(void *);
+ void *arg;
+} pthread_t;
+
+static unsigned __stdcall pthread_run(void *thr0) {
+ pthread_t *t = (pthread_t *) thr0;
+ t->func(t->arg);
+ return 0;
+}
+
+static int pthread_create(pthread_t *t, void *unused, void *(*f)(void *), void *arg) {
+ t->handle = 0;
+ t->func = f;
+ t->arg = arg;
+ HANDLE th = (HANDLE) _beginthreadex(0, 0, &pthread_run, t, 0, 0);
+ if (th) {
+ t->handle = th;
+ return 0;
+ }
+ return -1;
+}
+
+static int pthread_join(pthread_t t, void **unused) {
+ if (t.handle) {
+ WaitForSingleObject(t.handle, INFINITE);
+ CloseHandle(t.handle);
+ }
+ return 0;
+}
+
+typedef CRITICAL_SECTION pthread_mutex_t;
+#define pthread_mutex_init(m, unused) InitializeCriticalSectionAndSpinCount(m, 4000)
+#define pthread_mutex_destroy(m) DeleteCriticalSection(m)
+#define pthread_mutex_lock(m) EnterCriticalSection(m)
+#define pthread_mutex_unlock(m) LeaveCriticalSection(m)
#else
@@ -46,4 +69,4 @@
#endif
-#endif /* thread.h */
+#endif /* thread.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index a8d9d34..eeab787 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -73,10 +73,12 @@ endif()
add_cpp_test(cpp-example-container ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest)
+# windows exclusion only for 0.18 beta
+if (NOT WIN32)
if (NOT SSL_IMPL STREQUAL none)
add_cpp_test(cpp-example-container-ssl ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleSSLTest)
endif()
-
+endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index cb05ce5..921906e 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -529,7 +529,6 @@ endif()
if (PROACTOR STREQUAL "iocp" OR (NOT PROACTOR AND NOT PROACTOR_OK))
if(WIN32 AND NOT CYGWIN)
- message(WARNING "Windows IOCP proactor will be built as a prototype but does not yet pass tests")
set (PROACTOR_OK iocp)
set (qpid-proton-proactor src/proactor/win_iocp.c src/proactor/proactor-internal.c)
set_source_files_properties (${qpid-proton-proactor} PROPERTIES
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 9409d30..83f564f 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -207,4 +207,7 @@ add_cpp_test(scalar_test)
add_cpp_test(value_test)
add_cpp_test(container_test)
add_cpp_test(url_test)
-add_cpp_test(reconnect_test)
+# windows exclusion only for 0.18 beta
+if(NOT WIN32)
+ add_cpp_test(reconnect_test)
+endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c
index 0ebaf90..36467ff 100644
--- a/proton-c/src/proactor/win_iocp.c
+++ b/proton-c/src/proactor/win_iocp.c
@@ -64,6 +64,7 @@
*/
// TODO: make all code C++ or all C90-ish
+// change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix
// make the global write lock window much smaller
// 2 exclusive write buffers per connection
// make the zombie processing thread safe
@@ -1515,7 +1516,7 @@ void pni_iocp_initialize(void *obj)
iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
assert(iocp->completion_port != NULL);
iocp->zombie_list = pn_list(PN_OBJECT, 0);
- iocp->iocp_trace = true;
+ iocp->iocp_trace = false;
}
void pni_iocp_finalize(void *obj)
@@ -1564,6 +1565,7 @@ class csguard {
bool set_;
};
+
// Get string from error status
std::string errno_str2(DWORD status) {
char buf[512];
@@ -1813,7 +1815,7 @@ void do_complete(iocp_result_t *result) {
switch (result->type) {
case IOCP_ACCEPT:
- /* accept is now processed inline to do in parallel except on teardown */
+ /* accept is now processed inline to do in parallel, except on teardown */
assert(iocpd->closing);
complete_accept((accept_result_t *) result, result->status); // free's result and retires new_sock
break;
@@ -1897,7 +1899,7 @@ VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ );
class reaper {
public:
reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp)
- : iocp_(iocp), global_wlock_(wlock), timer_(NULL) {
+ : iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) {
InitializeCriticalSectionAndSpinCount(&lock_, 4000);
timer_queue_ = CreateTimerQueue();
if (!timer_queue_) {
@@ -1948,6 +1950,7 @@ class reaper {
// Called when all competing threads have terminated except our own reap_check timer.
void final_shutdown() {
+ running = false;
DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE);
// No pending or active timers from thread pool remain. Truly single threaded now.
pn_free((void *) iocp_); // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port.
@@ -1965,13 +1968,14 @@ class reaper {
private:
void reap_timer() {
// Call with lock
- if (timer_)
+ if (timer_ || !running)
return;
pn_timestamp_t now = pn_i_now2();
pni_zombie_check(iocp_, now);
pn_timestamp_t zd = pni_zombie_deadline(iocp_);
- if (zd && zd > now) {
- if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, zd - now,
+ if (zd) {
+ DWORD tm = (zd > now) ? zd - now : 1;
+ if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
0, WT_EXECUTEONLYONCE)) {
perror("CreateTimerQueueTimer");
abort();
@@ -1984,6 +1988,7 @@ class reaper {
CRITICAL_SECTION *global_wlock_;
HANDLE timer_queue_;
HANDLE timer_;
+ bool running;
};
VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
@@ -2033,25 +2038,27 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
return e;
}
-static void psocket_error(psocket_t *ps, int err, const char* what) {
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
if (ps->is_reaper)
return;
if (!ps->listener) {
pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
- pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
- what, ps->host, ps->port,
- errno_str2(err).c_str());
+ pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
pn_connection_driver_close(driver);
} else {
pn_listener_t *l = as_listener(ps);
- pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
- what, ps->host, ps->port,
- errno_str2(err).c_str());
+ pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
listener_begin_close(l);
}
}
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+ psocket_error_str(ps, errno_str2(err).c_str(), what);
+}
+
+
+
// ========================================================================
// pconnection
// ========================================================================
@@ -2168,6 +2175,17 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
return NULL;
}
+// Call after successful accept
+static void set_sock_names(pconnection_t *pc) {
+ // This works. Note possible use of GetAcceptExSockaddrs()
+ pn_socket_t sock = pc->psocket.iocpd->socket;
+ socklen_t len = sizeof(pc->local.ss);
+ getsockname(sock, (struct sockaddr*)&pc->local.ss, &len);
+ len = sizeof(pc->remote.ss);
+ getpeername(sock, (struct sockaddr*)&pc->remote.ss, &len);
+}
+
+
// Call with lock held when closing and transitioning away from working context
static inline bool pconnection_can_free(pconnection_t *pc) {
return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0
@@ -2632,6 +2650,8 @@ static bool connect_step(pconnection_t *pc) {
if (success || WSAGetLastError() == ERROR_IO_PENDING) {
iocpd->ops_in_progress++;
iocpd->active_completer = &pc->psocket;
+ // getpeername unreliable for outgoing connections, but we know it at this point
+ memcpy(&pc->remote.ss, ai->ai_addr, ai->ai_addrlen);
return true; // logic resumes at connect_step_done()
}
pn_free(result);
@@ -2660,6 +2680,8 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
pc->psocket.iocpd->write_closed = false;
pc->psocket.iocpd->read_closed = false;
if (pc->addrinfo) {
+ socklen_t len = sizeof(pc->local.ss);
+ getsockname(pc->psocket.iocpd->socket, (struct sockaddr*)&pc->local.ss, &len);
freeaddrinfo(pc->addrinfo);
pc->addrinfo = NULL;
}
@@ -2671,6 +2693,7 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
// Connect failed, no IO started, i.e. no pending iocpd based events
pc->context.proactor->reaper->fast_reap(iocpd);
pc->psocket.iocpd = NULL;
+ memset(&pc->remote.ss, 0, sizeof(pc->remote.ss));
// Is there a next connection target in the addrinfo to try?
if (pc->ai && connect_step(pc)) {
// Trying the next addrinfo possibility. Will return here.
@@ -2700,7 +2723,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
pc->ai = pc->addrinfo;
if (connect_step(pc)) {
- g.release();
return;
}
}
@@ -3153,6 +3175,7 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
iocpdesc_t *conn_iocpd = accept_result->new_sock;
pc->psocket.iocpd = conn_iocpd;
conn_iocpd->active_completer =&pc->psocket;
+ set_sock_names(pc);
pni_iocpdesc_start(conn_iocpd);
}
@@ -3287,7 +3310,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
{
csguard g(&p->context.cslock);
// Move the whole contexts list into a disconnecting state
- pcontext_t *disconnecting_pcontexts = p->contexts;
+ disconnecting_pcontexts = p->contexts;
p->contexts = NULL;
// First pass: mark each pcontext as disconnecting and update global pending count.
pcontext_t *ctx = disconnecting_pcontexts;
@@ -3331,7 +3354,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
}
}
} else {
-
assert(l);
if (!ctx->closing) {
if (cond) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index c562025..ce7d25e 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -56,11 +56,7 @@ pn_add_c_test (c-condition-tests condition.c)
pn_add_c_test (c-connection-driver-tests connection_driver.c)
if(HAS_PROACTOR)
- if(WIN32)
- message(STATUS "Windows IOCP proactor tests temporarily suspended")
- else(WIN32)
- pn_add_c_test (c-proactor-tests proactor.c)
- endif(WIN32)
+ pn_add_c_test (c-proactor-tests proactor.c)
if(WIN32)
# set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 3995c1d..be00f46 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -216,9 +216,14 @@ typedef struct test_listener_t {
test_listener_t test_listen(test_proactor_t *tp, const char *host) {
test_listener_t l = { test_port(host), pn_listener() };
+#if defined(_WIN32)
+ sock_close(l.port.sock); // small chance another process will steal the port in Windows
+#endif
pn_proactor_listen(tp->proactor, l.listener, l.port.host_port, 4);
TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
+#if !defined(_WIN32)
sock_close(l.port.sock);
+#endif
return l;
}
@@ -686,7 +691,9 @@ static void test_ipv4_ipv6(test_t *t) {
EXPECT_CONNECT(l.port, ""); /* local->all */
if (has_ipv6) {
+#if !defined(_WIN32)
EXPECT_CONNECT(l6.port, "::"); /* v6->v6 */
+#endif
EXPECT_CONNECT(l6.port, ""); /* local->v6 */
EXPECT_CONNECT(l.port, "::1"); /* v6->all */
@@ -1098,7 +1105,9 @@ int main(int argc, char **argv) {
RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
RUN_ARGV_TEST(failed, t, test_release_free(&t));
+#if !defined(_WIN32)
RUN_ARGV_TEST(failed, t, test_ssl(&t));
+#endif
RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
RUN_ARGV_TEST(failed, t, test_netaddr(&t));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org