You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/07 14:33:31 UTC

[1/4] qpid-proton git commit: PROTON-1495: c libuv proactor bug with multiple listeners

Repository: qpid-proton
Updated Branches:
  refs/heads/master 54645b793 -> cb5996ee5


PROTON-1495: c libuv proactor bug with multiple listeners

Listener was incorrectly overwriting the uv_tcp_t struct in an lsocket_t if
there was a resolve failure. Once uv_tcp_init is called, the uv_tcp_t must be
closed with uv_close(), and the on_close() callback must run before the memory
is freed or re-used.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4806000
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4806000
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4806000

Branch: refs/heads/master
Commit: d4806000b3eb7e6b556c07819dd642ed9e0a3668
Parents: 54645b7
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 5 12:50:19 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 5 13:27:47 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 92 +++++++++++++++++++++-----------------
 1 file changed, 52 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4806000/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8cd6dd7..e632f10 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -149,6 +149,7 @@ typedef struct lsocket_t {
   struct_type type;             /* Always T_LSOCKET */
   pn_listener_t *parent;
   uv_tcp_t tcp;
+  struct lsocket_t *next;
 } lsocket_t;
 
 PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket)
@@ -193,7 +194,7 @@ typedef enum {
   L_UNINIT,                     /**<< Not yet listening */
   L_LISTENING,                  /**<< Listening */
   L_CLOSE,                      /**<< Close requested  */
-  L_CLOSING,                    /**<< Socket close initiated, wait for close */
+  L_CLOSING,                    /**<< Socket close initiated, wait for all to close */
   L_CLOSED                      /**<< User saw PN_LISTENER_CLOSED, all done  */
 } listener_state;
 
@@ -201,10 +202,6 @@ typedef enum {
 struct pn_listener_t {
   work_t work;                  /* Must be first to allow casting */
 
-  size_t nsockets;
-  lsocket_t *sockets;
-  lsocket_t prealloc[1];       /* Pre-allocated socket array, allocate larger if needed */
-
   /* Only used by owner thread */
   pn_event_batch_t batch;
   pn_record_t *attachments;
@@ -213,6 +210,7 @@ struct pn_listener_t {
 
   /* Only used by leader */
   addr_t addr;
+  lsocket_t *lsockets;
 
   /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
    * detach a listener from the UV loop to prevent concurrent access.
@@ -424,10 +422,15 @@ static void listener_close_lh(pn_listener_t* l) {
 static void on_close_lsocket(uv_handle_t *h) {
   lsocket_t* ls = (lsocket_t*)h->data;
   pn_listener_t *l = ls->parent;
-  uv_mutex_lock(&l->lock);
-  --l->nsockets;
-  listener_close_lh(l);
-  uv_mutex_unlock(&l->lock);
+  if (l) {
+    /* Remove from list */
+    lsocket_t **pp = &l->lsockets;
+    for (; *pp != ls; pp = &(*pp)->next)
+      ;
+    *pp = ls->next;
+    work_notify(&l->work);
+  }
+  free(ls);
 }
 
 static pconnection_t *get_pconnection(pn_connection_t* c) {
@@ -611,16 +614,27 @@ static bool leader_connect(pconnection_t *pc) {
   }
 }
 
-static int lsocket_init(lsocket_t *ls, pn_listener_t *l, struct addrinfo *ai) {
+static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
+  lsocket_t *ls = (lsocket_t*)calloc(1, sizeof(lsocket_t));
   ls->type = T_LSOCKET;
-  ls->parent = l;
   ls->tcp.data = ls;
+  ls->parent = NULL;
+  ls->next = NULL;
   int err = uv_tcp_init(&l->work.proactor->loop, &ls->tcp);
-  if (!err) {
+  if (err) {
+    free(ls);                   /* Will never be closed */
+  } else {
     int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0;
     err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags);
     if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection);
-    if (err) uv_close((uv_handle_t*)&ls->tcp, NULL);
+    if (!err) {
+      /* Add to l->lsockets list */
+      ls->parent = l;
+      ls->next = l->lsockets;
+      l->lsockets = ls;
+    } else {
+      uv_close((uv_handle_t*)&ls->tcp, on_close_lsocket); /* Freed by on_close_lsocket */
+    }
   }
   return err;
 }
@@ -632,28 +646,18 @@ static void leader_listen_lh(pn_listener_t *l) {
   leader_inc(l->work.proactor);
   int err = leader_resolve(l->work.proactor, &l->addr, true);
   if (!err) {
-    /* Count addresses, allocate enough space */
-    size_t len = 0;
-    for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
-      ++len;
-    }
-    assert(len > 0);            /* Guaranteed by getaddrinfo() */
-    l->sockets = (len > ARRAY_LEN(l->prealloc)) ? (lsocket_t*)calloc(len, sizeof(lsocket_t)) : l->prealloc;
     /* Find the working addresses */
-    l->nsockets = 0;
-    int first_err = 0;
     for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
-      lsocket_t *ls = &l->sockets[l->nsockets];
-      int err2 = lsocket_init(ls, l, ai);
-      if (!err2) {
-        ++l->nsockets;                    /* Next socket */
-      } else if (!first_err) {
-        first_err = err2;
+      int err2 = lsocket(l, ai);
+      if (err2) {
+        err = err2;
       }
     }
     uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
     l->addr.getaddrinfo.addrinfo = NULL;
-    if (l->nsockets == 0) err = first_err;
+    if (l->lsockets) {    /* Ignore errors if we got at least one good listening socket */
+      err = 0;
+    }
   }
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
@@ -670,7 +674,11 @@ void pn_listener_free(pn_listener_t *l) {
     if (l->collector) pn_collector_free(l->collector);
     if (l->condition) pn_condition_free(l->condition);
     if (l->attachments) pn_free(l->attachments);
-    if (l->sockets && l->sockets != l->prealloc) free(l->sockets);
+    while (l->lsockets) {
+      lsocket_t *ls = l->lsockets;
+      l->lsockets = ls->next;
+      free(ls);
+    }
     assert(!l->accept.front);
     free(l);
   }
@@ -707,13 +715,13 @@ static bool leader_process_listener(pn_listener_t *l) {
 
    case L_CLOSE:                /* Close requested, start closing lsockets */
     l->state = L_CLOSING;
-    for (size_t i = 0; i < l->nsockets; ++i) {
-      uv_safe_close((uv_handle_t*)&l->sockets[i].tcp, on_close_lsocket);
+    for (lsocket_t *ls = l->lsockets; ls; ls = ls->next) {
+      uv_safe_close((uv_handle_t*)&ls->tcp, on_close_lsocket);
     }
     /* NOTE: Fall through in case we have 0 sockets - e.g. resolver error */
 
    case L_CLOSING:              /* Closing - can we send PN_LISTENER_CLOSE? */
-    if (l->nsockets == 0) {
+    if (!l->lsockets) {
       l->state = L_CLOSED;
       pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
     }
@@ -940,9 +948,11 @@ static void on_proactor_disconnect(uv_handle_t* h, void* v) {
      }
      case T_LSOCKET: {
        pn_listener_t *l = ((lsocket_t*)h->data)->parent;
-       pn_condition_t *cond = l->work.proactor->disconnect_cond;
-       if (cond) {
-         pn_condition_copy(pn_listener_condition(l), cond);
+       if (l) {
+         pn_condition_t *cond = l->work.proactor->disconnect_cond;
+         if (cond) {
+           pn_condition_copy(pn_listener_condition(l), cond);
+         }
        }
        pn_listener_close(l);
        break;
@@ -995,7 +1005,7 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
     }
   }
   batch = get_batch_lh(p);      /* Check for work */
-  if (!batch) { /* No work, run the UV loop */
+  if (!batch) {                 /* No work, run the UV loop */
     uv_mutex_unlock(&p->lock);  /* Unlock to run UV loop */
     uv_run(&p->loop, mode);
     uv_mutex_lock(&p->lock);
@@ -1142,8 +1152,10 @@ static void on_proactor_free(uv_handle_t* h, void* v) {
   if (h->type == UV_TCP) {      /* Put the corresponding work item on the leader_q for cleanup */
     work_t *w = NULL;
     switch (*(struct_type*)h->data) {
-     case T_CONNECTION: w = (work_t*)h->data; break;
-     case T_LSOCKET: w = &((lsocket_t*)h->data)->parent->work; break;
+     case T_CONNECTION:
+      w = (work_t*)h->data; break;
+     case T_LSOCKET:
+      w = &((lsocket_t*)h->data)->parent->work; break;
      default: break;
     }
     if (w && w->next == work_unqueued) {
@@ -1317,5 +1329,5 @@ int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
 }
 
 pn_millis_t pn_proactor_now(void) {
-    return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
+  return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] qpid-proton git commit: PROTON-1495: move examples/exampletest.py to tools/py/proctest.py

Posted by ac...@apache.org.
PROTON-1495: move examples/exampletest.py to tools/py/proctest.py

It is not an example but part of our test framework.
Want to use it for non-example tests as well.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/08505261
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/08505261
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/08505261

Branch: refs/heads/master
Commit: 08505261f4008c43b2399cb4a72bd887bbadd7df
Parents: d480600
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 2 09:33:43 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 09:59:23 2017 -0400

----------------------------------------------------------------------
 config.sh.in                        |   2 +-
 examples/CMakeLists.txt             |   4 +-
 examples/c/proactor/example_test.py |   4 +-
 examples/cpp/example_test.py        |   4 +-
 examples/exampletest.py             | 193 -----------------------------
 tools/py/proctest.py                | 204 +++++++++++++++++++++++++++++++
 6 files changed, 211 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 72d4ea9..d9debd3 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -39,7 +39,7 @@ RUBY_BINDINGS=$PROTON_BINDINGS/ruby
 PERL_BINDINGS=$PROTON_BINDINGS/perl
 
 # Python
-COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/examples
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/tools/py
 export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
 
 # PHP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 4d744d2..8a8327a 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -29,8 +29,8 @@ macro(set_search_path result)  # args after result are directories or search pat
   file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
 endmacro()
 
-# Some non-python examples use exampletest.py to drive their self-tests.
-set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_CURRENT_SOURCE_DIR}" "$ENV{PYTHON_PATH}")
+# Add the tools directory for the 'proctest' module
+set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")
 set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")
 
 add_subdirectory(c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/c/proactor/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/example_test.py b/examples/c/proactor/example_test.py
index 38f7fc8..02bb1fd 100644
--- a/examples/c/proactor/example_test.py
+++ b/examples/c/proactor/example_test.py
@@ -20,7 +20,7 @@
 # This is a test script to run the examples and verify that they behave as expected.
 
 import unittest, sys, time
-from exampletest import *
+from proctest import *
 
 def python_cmd(name):
     dir = os.path.dirname(__file__)
@@ -49,7 +49,7 @@ class Broker(object):
                 raise ProcError(b, "broker crash")
             b.kill()
 
-class CExampleTest(ExampleTestCase):
+class CExampleTest(ProcTestCase):
 
     def test_send_receive(self):
         """Send first then receive"""

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 5773142..0ae929c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -21,7 +21,7 @@
 
 import unittest
 import os, sys, socket, time, re, inspect
-from exampletest import *
+from proctest import *
 from  random import randrange
 from subprocess import Popen, PIPE, STDOUT, call
 from copy import copy
@@ -82,7 +82,7 @@ def ensureCanTestExtendedSASL():
     raise Skipped("Can't Test Extended SASL: Couldn't create auth db")
 
 
-class BrokerTestCase(ExampleTestCase):
+class BrokerTestCase(ProcTestCase):
     """
     ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
     Subclasses must set `broker_exe` class variable with the name of the broker executable.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
deleted file mode 100644
index 5c53616..0000000
--- a/examples/exampletest.py
+++ /dev/null
@@ -1,193 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# A test library to make it easy to run unittest tests that start,
-# monitor, and report output from sub-processes. In particular
-# it helps with starting processes that listen on random ports.
-
-import unittest
-import os, sys, socket, time, re, inspect, errno, threading
-from  random import randrange
-from subprocess import Popen, PIPE, STDOUT
-from copy import copy
-import platform
-from os.path import dirname as dirname
-
-DEFAULT_TIMEOUT=10
-
-class TestPort(object):
-    """Get an unused port using bind(0) and SO_REUSEADDR and hold it till close()"""
-    def __init__(self):
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.sock.bind(('127.0.0.1', 0)) # Testing exampless is local only
-        self.host, self.port = socket.getnameinfo(self.sock.getsockname(), 0)
-        self.addr = "%s:%s" % (self.host, self.port)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        self.close()
-
-    def close(self):
-        self.sock.close()
-
-class ProcError(Exception):
-    """An exception that captures failed process output"""
-    def __init__(self, proc, what="bad exit status"):
-        out = proc.out.strip()
-        if out:
-            out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
-        else:
-            out = ", no output)"
-        super(Exception, self, ).__init__(
-            "%s %s, code=%s%s" % (proc.args, what, getattr(proc, 'returncode', 'noreturn'), out))
-
-class NotFoundError(ProcError):
-    pass
-
-class Proc(Popen):
-    """A example process that stores its stdout and can scan it for a 'ready' pattern'"""
-
-    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
-        vg_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
-    else:
-        vg_args = []
-
-    @property
-    def out(self):
-        self._out.seek(0)
-        # Normalize line endings, os.tmpfile() opens in binary mode.
-        return self._out.read().replace('\r\n','\n').replace('\r','\n')
-
-    def __init__(self, args, skip_valgrind=False, **kwargs):
-        """Start an example process"""
-        args = list(args)
-        if skip_valgrind:
-            self.args = args
-        else:
-            self.args = self.vg_args + args
-        self.kwargs = kwargs
-        self._out = os.tmpfile()
-        try:
-            Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
-        except OSError, e:
-            if e.errno == errno.ENOENT:
-                raise NotFoundError(self, str(e))
-            raise ProcError(self, str(e))
-        except Exception, e:
-            raise ProcError(self, str(e))
-
-    def kill(self):
-        try:
-            if self.poll() is None:
-                Popen.kill(self)
-        except:
-            pass                # Already exited.
-        return self.out
-
-    def wait_exit(self, timeout=DEFAULT_TIMEOUT, expect=0):
-        """Wait for process to exit, return output. Raise ProcError  on failure."""
-        t = threading.Thread(target=self.wait)
-        t.start()
-        t.join(timeout)
-        if self.poll() is None:      # Still running
-            self.kill()
-            raise ProcError(self, "still running after %ss" % timeout)
-        if expect is not None and self.poll() != expect:
-            raise ProcError(self)
-        return self.out
-
-    def wait_re(self, regexp, timeout=DEFAULT_TIMEOUT):
-        """
-        Wait for regexp to appear in the output, returns the re.search match result.
-        The target process should flush() important output to ensure it appears.
-        """
-        if timeout:
-            deadline = time.time() + timeout
-        while timeout is None or time.time() < deadline:
-            match = re.search(regexp, self.out)
-            if match:
-                return match
-            time.sleep(0.01)    # Not very efficient
-        raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
-
-def _tc_missing(attr):
-    return not hasattr(unittest.TestCase, attr)
-
-class TestCase(unittest.TestCase):
-    """
-    Roughly provides setUpClass() and tearDownClass() and other features missing
-    in python 2.6. If subclasses override setUp() or tearDown() they *must*
-    call the superclass.
-    """
-
-    if _tc_missing('setUpClass') and _tc_missing('tearDownClass'):
-
-        @classmethod
-        def setUpClass(cls):
-            pass
-
-        @classmethod
-        def tearDownClass(cls):
-            pass
-
-        def setUp(self):
-            super(TestCase, self).setUp()
-            cls = type(self)
-            if not hasattr(cls, '_setup_class_count'): # First time
-                def is_test(m):
-                    return inspect.ismethod(m) and m.__name__.startswith('test_')
-                cls._setup_class_count = len(inspect.getmembers(cls, predicate=is_test))
-                cls.setUpClass()
-
-        def tearDown(self):
-            self.assertTrue(self._setup_class_count > 0)
-            self._setup_class_count -=  1
-            if self._setup_class_count == 0:
-                type(self).tearDownClass()
-            super(TestCase, self).tearDown()
-
-    if _tc_missing('assertIn'):
-        def assertIn(self, a, b):
-            self.assertTrue(a in b, "%r not in %r" % (a, b))
-
-    if _tc_missing('assertMultiLineEqual'):
-        def assertMultiLineEqual(self, a, b):
-            self.assertEqual(a, b)
-
-class ExampleTestCase(TestCase):
-    """TestCase that manages started processes"""
-    def setUp(self):
-        super(ExampleTestCase, self).setUp()
-        self.procs = []
-
-    def tearDown(self):
-        for p in self.procs:
-            p.kill()
-        super(ExampleTestCase, self).tearDown()
-
-    def proc(self, *args, **kwargs):
-        p = Proc(*args, **kwargs)
-        self.procs.append(p)
-        return p
-
-if __name__ == "__main__":
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/tools/py/proctest.py
----------------------------------------------------------------------
diff --git a/tools/py/proctest.py b/tools/py/proctest.py
new file mode 100644
index 0000000..ba83c7d
--- /dev/null
+++ b/tools/py/proctest.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""Unit test library to simplify tests that start, monitor, check and report
+output from sub-processes. Provides safe port allocation for processes that
+listen on a port. Allows executables to be run under a debugging tool like
+valgrind.
+"""
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from  random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+DEFAULT_TIMEOUT=10
+
+class TestPort(object):
+    """Get an unused port using bind(0) and SO_REUSEADDR and hold it till close()
+    Can be used as `with TestPort() as tp:` Provides tp.host, tp.port and tp.addr
+    (a "host:port" string)
+    """
+    def __init__(self):
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.sock.bind(('127.0.0.1', 0)) # Testing exampless is local only
+        self.host, self.port = socket.getnameinfo(self.sock.getsockname(), 0)
+        self.addr = "%s:%s" % (self.host, self.port)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def close(self):
+        self.sock.close()
+
+class ProcError(Exception):
+    """An exception that displays failed process output"""
+    def __init__(self, proc, what="bad exit status"):
+        self.out = proc.out.strip()
+        if self.out:
+            msgtail = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % self.out
+        else:
+            msgtail = ", no output"
+        super(Exception, self, ).__init__(
+            "%s %s, code=%s%s" % (proc.args, what, getattr(proc, 'returncode', 'noreturn'), msgtail))
+
+class NotFoundError(ProcError):
+    pass
+
+class Proc(Popen):
+    """Subclass of suprocess.Popen that stores its output and can scan it for a
+    'ready' pattern' Use self.out to access output (combined stdout and stderr).
+    You can't set the Popen stdout and stderr arguments, they will be overwritten.
+    """
+
+    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+        vg_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+    else:
+        vg_args = []
+
+    @property
+    def out(self):
+        self._out.seek(0)
+        # Normalize line endings, os.tmpfile() opens in binary mode.
+        return self._out.read().replace('\r\n','\n').replace('\r','\n')
+
+    def __init__(self, args, skip_valgrind=False, **kwargs):
+        """Start an example process"""
+        args = list(args)
+        if skip_valgrind:
+            self.args = args
+        else:
+            self.args = self.vg_args + args
+        self.kwargs = kwargs
+        self._out = os.tmpfile()
+        try:
+            Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+        except OSError, e:
+            if e.errno == errno.ENOENT:
+                raise NotFoundError(self, str(e))
+            raise ProcError(self, str(e))
+        except Exception, e:
+            raise ProcError(self, str(e))
+
+    def kill(self):
+        try:
+            if self.poll() is None:
+                Popen.kill(self)
+        except:
+            pass                # Already exited.
+        return self.out
+
+    def wait_exit(self, timeout=DEFAULT_TIMEOUT, expect=0):
+        """Wait for process to exit, return output. Raise ProcError  on failure."""
+        t = threading.Thread(target=self.wait)
+        t.start()
+        t.join(timeout)
+        if self.poll() is None:      # Still running
+            self.kill()
+            raise ProcError(self, "still running after %ss" % timeout)
+        if expect is not None and self.poll() != expect:
+            raise ProcError(self)
+        return self.out
+
+    def wait_re(self, regexp, timeout=DEFAULT_TIMEOUT):
+        """
+        Wait for regexp to appear in the output, returns the re.search match result.
+        The target process should flush() important output to ensure it appears.
+        """
+        if timeout:
+            deadline = time.time() + timeout
+        while timeout is None or time.time() < deadline:
+            match = re.search(regexp, self.out)
+            if match:
+                return match
+            if self.poll() is not None:
+                raise ProcError(self, "process exited while waiting for '%s'" % (regexp))
+            time.sleep(0.01)    # Not very efficient
+        raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
+
+def _tc_missing(attr):
+    return not hasattr(unittest.TestCase, attr)
+
+class ProcTestCase(unittest.TestCase):
+    """TestCase that manages started processes
+
+    Also roughly provides setUpClass() and tearDownClass() and other features
+    missing in python 2.6. If subclasses override setUp() or tearDown() they
+    *must* call the superclass.
+    """
+
+    def setUp(self):
+        super(ProcTestCase, self).setUp()
+        self.procs = []
+
+    def tearDown(self):
+        for p in self.procs:
+            p.kill()
+        super(ProcTestCase, self).tearDown()
+
+    def proc(self, *args, **kwargs):
+        """Return a Proc() that will be automatically killed on teardown"""
+        p = Proc(*args, **kwargs)
+        self.procs.append(p)
+        return p
+
+    if _tc_missing('setUpClass') and _tc_missing('tearDownClass'):
+
+        @classmethod
+        def setUpClass(cls):
+            pass
+
+        @classmethod
+        def tearDownClass(cls):
+            pass
+
+        def setUp(self):
+            super(ProcTestCase, self).setUp()
+            cls = type(self)
+            if not hasattr(cls, '_setup_class_count'): # First time
+                def is_test(m):
+                    return inspect.ismethod(m) and m.__name__.startswith('test_')
+                cls._setup_class_count = len(inspect.getmembers(cls, predicate=is_test))
+                cls.setUpClass()
+
+        def tearDown(self):
+            self.assertTrue(self._setup_class_count > 0)
+            self._setup_class_count -=  1
+            if self._setup_class_count == 0:
+                type(self).tearDownClass()
+            super(ProcTestCase, self).tearDown()
+
+    if _tc_missing('assertIn'):
+        def assertIn(self, a, b):
+            self.assertTrue(a in b, "%r not in %r" % (a, b))
+
+    if _tc_missing('assertMultiLineEqual'):
+        def assertMultiLineEqual(self, a, b):
+            self.assertEqual(a, b)
+
+from unittest import main
+if __name__ == "__main__":
+    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] qpid-proton git commit: PROTON-1493: c epoll proactor spinning on interruptfd

Posted by ac...@apache.org.
PROTON-1493: c epoll proactor spinning on interruptfd

Missing read() on interruptfd was causing the proactor to spin.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cb5996ee
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cb5996ee
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cb5996ee

Branch: refs/heads/master
Commit: cb5996ee5c714b28d0449caef21d1b50c8412d69
Parents: ad52e3a
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 5 14:23:15 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 10:20:41 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cb5996ee/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 3a0ea29..b6d2bd0 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -185,6 +185,16 @@ static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
   unlock(&pt->mutex);
 }
 
+/* Read from a timer or event FD */
+static uint64_t read_uint64(int fd) {
+  uint64_t result = 0;
+  ssize_t n = read(fd, &result, sizeof(result));
+  if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) {
+    EPOLL_FATAL("timerfd or eventfd read error", errno);
+  }
+  return result;
+}
+
 // Callback bookkeeping. Return true if there is an expired timer.
 static bool ptimer_callback(ptimer_t *pt) {
   lock(&pt->mutex);
@@ -193,23 +203,13 @@ static bool ptimer_callback(ptimer_t *pt) {
     if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
       pt->timer_active = false;
   }
-  uint64_t u_exp_count = 0;
-  ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t));
-  if (l != sizeof(uint64_t)) {
-    if (l == -1) {
-      if (errno != EAGAIN) {
-        EPOLL_FATAL("timer read", errno);
-      }
-    }
-    else
-      EPOLL_FATAL("timer internal error", 0);
-  }
+  uint64_t u_exp_count = read_uint64(pt->timerfd);
   if (!pt->timer_active) {
     // Expiry counter just cleared, timer not set, timerfd not armed
     pt->in_doubt = false;
   }
   unlock(&pt->mutex);
-  return (l == sizeof(uint64_t)) && u_exp_count > 0;
+  return u_exp_count > 0;
 }
 
 // Return true if timerfd has and will have no pollable expiries in the current armed state
@@ -444,9 +444,7 @@ static pcontext_t *wake_pop_front(pn_proactor_t *p) {
        * Can the read system call be made without holding the lock?
        * Note that if the reads/writes happen out of order, the wake
        * mechanism will hang. */
-      uint64_t ignored;
-      int err = read(p->eventfd, &ignored, sizeof(uint64_t));
-      (void)err; // TODO: check for error
+      (void)read_uint64(p->eventfd);
       p->wakes_in_progress = false;
     }
   }
@@ -1673,6 +1671,7 @@ static bool proactor_remove(pcontext_t *ctx) {
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
   if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+    (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
     return proactor_process(p, PN_PROACTOR_INTERRUPT);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] qpid-proton git commit: PROTON-1495: c proactor error naming and formatting

Posted by ac...@apache.org.
PROTON-1495: c proactor error naming and formatting

Use consistent condition name "proton:io" for IO-related error conditions.
Consistent error formatting for proactor implementations
receive.c example minor fix - missing return code


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ad52e3ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ad52e3ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ad52e3ab

Branch: refs/heads/master
Commit: ad52e3abb49da97436269ee78c07aba20ccaf742
Parents: 0850526
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 1 15:42:33 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 09:59:29 2017 -0400

----------------------------------------------------------------------
 examples/c/proactor/receive.c             |  1 +
 proton-c/src/proactor/epoll.c             |  5 ++---
 proton-c/src/proactor/libuv.c             | 13 +++++--------
 proton-c/src/proactor/proactor-internal.c | 10 ++++++++++
 proton-c/src/proactor/proactor-internal.h | 16 +++++++++++++++-
 5 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index ddf0a35..1b9e3f9 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -184,4 +184,5 @@ int main(int argc, char **argv) {
   pn_proactor_connect(app.proactor, pn_connection(), addr);
   run(&app);
   pn_proactor_free(app.proactor);
+  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 3e5327e..3a0ea29 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -245,7 +245,6 @@ pn_timestamp_t pn_i_now2(void)
 // Proactor common code
 // ========================================================================
 
-const char *COND_NAME = "proactor";
 const char *AMQP_PORT = "5672";
 const char *AMQP_PORT_NAME = "amqp";
 
@@ -586,13 +585,13 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
   if (!ps->listener) {
     pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
-    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
+    pn_connection_driver_errorf(driver, PNI_IO_CONDITION, "%s %s:%s: %s",
                                 what, ps->host, ps->port,
                                 strerror(err));
     pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = psocket_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+    pn_condition_format(l->condition, PNI_IO_CONDITION, "%s %s:%s: %s",
                         what, ps->host, ps->port,
                         strerror(err));
     listener_begin_close(l);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index e632f10..a851c4e 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -456,9 +456,8 @@ static void pconnection_set_error(pconnection_t *pc, int err, const char* what)
   pn_connection_driver_t *driver = &pc->driver;
   pn_connection_driver_bind(driver); /* Make sure we are bound so errors will be reported */
   if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
-    pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
-                                what, pc->addr.host, pc->addr.port,
-                                uv_strerror(err));
+    pni_proactor_set_cond(pn_transport_condition(driver->transport),
+                                what, pc->addr.host , pc->addr.port, uv_strerror(err));
   }
 }
 
@@ -473,9 +472,7 @@ static void pconnection_error(pconnection_t *pc, int err, const char* what) {
 static void listener_error_lh(pn_listener_t *l, int err, const char* what) {
   assert(err);
   if (!pn_condition_is_set(l->condition)) {
-    pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
-                        what, l->addr.host, l->addr.port,
-                        uv_strerror(err));
+    pni_proactor_set_cond(l->condition, what, l->addr.host, l->addr.port, uv_strerror(err));
   }
   listener_close_lh(l);
 }
@@ -659,11 +656,11 @@ static void leader_listen_lh(pn_listener_t *l) {
       err = 0;
     }
   }
-  /* Always put an OPEN event for symmetry, even if we immediately close with err */
-  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
   if (err) {
     listener_error_lh(l, err, "listening on");
   }
+  /* Always put an OPEN event for symmetry, even if we have an error. */
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
 }
 
 void pn_listener_free(pn_listener_t *l) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/proactor-internal.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.c b/proton-c/src/proactor/proactor-internal.c
index af7b057..3525c71 100644
--- a/proton-c/src/proactor/proactor-internal.c
+++ b/proton-c/src/proactor/proactor-internal.c
@@ -34,6 +34,8 @@ static const char *AMQP_PORT_NAME = "amqp";
 static const char *AMQPS_PORT = "5671";
 static const char *AMQPS_PORT_NAME = "amqps";
 
+const char *PNI_IO_CONDITION = "proton:io";
+
 int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) {
   return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : "");
 }
@@ -64,3 +66,11 @@ int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, c
   }
   return 0;
 }
+
+static inline const char *nonull(const char *str) { return str ? str : ""; }
+
+void pni_proactor_set_cond(
+  pn_condition_t *cond, const char *what, const char *host, const char *port, const char *msg)
+{
+  pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s:%s", msg, what, nonull(host), nonull(port));
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/proactor-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h
index 894cb5b..5307a45 100644
--- a/proton-c/src/proactor/proactor-internal.h
+++ b/proton-c/src/proactor/proactor-internal.h
@@ -22,8 +22,11 @@
 
 #include <proton/type_compat.h>
 #include <proton/import_export.h>
+#include <proton/condition.h>
 
-/*
+/* NOTE PNP_EXTERN is for use by proton-internal tests  */
+
+/**
  * Parse a pn_proactor_addr string, copy data into buf as necessary.
  * Set *host and *port to point to the host and port strings.
  *
@@ -34,4 +37,15 @@
  */
 PNP_EXTERN int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port);
 
+/**
+ * Condition name for error conditions related to proton-IO.
+ */
+extern const char *PNI_IO_CONDITION;
+
+/**
+ * Format a proactor error condition with message "<msg> - <what> <host>:<port>"
+ */
+void pni_proactor_set_cond(
+  pn_condition_t *cond, const char *what, const char *msg, const char *host, const char *port);
+
 #endif // PROACTOR_NETADDR_INTERNAL_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org