You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/09/29 06:37:18 UTC

qpid-proton git commit: PROTON-1349: Windows iocp proactor additional changes for 0.18 Beta

Repository: qpid-proton
Updated Branches:
  refs/heads/master db3ee8284 -> 5105b6418


PROTON-1349: Windows iocp proactor additional changes for 0.18 Beta


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5105b641
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5105b641
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5105b641

Branch: refs/heads/master
Commit: 5105b641894bfe4dedd87d80d501a5c4601e720a
Parents: db3ee82
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu Sep 28 23:37:23 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu Sep 28 23:37:23 2017 -0700

----------------------------------------------------------------------
 examples/c/CMakeLists.txt            | 17 +++++-----
 examples/c/thread.h                  | 53 +++++++++++++++++++++---------
 examples/cpp/CMakeLists.txt          |  4 ++-
 proton-c/CMakeLists.txt              |  1 -
 proton-c/bindings/cpp/CMakeLists.txt |  5 ++-
 proton-c/src/proactor/win_iocp.c     | 54 ++++++++++++++++++++++---------
 proton-c/src/tests/CMakeLists.txt    |  6 +---
 proton-c/src/tests/proactor.c        |  9 ++++++
 8 files changed, 101 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 9447ade..3bd5c6b 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -34,16 +34,15 @@ else()
   set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
 endif()
 
-if(WIN32)
-  message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
-else()
-  foreach (name broker send receive direct send-abort)
-    add_executable(c-${name} ${name}.c)
-    target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
-    set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
-  endforeach()
-endif()
+foreach (name broker send receive direct send-abort)
+  add_executable(c-${name} ${name}.c)
+  target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+  set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
+endforeach()
 
 set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
 
+# windows exclusion only for 0.18 beta
+if(NOT WIN32)
 add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/thread.h b/examples/c/thread.h
index 1bd5595..d96104a 100644
--- a/examples/c/thread.h
+++ b/examples/c/thread.h
@@ -22,23 +22,46 @@
 /* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
 
 #ifdef _WIN32
-
 #include <windows.h>
-#include <time.h>
-#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
 #include <process.h>
-#include <windows.h>
 
-#define pthread_function DWORD WINAPI
-#define pthread_function_return DWORD
-#define pthread_t HANDLE
-#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
-#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
-#define pthread_mutex_T HANDLE
-#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
-#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
-#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
-#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+typedef struct {
+  HANDLE handle;
+  void *(*func)(void *);
+  void *arg;
+} pthread_t;
+
+static unsigned __stdcall pthread_run(void *thr0) {
+  pthread_t *t = (pthread_t *) thr0;
+  t->func(t->arg);
+  return 0;
+}
+
+static int pthread_create(pthread_t *t, void *unused, void *(*f)(void *), void *arg) {
+  t->handle = 0;
+  t->func = f;
+  t->arg = arg;
+  HANDLE th = (HANDLE) _beginthreadex(0, 0, &pthread_run, t, 0, 0);
+  if (th) {
+    t->handle = th;
+    return 0;
+  }
+  return -1;
+}
+
+static int pthread_join(pthread_t t, void **unused) {
+  if (t.handle) {
+    WaitForSingleObject(t.handle, INFINITE);
+    CloseHandle(t.handle);
+  }
+  return 0;
+}
+
+typedef CRITICAL_SECTION pthread_mutex_t;
+#define pthread_mutex_init(m, unused) InitializeCriticalSectionAndSpinCount(m, 4000)
+#define pthread_mutex_destroy(m) DeleteCriticalSection(m)
+#define pthread_mutex_lock(m) EnterCriticalSection(m)
+#define pthread_mutex_unlock(m) LeaveCriticalSection(m)
 
 #else
 
@@ -46,4 +69,4 @@
 
 #endif
 
-#endif /* thread.h */
+#endif  /* thread.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index a8d9d34..eeab787 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -73,10 +73,12 @@ endif()
 
 add_cpp_test(cpp-example-container ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest)
 
+# windows exclusion only for 0.18 beta 
+if (NOT WIN32)
 if (NOT SSL_IMPL STREQUAL none)
 add_cpp_test(cpp-example-container-ssl ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleSSLTest)
 endif()
-
+endif()
 
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index cb05ce5..921906e 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -529,7 +529,6 @@ endif()
 
 if (PROACTOR STREQUAL "iocp" OR (NOT PROACTOR AND NOT PROACTOR_OK))
   if(WIN32 AND NOT CYGWIN)
-    message(WARNING "Windows IOCP proactor will be built as a prototype but does not yet pass tests")
     set (PROACTOR_OK iocp)
     set (qpid-proton-proactor src/proactor/win_iocp.c src/proactor/proactor-internal.c)
     set_source_files_properties (${qpid-proton-proactor} PROPERTIES

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 9409d30..83f564f 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -207,4 +207,7 @@ add_cpp_test(scalar_test)
 add_cpp_test(value_test)
 add_cpp_test(container_test)
 add_cpp_test(url_test)
-add_cpp_test(reconnect_test)
+# windows exclusion only for 0.18 beta
+if(NOT WIN32)
+  add_cpp_test(reconnect_test)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c
index 0ebaf90..36467ff 100644
--- a/proton-c/src/proactor/win_iocp.c
+++ b/proton-c/src/proactor/win_iocp.c
@@ -64,6 +64,7 @@
  */
 
 // TODO: make all code C++ or all C90-ish
+//       change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix
 //       make the global write lock window much smaller
 //       2 exclusive write buffers per connection
 //       make the zombie processing thread safe
@@ -1515,7 +1516,7 @@ void pni_iocp_initialize(void *obj)
   iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
   assert(iocp->completion_port != NULL);
   iocp->zombie_list = pn_list(PN_OBJECT, 0);
-  iocp->iocp_trace = true;
+  iocp->iocp_trace = false;
 }
 
 void pni_iocp_finalize(void *obj)
@@ -1564,6 +1565,7 @@ class csguard {
     bool set_;
 };
 
+
 // Get string from error status
 std::string errno_str2(DWORD status) {
   char buf[512];
@@ -1813,7 +1815,7 @@ void do_complete(iocp_result_t *result) {
   switch (result->type) {
 
   case IOCP_ACCEPT:
-    /* accept is now processed inline to do in parallel except on teardown */
+    /* accept is now processed inline to do in parallel, except on teardown */
     assert(iocpd->closing);
     complete_accept((accept_result_t *) result, result->status);  // free's result and retires new_sock
     break;
@@ -1897,7 +1899,7 @@ VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ );
 class reaper {
   public:
     reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp)
-      : iocp_(iocp), global_wlock_(wlock), timer_(NULL) {
+      : iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) {
       InitializeCriticalSectionAndSpinCount(&lock_, 4000);
       timer_queue_ = CreateTimerQueue();
       if (!timer_queue_) {
@@ -1948,6 +1950,7 @@ class reaper {
 
     // Called when all competing threads have terminated except our own reap_check timer.
     void final_shutdown() {
+        running = false;
         DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE);
         // No pending or active timers from thread pool remain.  Truly single threaded now.
         pn_free((void *) iocp_); // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port.
@@ -1965,13 +1968,14 @@ class reaper {
   private:
     void reap_timer() {
         // Call with lock
-        if (timer_)
+        if (timer_ || !running)
             return;
         pn_timestamp_t now = pn_i_now2();
         pni_zombie_check(iocp_, now);
         pn_timestamp_t zd = pni_zombie_deadline(iocp_);
-        if (zd && zd > now) {
-            if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, zd - now,
+        if (zd) {
+            DWORD tm = (zd > now) ? zd - now : 1;
+            if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
                                        0, WT_EXECUTEONLYONCE)) {
                 perror("CreateTimerQueueTimer");
                 abort();
@@ -1984,6 +1988,7 @@ class reaper {
     CRITICAL_SECTION *global_wlock_;
     HANDLE timer_queue_;
     HANDLE timer_;
+    bool running;
 };
 
 VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
@@ -2033,25 +2038,27 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
   return e;
 }
 
-static void psocket_error(psocket_t *ps, int err, const char* what) {
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
   if (ps->is_reaper)
     return;
   if (!ps->listener) {
     pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
-    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
-                                what, ps->host, ps->port,
-                                errno_str2(err).c_str());
+    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
     pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = as_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
-                        what, ps->host, ps->port,
-                        errno_str2(err).c_str());
+    pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
     listener_begin_close(l);
   }
 }
 
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  psocket_error_str(ps, errno_str2(err).c_str(), what);
+}
+
+
+
 // ========================================================================
 // pconnection
 // ========================================================================
@@ -2168,6 +2175,17 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   return NULL;
 }
 
+// Call after successful accept
+static void set_sock_names(pconnection_t *pc) {
+  // This works.  Note possible use of GetAcceptExSockaddrs()
+  pn_socket_t sock = pc->psocket.iocpd->socket;
+  socklen_t len = sizeof(pc->local.ss);
+  getsockname(sock, (struct sockaddr*)&pc->local.ss, &len);
+  len = sizeof(pc->remote.ss);
+  getpeername(sock, (struct sockaddr*)&pc->remote.ss, &len);
+}
+
+
 // Call with lock held when closing and transitioning away from working context
 static inline bool pconnection_can_free(pconnection_t *pc) {
   return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0
@@ -2632,6 +2650,8 @@ static bool connect_step(pconnection_t *pc) {
           if (success || WSAGetLastError() == ERROR_IO_PENDING) {
             iocpd->ops_in_progress++;
             iocpd->active_completer = &pc->psocket;
+            // getpeername unreliable for outgoing connections, but we know it at this point
+            memcpy(&pc->remote.ss, ai->ai_addr, ai->ai_addrlen);
             return true;  // logic resumes at connect_step_done()
           }
           pn_free(result);
@@ -2660,6 +2680,8 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
     pc->psocket.iocpd->write_closed = false;
     pc->psocket.iocpd->read_closed = false;
     if (pc->addrinfo) {
+      socklen_t len = sizeof(pc->local.ss);
+      getsockname(pc->psocket.iocpd->socket, (struct sockaddr*)&pc->local.ss, &len);
       freeaddrinfo(pc->addrinfo);
       pc->addrinfo = NULL;
     }
@@ -2671,6 +2693,7 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
     // Connect failed, no IO started, i.e. no pending iocpd based events
     pc->context.proactor->reaper->fast_reap(iocpd);
     pc->psocket.iocpd = NULL;
+    memset(&pc->remote.ss, 0, sizeof(pc->remote.ss));
     // Is there a next connection target in the addrinfo to try?
     if (pc->ai && connect_step(pc)) {
       // Trying the next addrinfo possibility.  Will return here.
@@ -2700,7 +2723,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
     pc->ai = pc->addrinfo;
     if (connect_step(pc)) {
-      g.release();
       return;
     }
   }
@@ -3153,6 +3175,7 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
       iocpdesc_t *conn_iocpd = accept_result->new_sock;
       pc->psocket.iocpd = conn_iocpd;
       conn_iocpd->active_completer =&pc->psocket;
+      set_sock_names(pc);
       pni_iocpdesc_start(conn_iocpd);
     }
 
@@ -3287,7 +3310,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
   {
     csguard g(&p->context.cslock);
     // Move the whole contexts list into a disconnecting state
-    pcontext_t *disconnecting_pcontexts = p->contexts;
+    disconnecting_pcontexts = p->contexts;
     p->contexts = NULL;
     // First pass: mark each pcontext as disconnecting and update global pending count.
     pcontext_t *ctx = disconnecting_pcontexts;
@@ -3331,7 +3354,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
         }
       }
     } else {
-
       assert(l);
       if (!ctx->closing) {
         if (cond) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index c562025..ce7d25e 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -56,11 +56,7 @@ pn_add_c_test (c-condition-tests condition.c)
 pn_add_c_test (c-connection-driver-tests connection_driver.c)
 
 if(HAS_PROACTOR)
-  if(WIN32)
-    message(STATUS "Windows IOCP proactor tests temporarily suspended")
-  else(WIN32)
-    pn_add_c_test (c-proactor-tests proactor.c)
-  endif(WIN32)
+  pn_add_c_test (c-proactor-tests proactor.c)
 
   if(WIN32)
     # set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 3995c1d..be00f46 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -216,9 +216,14 @@ typedef struct test_listener_t {
 
 test_listener_t test_listen(test_proactor_t *tp, const char *host) {
   test_listener_t l = { test_port(host), pn_listener() };
+#if defined(_WIN32)
+   sock_close(l.port.sock);  // small chance another process will steal the port in Windows
+#endif
   pn_proactor_listen(tp->proactor, l.listener, l.port.host_port, 4);
   TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
+#if !defined(_WIN32)
   sock_close(l.port.sock);
+#endif
   return l;
 }
 
@@ -686,7 +691,9 @@ static void test_ipv4_ipv6(test_t *t) {
   EXPECT_CONNECT(l.port, "");          /* local->all */
 
   if (has_ipv6) {
+#if !defined(_WIN32)
     EXPECT_CONNECT(l6.port, "::"); /* v6->v6 */
+#endif
     EXPECT_CONNECT(l6.port, "");     /* local->v6 */
     EXPECT_CONNECT(l.port, "::1"); /* v6->all */
 
@@ -1098,7 +1105,9 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
   RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
   RUN_ARGV_TEST(failed, t, test_release_free(&t));
+#if !defined(_WIN32)
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
+#endif
   RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
   RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
   RUN_ARGV_TEST(failed, t, test_netaddr(&t));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org