You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/07 14:33:31 UTC
[1/4] qpid-proton git commit: PROTON-1495: c libuv proactor bug with
multiple listeners
Repository: qpid-proton
Updated Branches:
refs/heads/master 54645b793 -> cb5996ee5
PROTON-1495: c libuv proactor bug with multiple listeners
The listener was incorrectly overwriting the uv_tcp_t struct in an lsocket_t if
there was a resolve failure. Once uv_tcp_init() has been called, the uv_tcp_t must be
closed with uv_close(), and its memory must not be freed or re-used until the on_close() callback has run.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4806000
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4806000
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4806000
Branch: refs/heads/master
Commit: d4806000b3eb7e6b556c07819dd642ed9e0a3668
Parents: 54645b7
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 5 12:50:19 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 5 13:27:47 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 92 +++++++++++++++++++++-----------------
1 file changed, 52 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4806000/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8cd6dd7..e632f10 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -149,6 +149,7 @@ typedef struct lsocket_t {
struct_type type; /* Always T_LSOCKET */
pn_listener_t *parent;
uv_tcp_t tcp;
+ struct lsocket_t *next;
} lsocket_t;
PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket)
@@ -193,7 +194,7 @@ typedef enum {
L_UNINIT, /**<< Not yet listening */
L_LISTENING, /**<< Listening */
L_CLOSE, /**<< Close requested */
- L_CLOSING, /**<< Socket close initiated, wait for close */
+ L_CLOSING, /**<< Socket close initiated, wait for all to close */
L_CLOSED /**<< User saw PN_LISTENER_CLOSED, all done */
} listener_state;
@@ -201,10 +202,6 @@ typedef enum {
struct pn_listener_t {
work_t work; /* Must be first to allow casting */
- size_t nsockets;
- lsocket_t *sockets;
- lsocket_t prealloc[1]; /* Pre-allocated socket array, allocate larger if needed */
-
/* Only used by owner thread */
pn_event_batch_t batch;
pn_record_t *attachments;
@@ -213,6 +210,7 @@ struct pn_listener_t {
/* Only used by leader */
addr_t addr;
+ lsocket_t *lsockets;
/* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
* detach a listener from the UV loop to prevent concurrent access.
@@ -424,10 +422,15 @@ static void listener_close_lh(pn_listener_t* l) {
static void on_close_lsocket(uv_handle_t *h) {
lsocket_t* ls = (lsocket_t*)h->data;
pn_listener_t *l = ls->parent;
- uv_mutex_lock(&l->lock);
- --l->nsockets;
- listener_close_lh(l);
- uv_mutex_unlock(&l->lock);
+ if (l) {
+ /* Remove from list */
+ lsocket_t **pp = &l->lsockets;
+ for (; *pp != ls; pp = &(*pp)->next)
+ ;
+ *pp = ls->next;
+ work_notify(&l->work);
+ }
+ free(ls);
}
static pconnection_t *get_pconnection(pn_connection_t* c) {
@@ -611,16 +614,27 @@ static bool leader_connect(pconnection_t *pc) {
}
}
-static int lsocket_init(lsocket_t *ls, pn_listener_t *l, struct addrinfo *ai) {
+static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
+ lsocket_t *ls = (lsocket_t*)calloc(1, sizeof(lsocket_t));
ls->type = T_LSOCKET;
- ls->parent = l;
ls->tcp.data = ls;
+ ls->parent = NULL;
+ ls->next = NULL;
int err = uv_tcp_init(&l->work.proactor->loop, &ls->tcp);
- if (!err) {
+ if (err) {
+ free(ls); /* Will never be closed */
+ } else {
int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0;
err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags);
if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection);
- if (err) uv_close((uv_handle_t*)&ls->tcp, NULL);
+ if (!err) {
+ /* Add to l->lsockets list */
+ ls->parent = l;
+ ls->next = l->lsockets;
+ l->lsockets = ls;
+ } else {
+ uv_close((uv_handle_t*)&ls->tcp, on_close_lsocket); /* Freed by on_close_lsocket */
+ }
}
return err;
}
@@ -632,28 +646,18 @@ static void leader_listen_lh(pn_listener_t *l) {
leader_inc(l->work.proactor);
int err = leader_resolve(l->work.proactor, &l->addr, true);
if (!err) {
- /* Count addresses, allocate enough space */
- size_t len = 0;
- for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
- ++len;
- }
- assert(len > 0); /* Guaranteed by getaddrinfo() */
- l->sockets = (len > ARRAY_LEN(l->prealloc)) ? (lsocket_t*)calloc(len, sizeof(lsocket_t)) : l->prealloc;
/* Find the working addresses */
- l->nsockets = 0;
- int first_err = 0;
for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
- lsocket_t *ls = &l->sockets[l->nsockets];
- int err2 = lsocket_init(ls, l, ai);
- if (!err2) {
- ++l->nsockets; /* Next socket */
- } else if (!first_err) {
- first_err = err2;
+ int err2 = lsocket(l, ai);
+ if (err2) {
+ err = err2;
}
}
uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
l->addr.getaddrinfo.addrinfo = NULL;
- if (l->nsockets == 0) err = first_err;
+ if (l->lsockets) { /* Ignore errors if we got at least one good listening socket */
+ err = 0;
+ }
}
/* Always put an OPEN event for symmetry, even if we immediately close with err */
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
@@ -670,7 +674,11 @@ void pn_listener_free(pn_listener_t *l) {
if (l->collector) pn_collector_free(l->collector);
if (l->condition) pn_condition_free(l->condition);
if (l->attachments) pn_free(l->attachments);
- if (l->sockets && l->sockets != l->prealloc) free(l->sockets);
+ while (l->lsockets) {
+ lsocket_t *ls = l->lsockets;
+ l->lsockets = ls->next;
+ free(ls);
+ }
assert(!l->accept.front);
free(l);
}
@@ -707,13 +715,13 @@ static bool leader_process_listener(pn_listener_t *l) {
case L_CLOSE: /* Close requested, start closing lsockets */
l->state = L_CLOSING;
- for (size_t i = 0; i < l->nsockets; ++i) {
- uv_safe_close((uv_handle_t*)&l->sockets[i].tcp, on_close_lsocket);
+ for (lsocket_t *ls = l->lsockets; ls; ls = ls->next) {
+ uv_safe_close((uv_handle_t*)&ls->tcp, on_close_lsocket);
}
/* NOTE: Fall through in case we have 0 sockets - e.g. resolver error */
case L_CLOSING: /* Closing - can we send PN_LISTENER_CLOSE? */
- if (l->nsockets == 0) {
+ if (!l->lsockets) {
l->state = L_CLOSED;
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
}
@@ -940,9 +948,11 @@ static void on_proactor_disconnect(uv_handle_t* h, void* v) {
}
case T_LSOCKET: {
pn_listener_t *l = ((lsocket_t*)h->data)->parent;
- pn_condition_t *cond = l->work.proactor->disconnect_cond;
- if (cond) {
- pn_condition_copy(pn_listener_condition(l), cond);
+ if (l) {
+ pn_condition_t *cond = l->work.proactor->disconnect_cond;
+ if (cond) {
+ pn_condition_copy(pn_listener_condition(l), cond);
+ }
}
pn_listener_close(l);
break;
@@ -995,7 +1005,7 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
}
}
batch = get_batch_lh(p); /* Check for work */
- if (!batch) { /* No work, run the UV loop */
+ if (!batch) { /* No work, run the UV loop */
uv_mutex_unlock(&p->lock); /* Unlock to run UV loop */
uv_run(&p->loop, mode);
uv_mutex_lock(&p->lock);
@@ -1142,8 +1152,10 @@ static void on_proactor_free(uv_handle_t* h, void* v) {
if (h->type == UV_TCP) { /* Put the corresponding work item on the leader_q for cleanup */
work_t *w = NULL;
switch (*(struct_type*)h->data) {
- case T_CONNECTION: w = (work_t*)h->data; break;
- case T_LSOCKET: w = &((lsocket_t*)h->data)->parent->work; break;
+ case T_CONNECTION:
+ w = (work_t*)h->data; break;
+ case T_LSOCKET:
+ w = &((lsocket_t*)h->data)->parent->work; break;
default: break;
}
if (w && w->next == work_unqueued) {
@@ -1317,5 +1329,5 @@ int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
}
pn_millis_t pn_proactor_now(void) {
- return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
+ return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-proton git commit: PROTON-1495: move
examples/exampletest.py to tools/py/proctest.py
Posted by ac...@apache.org.
PROTON-1495: move examples/exampletest.py to tools/py/proctest.py
It is not an example but part of our test framework,
and we want to use it for non-example tests as well.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/08505261
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/08505261
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/08505261
Branch: refs/heads/master
Commit: 08505261f4008c43b2399cb4a72bd887bbadd7df
Parents: d480600
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 2 09:33:43 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 09:59:23 2017 -0400
----------------------------------------------------------------------
config.sh.in | 2 +-
examples/CMakeLists.txt | 4 +-
examples/c/proactor/example_test.py | 4 +-
examples/cpp/example_test.py | 4 +-
examples/exampletest.py | 193 -----------------------------
tools/py/proctest.py | 204 +++++++++++++++++++++++++++++++
6 files changed, 211 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 72d4ea9..d9debd3 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -39,7 +39,7 @@ RUBY_BINDINGS=$PROTON_BINDINGS/ruby
PERL_BINDINGS=$PROTON_BINDINGS/perl
# Python
-COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/examples
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/tools/py
export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
# PHP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 4d744d2..8a8327a 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -29,8 +29,8 @@ macro(set_search_path result) # args after result are directories or search pat
file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
endmacro()
-# Some non-python examples use exampletest.py to drive their self-tests.
-set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_CURRENT_SOURCE_DIR}" "$ENV{PYTHON_PATH}")
+# Add the tools directory for the 'proctest' module
+set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")
set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")
add_subdirectory(c)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/c/proactor/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/example_test.py b/examples/c/proactor/example_test.py
index 38f7fc8..02bb1fd 100644
--- a/examples/c/proactor/example_test.py
+++ b/examples/c/proactor/example_test.py
@@ -20,7 +20,7 @@
# This is a test script to run the examples and verify that they behave as expected.
import unittest, sys, time
-from exampletest import *
+from proctest import *
def python_cmd(name):
dir = os.path.dirname(__file__)
@@ -49,7 +49,7 @@ class Broker(object):
raise ProcError(b, "broker crash")
b.kill()
-class CExampleTest(ExampleTestCase):
+class CExampleTest(ProcTestCase):
def test_send_receive(self):
"""Send first then receive"""
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 5773142..0ae929c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -21,7 +21,7 @@
import unittest
import os, sys, socket, time, re, inspect
-from exampletest import *
+from proctest import *
from random import randrange
from subprocess import Popen, PIPE, STDOUT, call
from copy import copy
@@ -82,7 +82,7 @@ def ensureCanTestExtendedSASL():
raise Skipped("Can't Test Extended SASL: Couldn't create auth db")
-class BrokerTestCase(ExampleTestCase):
+class BrokerTestCase(ProcTestCase):
"""
ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
Subclasses must set `broker_exe` class variable with the name of the broker executable.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
deleted file mode 100644
index 5c53616..0000000
--- a/examples/exampletest.py
+++ /dev/null
@@ -1,193 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# A test library to make it easy to run unittest tests that start,
-# monitor, and report output from sub-processes. In particular
-# it helps with starting processes that listen on random ports.
-
-import unittest
-import os, sys, socket, time, re, inspect, errno, threading
-from random import randrange
-from subprocess import Popen, PIPE, STDOUT
-from copy import copy
-import platform
-from os.path import dirname as dirname
-
-DEFAULT_TIMEOUT=10
-
-class TestPort(object):
- """Get an unused port using bind(0) and SO_REUSEADDR and hold it till close()"""
- def __init__(self):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.bind(('127.0.0.1', 0)) # Testing exampless is local only
- self.host, self.port = socket.getnameinfo(self.sock.getsockname(), 0)
- self.addr = "%s:%s" % (self.host, self.port)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *args):
- self.close()
-
- def close(self):
- self.sock.close()
-
-class ProcError(Exception):
- """An exception that captures failed process output"""
- def __init__(self, proc, what="bad exit status"):
- out = proc.out.strip()
- if out:
- out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
- else:
- out = ", no output)"
- super(Exception, self, ).__init__(
- "%s %s, code=%s%s" % (proc.args, what, getattr(proc, 'returncode', 'noreturn'), out))
-
-class NotFoundError(ProcError):
- pass
-
-class Proc(Popen):
- """A example process that stores its stdout and can scan it for a 'ready' pattern'"""
-
- if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
- vg_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
- else:
- vg_args = []
-
- @property
- def out(self):
- self._out.seek(0)
- # Normalize line endings, os.tmpfile() opens in binary mode.
- return self._out.read().replace('\r\n','\n').replace('\r','\n')
-
- def __init__(self, args, skip_valgrind=False, **kwargs):
- """Start an example process"""
- args = list(args)
- if skip_valgrind:
- self.args = args
- else:
- self.args = self.vg_args + args
- self.kwargs = kwargs
- self._out = os.tmpfile()
- try:
- Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
- except OSError, e:
- if e.errno == errno.ENOENT:
- raise NotFoundError(self, str(e))
- raise ProcError(self, str(e))
- except Exception, e:
- raise ProcError(self, str(e))
-
- def kill(self):
- try:
- if self.poll() is None:
- Popen.kill(self)
- except:
- pass # Already exited.
- return self.out
-
- def wait_exit(self, timeout=DEFAULT_TIMEOUT, expect=0):
- """Wait for process to exit, return output. Raise ProcError on failure."""
- t = threading.Thread(target=self.wait)
- t.start()
- t.join(timeout)
- if self.poll() is None: # Still running
- self.kill()
- raise ProcError(self, "still running after %ss" % timeout)
- if expect is not None and self.poll() != expect:
- raise ProcError(self)
- return self.out
-
- def wait_re(self, regexp, timeout=DEFAULT_TIMEOUT):
- """
- Wait for regexp to appear in the output, returns the re.search match result.
- The target process should flush() important output to ensure it appears.
- """
- if timeout:
- deadline = time.time() + timeout
- while timeout is None or time.time() < deadline:
- match = re.search(regexp, self.out)
- if match:
- return match
- time.sleep(0.01) # Not very efficient
- raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
-
-def _tc_missing(attr):
- return not hasattr(unittest.TestCase, attr)
-
-class TestCase(unittest.TestCase):
- """
- Roughly provides setUpClass() and tearDownClass() and other features missing
- in python 2.6. If subclasses override setUp() or tearDown() they *must*
- call the superclass.
- """
-
- if _tc_missing('setUpClass') and _tc_missing('tearDownClass'):
-
- @classmethod
- def setUpClass(cls):
- pass
-
- @classmethod
- def tearDownClass(cls):
- pass
-
- def setUp(self):
- super(TestCase, self).setUp()
- cls = type(self)
- if not hasattr(cls, '_setup_class_count'): # First time
- def is_test(m):
- return inspect.ismethod(m) and m.__name__.startswith('test_')
- cls._setup_class_count = len(inspect.getmembers(cls, predicate=is_test))
- cls.setUpClass()
-
- def tearDown(self):
- self.assertTrue(self._setup_class_count > 0)
- self._setup_class_count -= 1
- if self._setup_class_count == 0:
- type(self).tearDownClass()
- super(TestCase, self).tearDown()
-
- if _tc_missing('assertIn'):
- def assertIn(self, a, b):
- self.assertTrue(a in b, "%r not in %r" % (a, b))
-
- if _tc_missing('assertMultiLineEqual'):
- def assertMultiLineEqual(self, a, b):
- self.assertEqual(a, b)
-
-class ExampleTestCase(TestCase):
- """TestCase that manages started processes"""
- def setUp(self):
- super(ExampleTestCase, self).setUp()
- self.procs = []
-
- def tearDown(self):
- for p in self.procs:
- p.kill()
- super(ExampleTestCase, self).tearDown()
-
- def proc(self, *args, **kwargs):
- p = Proc(*args, **kwargs)
- self.procs.append(p)
- return p
-
-if __name__ == "__main__":
- unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/08505261/tools/py/proctest.py
----------------------------------------------------------------------
diff --git a/tools/py/proctest.py b/tools/py/proctest.py
new file mode 100644
index 0000000..ba83c7d
--- /dev/null
+++ b/tools/py/proctest.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""Unit test library to simplify tests that start, monitor, check and report
+output from sub-processes. Provides safe port allocation for processes that
+listen on a port. Allows executables to be run under a debugging tool like
+valgrind.
+"""
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+DEFAULT_TIMEOUT=10
+
+class TestPort(object):
+ """Get an unused port using bind(0) and SO_REUSEADDR and hold it till close()
+ Can be used as `with TestPort() as tp:` Provides tp.host, tp.port and tp.addr
+ (a "host:port" string)
+ """
+ def __init__(self):
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.sock.bind(('127.0.0.1', 0)) # Testing exampless is local only
+ self.host, self.port = socket.getnameinfo(self.sock.getsockname(), 0)
+ self.addr = "%s:%s" % (self.host, self.port)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def close(self):
+ self.sock.close()
+
+class ProcError(Exception):
+ """An exception that displays failed process output"""
+ def __init__(self, proc, what="bad exit status"):
+ self.out = proc.out.strip()
+ if self.out:
+ msgtail = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % self.out
+ else:
+ msgtail = ", no output"
+ super(Exception, self, ).__init__(
+ "%s %s, code=%s%s" % (proc.args, what, getattr(proc, 'returncode', 'noreturn'), msgtail))
+
+class NotFoundError(ProcError):
+ pass
+
+class Proc(Popen):
+ """Subclass of suprocess.Popen that stores its output and can scan it for a
+ 'ready' pattern' Use self.out to access output (combined stdout and stderr).
+ You can't set the Popen stdout and stderr arguments, they will be overwritten.
+ """
+
+ if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+ vg_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+ else:
+ vg_args = []
+
+ @property
+ def out(self):
+ self._out.seek(0)
+ # Normalize line endings, os.tmpfile() opens in binary mode.
+ return self._out.read().replace('\r\n','\n').replace('\r','\n')
+
+ def __init__(self, args, skip_valgrind=False, **kwargs):
+ """Start an example process"""
+ args = list(args)
+ if skip_valgrind:
+ self.args = args
+ else:
+ self.args = self.vg_args + args
+ self.kwargs = kwargs
+ self._out = os.tmpfile()
+ try:
+ Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ raise NotFoundError(self, str(e))
+ raise ProcError(self, str(e))
+ except Exception, e:
+ raise ProcError(self, str(e))
+
+ def kill(self):
+ try:
+ if self.poll() is None:
+ Popen.kill(self)
+ except:
+ pass # Already exited.
+ return self.out
+
+ def wait_exit(self, timeout=DEFAULT_TIMEOUT, expect=0):
+ """Wait for process to exit, return output. Raise ProcError on failure."""
+ t = threading.Thread(target=self.wait)
+ t.start()
+ t.join(timeout)
+ if self.poll() is None: # Still running
+ self.kill()
+ raise ProcError(self, "still running after %ss" % timeout)
+ if expect is not None and self.poll() != expect:
+ raise ProcError(self)
+ return self.out
+
+ def wait_re(self, regexp, timeout=DEFAULT_TIMEOUT):
+ """
+ Wait for regexp to appear in the output, returns the re.search match result.
+ The target process should flush() important output to ensure it appears.
+ """
+ if timeout:
+ deadline = time.time() + timeout
+ while timeout is None or time.time() < deadline:
+ match = re.search(regexp, self.out)
+ if match:
+ return match
+ if self.poll() is not None:
+ raise ProcError(self, "process exited while waiting for '%s'" % (regexp))
+ time.sleep(0.01) # Not very efficient
+ raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
+
+def _tc_missing(attr):
+ return not hasattr(unittest.TestCase, attr)
+
+class ProcTestCase(unittest.TestCase):
+ """TestCase that manages started processes
+
+ Also roughly provides setUpClass() and tearDownClass() and other features
+ missing in python 2.6. If subclasses override setUp() or tearDown() they
+ *must* call the superclass.
+ """
+
+ def setUp(self):
+ super(ProcTestCase, self).setUp()
+ self.procs = []
+
+ def tearDown(self):
+ for p in self.procs:
+ p.kill()
+ super(ProcTestCase, self).tearDown()
+
+ def proc(self, *args, **kwargs):
+ """Return a Proc() that will be automatically killed on teardown"""
+ p = Proc(*args, **kwargs)
+ self.procs.append(p)
+ return p
+
+ if _tc_missing('setUpClass') and _tc_missing('tearDownClass'):
+
+ @classmethod
+ def setUpClass(cls):
+ pass
+
+ @classmethod
+ def tearDownClass(cls):
+ pass
+
+ def setUp(self):
+ super(ProcTestCase, self).setUp()
+ cls = type(self)
+ if not hasattr(cls, '_setup_class_count'): # First time
+ def is_test(m):
+ return inspect.ismethod(m) and m.__name__.startswith('test_')
+ cls._setup_class_count = len(inspect.getmembers(cls, predicate=is_test))
+ cls.setUpClass()
+
+ def tearDown(self):
+ self.assertTrue(self._setup_class_count > 0)
+ self._setup_class_count -= 1
+ if self._setup_class_count == 0:
+ type(self).tearDownClass()
+ super(ProcTestCase, self).tearDown()
+
+ if _tc_missing('assertIn'):
+ def assertIn(self, a, b):
+ self.assertTrue(a in b, "%r not in %r" % (a, b))
+
+ if _tc_missing('assertMultiLineEqual'):
+ def assertMultiLineEqual(self, a, b):
+ self.assertEqual(a, b)
+
+from unittest import main
+if __name__ == "__main__":
+ main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-proton git commit: PROTON-1493: c epoll proactor spinning
on interruptfd
Posted by ac...@apache.org.
PROTON-1493: c epoll proactor spinning on interruptfd
Missing read() on interruptfd was causing the proactor to spin.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cb5996ee
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cb5996ee
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cb5996ee
Branch: refs/heads/master
Commit: cb5996ee5c714b28d0449caef21d1b50c8412d69
Parents: ad52e3a
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 5 14:23:15 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 10:20:41 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 29 ++++++++++++++---------------
1 file changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cb5996ee/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 3a0ea29..b6d2bd0 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -185,6 +185,16 @@ static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
unlock(&pt->mutex);
}
+/* Read from a timer or event FD */
+static uint64_t read_uint64(int fd) {
+ uint64_t result = 0;
+ ssize_t n = read(fd, &result, sizeof(result));
+ if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) {
+ EPOLL_FATAL("timerfd or eventfd read error", errno);
+ }
+ return result;
+}
+
// Callback bookkeeping. Return true if there is an expired timer.
static bool ptimer_callback(ptimer_t *pt) {
lock(&pt->mutex);
@@ -193,23 +203,13 @@ static bool ptimer_callback(ptimer_t *pt) {
if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
pt->timer_active = false;
}
- uint64_t u_exp_count = 0;
- ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t));
- if (l != sizeof(uint64_t)) {
- if (l == -1) {
- if (errno != EAGAIN) {
- EPOLL_FATAL("timer read", errno);
- }
- }
- else
- EPOLL_FATAL("timer internal error", 0);
- }
+ uint64_t u_exp_count = read_uint64(pt->timerfd);
if (!pt->timer_active) {
// Expiry counter just cleared, timer not set, timerfd not armed
pt->in_doubt = false;
}
unlock(&pt->mutex);
- return (l == sizeof(uint64_t)) && u_exp_count > 0;
+ return u_exp_count > 0;
}
// Return true if timerfd has and will have no pollable expiries in the current armed state
@@ -444,9 +444,7 @@ static pcontext_t *wake_pop_front(pn_proactor_t *p) {
* Can the read system call be made without holding the lock?
* Note that if the reads/writes happen out of order, the wake
* mechanism will hang. */
- uint64_t ignored;
- int err = read(p->eventfd, &ignored, sizeof(uint64_t));
- (void)err; // TODO: check for error
+ (void)read_uint64(p->eventfd);
p->wakes_in_progress = false;
}
}
@@ -1673,6 +1671,7 @@ static bool proactor_remove(pcontext_t *ctx) {
static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
+ (void)read_uint64(p->interruptfd);
rearm(p, &p->epoll_interrupt);
return proactor_process(p, PN_PROACTOR_INTERRUPT);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-proton git commit: PROTON-1495: c proactor error naming
and formatting
Posted by ac...@apache.org.
PROTON-1495: c proactor error naming and formatting
Use the consistent condition name "proton:io" for IO-related error conditions.
Consistent error formatting for proactor implementations.
receive.c example minor fix - missing return code
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ad52e3ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ad52e3ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ad52e3ab
Branch: refs/heads/master
Commit: ad52e3abb49da97436269ee78c07aba20ccaf742
Parents: 0850526
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 1 15:42:33 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 09:59:29 2017 -0400
----------------------------------------------------------------------
examples/c/proactor/receive.c | 1 +
proton-c/src/proactor/epoll.c | 5 ++---
proton-c/src/proactor/libuv.c | 13 +++++--------
proton-c/src/proactor/proactor-internal.c | 10 ++++++++++
proton-c/src/proactor/proactor-internal.h | 16 +++++++++++++++-
5 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index ddf0a35..1b9e3f9 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -184,4 +184,5 @@ int main(int argc, char **argv) {
pn_proactor_connect(app.proactor, pn_connection(), addr);
run(&app);
pn_proactor_free(app.proactor);
+ return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 3e5327e..3a0ea29 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -245,7 +245,6 @@ pn_timestamp_t pn_i_now2(void)
// Proactor common code
// ========================================================================
-const char *COND_NAME = "proactor";
const char *AMQP_PORT = "5672";
const char *AMQP_PORT_NAME = "amqp";
@@ -586,13 +585,13 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
if (!ps->listener) {
pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
- pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
+ pn_connection_driver_errorf(driver, PNI_IO_CONDITION, "%s %s:%s: %s",
what, ps->host, ps->port,
strerror(err));
pn_connection_driver_close(driver);
} else {
pn_listener_t *l = psocket_listener(ps);
- pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+ pn_condition_format(l->condition, PNI_IO_CONDITION, "%s %s:%s: %s",
what, ps->host, ps->port,
strerror(err));
listener_begin_close(l);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index e632f10..a851c4e 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -456,9 +456,8 @@ static void pconnection_set_error(pconnection_t *pc, int err, const char* what)
pn_connection_driver_t *driver = &pc->driver;
pn_connection_driver_bind(driver); /* Make sure we are bound so errors will be reported */
if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
- pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
- what, pc->addr.host, pc->addr.port,
- uv_strerror(err));
+ pni_proactor_set_cond(pn_transport_condition(driver->transport),
+ what, pc->addr.host , pc->addr.port, uv_strerror(err));
}
}
@@ -473,9 +472,7 @@ static void pconnection_error(pconnection_t *pc, int err, const char* what) {
static void listener_error_lh(pn_listener_t *l, int err, const char* what) {
assert(err);
if (!pn_condition_is_set(l->condition)) {
- pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
- what, l->addr.host, l->addr.port,
- uv_strerror(err));
+ pni_proactor_set_cond(l->condition, what, l->addr.host, l->addr.port, uv_strerror(err));
}
listener_close_lh(l);
}
@@ -659,11 +656,11 @@ static void leader_listen_lh(pn_listener_t *l) {
err = 0;
}
}
- /* Always put an OPEN event for symmetry, even if we immediately close with err */
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
if (err) {
listener_error_lh(l, err, "listening on");
}
+ /* Always put an OPEN event for symmetry, even if we have an error. */
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
}
void pn_listener_free(pn_listener_t *l) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/proactor-internal.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.c b/proton-c/src/proactor/proactor-internal.c
index af7b057..3525c71 100644
--- a/proton-c/src/proactor/proactor-internal.c
+++ b/proton-c/src/proactor/proactor-internal.c
@@ -34,6 +34,8 @@ static const char *AMQP_PORT_NAME = "amqp";
static const char *AMQPS_PORT = "5671";
static const char *AMQPS_PORT_NAME = "amqps";
+const char *PNI_IO_CONDITION = "proton:io";
+
int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) {
return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : "");
}
@@ -64,3 +66,11 @@ int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, c
}
return 0;
}
+
+static inline const char *nonull(const char *str) { return str ? str : ""; }
+
+void pni_proactor_set_cond(
+ pn_condition_t *cond, const char *what, const char *host, const char *port, const char *msg)
+{
+ pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s:%s", msg, what, nonull(host), nonull(port));
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad52e3ab/proton-c/src/proactor/proactor-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h
index 894cb5b..5307a45 100644
--- a/proton-c/src/proactor/proactor-internal.h
+++ b/proton-c/src/proactor/proactor-internal.h
@@ -22,8 +22,11 @@
#include <proton/type_compat.h>
#include <proton/import_export.h>
+#include <proton/condition.h>
-/*
+/* NOTE PNP_EXTERN is for use by proton-internal tests */
+
+/**
* Parse a pn_proactor_addr string, copy data into buf as necessary.
* Set *host and *port to point to the host and port strings.
*
@@ -34,4 +37,15 @@
*/
PNP_EXTERN int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port);
+/**
+ * Condition name for error conditions related to proton-IO.
+ */
+extern const char *PNI_IO_CONDITION;
+
+/**
+ * Format a proactor error condition with message "<msg> - <what> <host>:<port>"
+ */
+void pni_proactor_set_cond(
+ pn_condition_t *cond, const char *what, const char *msg, const char *host, const char *port);
+
#endif // PROACTOR_NETADDR_INTERNAL_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org