You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:15 UTC

[47/50] qpid-proton git commit: PROTON-1568: c++ enable race detection for self-tests

PROTON-1568: c++ enable race detection for self-tests

- update proctest.py to enable `valgrind --tool=helgrind` tests
- enable helgrind for C++ multithreaded_client* tests
- set valgrind options for max speed, run separately for more debugging info
- fix race in proactor/epoll.c shown by helgrind
- clarify lock scopes in proactor_container_impl.cpp


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/17d2a6f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/17d2a6f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/17d2a6f4

Branch: refs/heads/go1
Commit: 17d2a6f48898a20020d2509e63e9736a946d6b96
Parents: ab8ae87
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Aug 29 18:07:03 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Aug 31 16:13:34 2017 -0400

----------------------------------------------------------------------
 examples/cpp/example_test.py                    | 67 +++++++++---------
 .../cpp/src/proactor_container_impl.cpp         | 27 ++++++--
 proton-c/src/proactor/epoll.c                   | 51 +++++++++++---
 proton-c/src/tests/fdlimit.py                   |  2 +-
 tests/perf/quick_perf.py                        |  5 +-
 tools/py/proctest.py                            | 72 ++++++++++++++------
 6 files changed, 147 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 98f1d90..248e596 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -22,7 +22,7 @@
 import unittest
 import os, sys, socket, time, re, inspect
 from proctest import *
-from  random import randrange
+from random import randrange
 from subprocess import Popen, PIPE, STDOUT, call
 from copy import copy
 import platform
@@ -32,25 +32,11 @@ from string import Template
 
 createdSASLDb = False
 
-def findfileinpath(filename, searchpath):
-    """Find filename in the searchpath
-        return absolute path to the file or None
-    """
-    paths = searchpath.split(os.pathsep)
-    for path in paths:
-        if os.path.exists(os.path.join(path, filename)):
-            return os.path.abspath(os.path.join(path, filename))
-    return None
-
 def _cyrusSetup(conf_dir):
-  """Write out simple SASL config.
+  """Write out simple SASL config.tests
   """
-  saslpasswd = ""
-  if 'SASLPASSWD' in os.environ:
-    saslpasswd = os.environ['SASLPASSWD']
-  else:
-    saslpasswd = findfileinpath('saslpasswd2', os.getenv('PATH')) or ""
-  if os.path.exists(saslpasswd):
+  saslpasswd = os.getenv('SASLPASSWD') or find_file('saslpasswd2', os.getenv('PATH'))
+  if saslpasswd:
     t = Template("""sasldb_path: ${db}
 mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
 """)
@@ -87,7 +73,6 @@ class BrokerTestCase(ProcTestCase):
     ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
     Subclasses must set `broker_exe` class variable with the name of the broker executable.
     """
-
     @classmethod
     def setUpClass(cls):
         cls.broker = None       # In case Proc throws, create the attribute.
@@ -147,7 +132,8 @@ class ContainerExampleTest(BrokerTestCase):
     def test_simple_send_direct_recv(self):
         with TestPort() as tp:
             addr = "%s/examples" % tp.addr
-            recv = self.proc(["direct_recv", "-a", addr], "listening")
+            recv = self.proc(["direct_recv", "-a", addr])
+            recv.wait_re("listening")
             self.assertMultiLineEqual("all messages confirmed\n",
                              self.proc(["simple_send", "-a", addr]).wait_exit())
             self.assertMultiLineEqual(recv_expect("direct_recv", addr), recv.wait_exit())
@@ -155,7 +141,8 @@ class ContainerExampleTest(BrokerTestCase):
     def test_simple_recv_direct_send(self):
         with TestPort() as tp:
             addr = "%s/examples" % tp.addr
-            send = self.proc(["direct_send", "-a", addr], "listening")
+            send = self.proc(["direct_send", "-a", addr])
+            send.wait_re("listening")
             self.assertMultiLineEqual(recv_expect("simple_recv", addr),
                              self.proc(["simple_recv", "-a", addr]).wait_exit())
             self.assertMultiLineEqual(
@@ -163,14 +150,16 @@ class ContainerExampleTest(BrokerTestCase):
                 send.wait_exit())
 
     def test_request_response(self):
-        server = self.proc(["server", "-a", self.addr], "connected")
+        server = self.proc(["server", "-a", self.addr])
+        server.wait_re("connected")
         self.assertMultiLineEqual(CLIENT_EXPECT,
                          self.proc(["client", "-a", self.addr]).wait_exit())
 
     def test_request_response_direct(self):
         with TestPort() as tp:
             addr = "%s/examples" % tp.addr
-            server = self.proc(["server_direct", "-a", addr], "listening")
+            server = self.proc(["server_direct", "-a", addr])
+            server.wait_re("listening")
             self.assertMultiLineEqual(CLIENT_EXPECT,
                              self.proc(["client", "-a", addr]).wait_exit())
 
@@ -216,13 +205,11 @@ map{string(k1):int(42), symbol(k2):boolean(0)}
         self.assertTrue(len(out) > 0);
         self.assertEqual(["send"]*len(out), out)
 
+    @unittest.skipUnless(find_exes('scheduled_send'), "not a  C++11 build")
     def test_scheduled_send(self):
-        try:
-            out = self.proc(["scheduled_send", "-a", self.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).wait_exit().split()
-            self.assertTrue(len(out) > 0);
-            self.assertEqual(["send"]*len(out), out)
-        except ProcError:       # File not found, not a C++11 build.
-            pass
+        out = self.proc(["scheduled_send", "-a", self.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).wait_exit().split()
+        self.assertTrue(len(out) > 0);
+        self.assertEqual(["send"]*len(out), out)
 
     def test_message_properties(self):
         expect="""using put/get: short=123 string=foo symbol=sym
@@ -236,20 +223,28 @@ expected conversion_error: "unexpected type, want: uint got: string"
 """
         self.assertMultiLineEqual(expect, self.proc(["message_properties"]).wait_exit())
 
+    @unittest.skipUnless(find_exes('multithreaded_client'), "not a  C++11 build")
+    def test_multithreaded_client(self):
+        got = self.proc(["multithreaded_client", self.addr, "examples", "10"], helgrind=True).wait_exit()
+        self.maxDiff = None
+        self.assertRegexpMatches(got, "10 messages sent and received");
 
+    @unittest.skipUnless(find_exes('multithreaded_client_flow_control'), "not a  C++11 build")
+    def test_multithreaded_client_flow_control(self):
+        got = self.proc(["multithreaded_client_flow_control", self.addr, "examples", "10", "2"], helgrind=True).wait_exit()
+        self.maxDiff = None
+        self.assertRegexpMatches(got, "20 messages sent and received");
 
 class ContainerExampleSSLTest(BrokerTestCase):
     """Run the SSL container examples, verify they behave as expected."""
 
     broker_exe = "broker"
+    valgrind = False            # Disable for all tests, including inherited
 
     def setUp(self):
         super(ContainerExampleSSLTest, self).setUp()
-        self.vg_args = Proc.vg_args
-        Proc.vg_args = []       # Disable
 
     def tearDown(self):
-        Proc.vg_args = self.vg_args
         super(ContainerExampleSSLTest, self).tearDown()
 
     def ssl_certs_dir(self):
@@ -262,7 +257,7 @@ class ContainerExampleSSLTest(BrokerTestCase):
         with TestPort() as tp:
             addr = "amqps://%s/examples" % tp.addr
             # Disable valgrind when using OpenSSL
-            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir()], skip_valgrind=True).wait_exit()
+            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir()]).wait_exit()
             expect = "Outgoing client connection connected via SSL.  Server certificate identity CN=test_server\nHello World!"
             expect_found = (out.find(expect) >= 0)
             self.assertEqual(expect_found, True)
@@ -272,7 +267,7 @@ class ContainerExampleSSLTest(BrokerTestCase):
         with TestPort() as tp:
             addr = "amqps://%s/examples" % tp.addr
             # Disable valgrind when using OpenSSL
-            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir(), "-v", "noname"], skip_valgrind=True).wait_exit()
+            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir(), "-v", "noname"], valgrind=False).wait_exit()
             expect = "Outgoing client connection connected via SSL.  Server certificate identity CN=test_server\nHello World!"
             expect_found = (out.find(expect) >= 0)
             self.assertEqual(expect_found, True)
@@ -282,7 +277,7 @@ class ContainerExampleSSLTest(BrokerTestCase):
         with TestPort() as tp:
             addr = "amqps://%s/examples" % tp.addr
             # Disable valgrind when using OpenSSL
-            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir(), "-v", "fail"], skip_valgrind=True).wait_exit()
+            out = self.proc(["ssl", "-a", addr, "-c", self.ssl_certs_dir(), "-v", "fail"]).wait_exit()
             expect = "Expected failure of connection with wrong peer name"
             expect_found = (out.find(expect) >= 0)
             self.assertEqual(expect_found, True)
@@ -296,7 +291,7 @@ Hello World!
         with TestPort() as tp:
             addr = "amqps://%s/examples" % tp.addr
             # Disable valgrind when using OpenSSL
-            out = self.proc(["ssl_client_cert", addr, self.ssl_certs_dir()], skip_valgrind=True).wait_exit()
+            out = self.proc(["ssl_client_cert", addr, self.ssl_certs_dir()]).wait_exit()
             expect_found = (out.find(expect) >= 0)
             self.assertEqual(expect_found, True)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index ff4d4bb..7f5edfe 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -187,6 +187,16 @@ pn_connection_t* container::impl::make_connection_lh(
     return pnc;                 // 1 refcount from pn_connection()
 }
 
+// Takes ownership of pnc
+//
+// NOTE: After the call to start_connecton() pnc is active in a proactor thread,
+// and may even have been freed already. It is undefined to use pnc (or any
+// object belonging to it) except in appropriate handlers.
+//
+// SUBTLE NOTE: There must not be any proton::object wrappers in scope when
+// start_connecton() is called. The wrapper destructor will call pn_decref()
+// after start_connecton() which is undefined!
+//
 void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
     char caddr[PN_MAX_ADDR];
     pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str());
@@ -261,10 +271,13 @@ returned<connection> container::impl::connect(
     const std::string& addr,
     const proton::connection_options& user_opts)
 {
-    GUARD(lock_);
     proton::url url(addr);
-    pn_connection_t *pnc = make_connection_lh(url, user_opts);
-    start_connection(url, pnc);
+    pn_connection_t* pnc = 0;
+    {
+        GUARD(lock_);
+        pnc = make_connection_lh(url, user_opts);
+    }
+    start_connection(url, pnc); // See comment on start_connection
     return make_returned<proton::connection>(pnc);
 }
 
@@ -280,8 +293,8 @@ returned<sender> container::impl::open_sender(const std::string &urlstr, const p
         pnc = make_connection_lh(url, o2);
         connection conn(make_wrapper(pnc));
         pnl = unwrap(conn.default_session().open_sender(url.path(), lopts));
-    }                                   // There must be no refcounting after here
-    start_connection(url, pnc);         // Takes ownership of pnc
+    }
+    start_connection(url, pnc); // See comment on start_connection
     return make_returned<sender>(pnl);  // Unsafe returned pointer
 }
 
@@ -296,8 +309,8 @@ returned<receiver> container::impl::open_receiver(const std::string &urlstr, con
         pnc = make_connection_lh(url, o2);
         connection conn(make_wrapper(pnc));
         pnl = unwrap(conn.default_session().open_receiver(url.path(), lopts));
-    }                                   // There must be no refcounting after here
-    start_connection(url, pnc);
+    }
+    start_connection(url, pnc); // See comment on start_connection
     return make_returned<receiver>(pnl);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 89cad31..46effcc 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -121,6 +121,11 @@ typedef enum {
 
 // Data to use with epoll.
 typedef struct epoll_extended_t {
+  /* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory
+     writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
+     visible to epoll_wait() thread. Lock use of epoll_extended_t to be safe.
+  */
+  pmutex mutex;
   struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
   int fd;
   epoll_type_t type;   // io/timer/wakeup
@@ -277,24 +282,33 @@ PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 static bool start_polling(epoll_extended_t *ee, int epollfd) {
   if (ee->polling)
     return false;
+  pmutex_init(&ee->mutex);
+  lock(&ee->mutex);
   ee->polling = true;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
 }
 
 static void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
+  lock(&ee->mutex);
   if (ee->fd == -1 || !ee->polling || epollfd == -1)
     return;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = 0;
+  unlock(&ee->mutex);
   if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
     EPOLL_FATAL("EPOLL_CTL_DEL", errno);
+  lock(&ee->mutex);
   ee->fd = -1;
   ee->polling = false;
+  unlock(&ee->mutex);
+  pmutex_finalize(&ee->mutex);
 }
 
 /*
@@ -628,8 +642,11 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
   struct epoll_event ev;
   ev.data.ptr = ee;
+  lock(&ee->mutex);
   ev.events = ee->wanted | EPOLLONESHOT;
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
@@ -1168,9 +1185,11 @@ static void pconnection_start(pconnection_t *pc) {
 
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
   epoll_extended_t *ee = &pc->psocket.epoll_io;
+  lock(&ee->mutex);
   ee->fd = pc->psocket.sockfd;
   ee->wanted = EPOLLIN | EPOLLOUT;
   ee->polling = false;
+  unlock(&ee->mutex);
   start_polling(ee, efd);  // TODO: check for error
 }
 
@@ -1237,8 +1256,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
 
   lock(&pc->context.mutex);
   proactor_add(&pc->context);
-  pn_connection_open(pc->driver.connection); /* Auto-open */
-
   bool notify = false;
   bool notify_proactor = false;
 
@@ -1604,11 +1621,14 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
 
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
 static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+  pmutex_init(&ee->mutex);
+  lock(&ee->mutex);
   ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
   ee->wanted = EPOLLIN;
   ee->polling = false;
+  unlock(&ee->mutex);
   start_polling(ee, epollfd);  // TODO: check for error
 }
 
@@ -1620,6 +1640,7 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   pmutex_init(&p->bind_mutex);
   ptimer_init(&p->timer, 0);
+  pmutex_init(&p->overflow_mutex);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1801,7 +1822,11 @@ static bool proactor_remove(pcontext_t *ctx) {
 }
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
-  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+  lock(&ee->mutex);
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+
+  if  (fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
     (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
     return proactor_process(p, PN_PROACTOR_INTERRUPT);
@@ -1847,23 +1872,27 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
     assert(n == 1);
     epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
 
-    if (ee->type == WAKE) {
+    lock(&ee->mutex);
+    epoll_type_t type = ee->type;
+    psocket_t* psocket = ee->psocket;
+    unlock(&ee->mutex);
+    if (type == WAKE) {
       batch = process_inbound_wake(p, ee);
-    } else if (ee->type == PROACTOR_TIMER) {
+    } else if (type == PROACTOR_TIMER) {
       batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
     } else {
-      pconnection_t *pc = psocket_pconnection(ee->psocket);
+      pconnection_t *pc = psocket_pconnection(psocket);
       if (pc) {
-        if (ee->type == PCONNECTION_IO) {
+        if (type == PCONNECTION_IO) {
           batch = pconnection_process(pc, ev.events, false, false);
         } else {
-          assert(ee->type == PCONNECTION_TIMER);
+          assert(type == PCONNECTION_TIMER);
           batch = pconnection_process(pc, 0, true, false);
         }
       }
       else {
         // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(ee->psocket, ev.events);
+        batch = listener_process(psocket, ev.events);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/proton-c/src/tests/fdlimit.py
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/fdlimit.py b/proton-c/src/tests/fdlimit.py
index a31fa09..4f4a342 100644
--- a/proton-c/src/tests/fdlimit.py
+++ b/proton-c/src/tests/fdlimit.py
@@ -55,7 +55,7 @@ class FdLimitTest(ProcTestCase):
 
     def proc(self, *args, **kwargs):
         """Skip valgrind for all processes started by this test"""
-        return super(FdLimitTest, self).proc(*args, skip_valgrind=True, **kwargs)
+        return super(FdLimitTest, self).proc(*args, valgrind=False, **kwargs)
 
     def test_fd_limit_broker(self):
         """Check behaviour when running out of file descriptors on accept"""

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/tests/perf/quick_perf.py
----------------------------------------------------------------------
diff --git a/tests/perf/quick_perf.py b/tests/perf/quick_perf.py
index 59b47a2..7af57d2 100644
--- a/tests/perf/quick_perf.py
+++ b/tests/perf/quick_perf.py
@@ -61,11 +61,10 @@ except:
 
 
 # Use Proton-C reactor-recv as a relatively fast loopback "broker" for these tests
-server = Proc(["reactor-recv", "-X", "listening", "-a", linkaddr, "-c", str(mcount), "-R"], ready="listening", 
-              skip_valgrind=True, timeout=300)
+server = Proc(["reactor-recv", "-X", "listening", "-a", linkaddr, "-c", str(mcount), "-R"], ready="listening", valgrind=False, timeout=300)
 try:
     start = time.time()
-    client = Proc(perf_target, skip_valgrind=True, timeout=300)
+    client = Proc(perf_target, valgrind=False, timeout=300)
     print client.wait_exit()
     server.wait_exit()
     end = time.time()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/17d2a6f4/tools/py/proctest.py
----------------------------------------------------------------------
diff --git a/tools/py/proctest.py b/tools/py/proctest.py
index ebd7db2..fc4450f 100644
--- a/tools/py/proctest.py
+++ b/tools/py/proctest.py
@@ -1,4 +1,4 @@
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -58,12 +58,18 @@ class ProcError(Exception):
     """An exception that displays failed process output"""
     def __init__(self, proc, what="bad exit status"):
         self.out = proc.out.strip()
+        returncode = getattr(proc, 'returncode') # Can be missing in some cases
+        if returncode == proc.valgrind_exit:
+            self.out = \
+            "\n==NOTE== valgrind options set for speed, for more detail run command with" \
+            "\n==NOTE== e.g. --leak-check=full --history-level=full --num-callers=40" \
+            + self.out
+        msg = "%s (exit=%s) command:\n%s" % (what, returncode, " ".join(proc.args))
         if self.out:
-            msgtail = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % self.out
+            msg += "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^" % self.out
         else:
-            msgtail = ", no output"
-        super(Exception, self, ).__init__(
-            "%s %s, code=%s%s" % (proc.args, what, getattr(proc, 'returncode', 'noreturn'), msgtail))
+            msg += "\n<<no output>>"
+        super(ProcError, self, ).__init__(msg)
 
 class NotFoundError(ProcError):
     pass
@@ -73,11 +79,7 @@ class Proc(Popen):
     'ready' pattern' Use self.out to access output (combined stdout and stderr).
     You can't set the Popen stdout and stderr arguments, they will be overwritten.
     """
-
-    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
-        vg_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
-    else:
-        vg_args = []
+    valgrind_exit = 42          # Special exit code for valgrind errors
 
     @property
     def out(self):
@@ -85,18 +87,23 @@ class Proc(Popen):
         # Normalize line endings, os.tmpfile() opens in binary mode.
         return self._out.read().replace('\r\n','\n').replace('\r','\n')
 
-    def __init__(self, args, skip_valgrind=False, **kwargs):
+    def __init__(self, args, valgrind=True, helgrind=False, **kwargs):
         """Start an example process"""
-        args = list(args)
-        if skip_valgrind:
-            self.args = args
-        else:
-            self.args = self.vg_args + args
+        self.args = list(args)
         self.kwargs = kwargs
         self._out = tempfile.TemporaryFile()
+        valgrind_exe = valgrind and os.getenv("VALGRIND")
+        if valgrind_exe:
+            # run valgrind for speed, not for detailed information
+            vg = [valgrind_exe, "--quiet", "--num-callers=2",
+                  "--error-exitcode=%s" % self.valgrind_exit]
+            if helgrind:
+                vg += ["--tool=helgrind", "--history-level=none"]
+            else:
+                vg += ["--tool=memcheck", "--leak-check=full",  "--leak-resolution=low"]
+            self.args = vg + self.args
+            self._out.flush()
         try:
-            if (os.getenv("PROCTEST_VERBOSE")):
-                sys.stderr.write("\nstart proc: %s\n" % self.args)
             Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
         except OSError, e:
             if e.errno == errno.ENOENT:
@@ -161,9 +168,16 @@ class ProcTestCase(unittest.TestCase):
             p.kill()
         super(ProcTestCase, self).tearDown()
 
+    # Default value for valgrind= in proc() function if not explicitly set.
+    # Override by setting a "valgrind" member in subclass or instance.
+    valgrind=True
+
     def proc(self, *args, **kwargs):
         """Return a Proc() that will be automatically killed on teardown"""
-        p = Proc(*args, **kwargs)
+        if 'valgrind' in kwargs:
+            p = Proc(*args, **kwargs)
+        else:
+            p = Proc(*args, valgrind=self.valgrind, **kwargs)
         self.procs.append(p)
         return p
 
@@ -201,6 +215,26 @@ class ProcTestCase(unittest.TestCase):
         def assertMultiLineEqual(self, a, b):
             self.assertEqual(a, b)
 
+def find_file(filename, path):
+    """
+    Find filename in path. Path is a list of directory names or OS path strings
+    separated with os.pathsep. return absolute path to the file or None
+
+    """
+    dirs = reduce((lambda x,y: x+y), (p.split(os.pathsep) for p in path))
+    for d in dirs:
+        if os.path.exists(os.path.join(d, filename)):
+            return os.path.abspath(os.path.join(d, filename))
+    return None
+
+def find_exes(*filenames):
+    """
+    True if all filenames in the list are found on the system PATH.
+    """
+    for f in filenames:
+        if not find_file(f, os.getenv('PATH')): return False
+    return True
+
 from unittest import main
 if __name__ == "__main__":
     main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org