You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/27 18:04:49 UTC
[1/6] qpid-proton git commit: PROTON-1418: c pn_logf() leaks memory
on each call
Repository: qpid-proton
Updated Branches:
refs/heads/master 3cb2ace54 -> 2dae68d6a
PROTON-1418: c pn_logf() leaks memory on each call
Fixed and made more efficient - no allocation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/57e6abae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/57e6abae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/57e6abae
Branch: refs/heads/master
Commit: 57e6abaec80b812b55be6317e948494ec93220da
Parents: 3cb2ace
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 11:25:37 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:25:37 2017 -0500
----------------------------------------------------------------------
proton-c/src/core/log.c | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/57e6abae/proton-c/src/core/log.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/log.c b/proton-c/src/core/log.c
index ff96ff0..60c72e7 100644
--- a/proton-c/src/core/log.c
+++ b/proton-c/src/core/log.c
@@ -49,9 +49,7 @@ void pn_log_logger(pn_logger_t new_logger) {
}
void pn_vlogf_impl(const char *fmt, va_list ap) {
- pn_string_t *msg = pn_string("");
- pn_string_vformat(msg, fmt, ap);
- fprintf(stderr, "%s\n", pn_string_get(msg));
+ vfprintf(stderr, fmt, ap);
}
/**@internal
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/6] qpid-proton git commit: PROTON-1403: c libuv proactor fixes and
tests
Posted by ac...@apache.org.
PROTON-1403: c libuv proactor fixes and tests
- c-proactor-tests run to transport closed
- c-proactor-tests allows running selected tests
- c-proactor-tests added more tests
- fix casting error, race conditions
- re-organize get/wait loops
- log proactor and listener events via PN_TRACE_LOG
exampletest.py grep for "listening" in output instead of connect retry:
faster than dummy connects and avoids confusing "no protocol header" errors.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85585202
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85585202
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85585202
Branch: refs/heads/master
Commit: 855852029e1b4d8b5d4c939f5b9c616808701813
Parents: 54e8099
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 21 10:55:32 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:32:13 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/broker.c | 1 +
examples/c/proactor/direct.c | 1 +
examples/c/proactor/test.py | 17 +-
examples/exampletest.py | 38 ++--
proton-c/src/proactor/libuv.c | 331 ++++++++++++++++++-----------------
proton-c/src/tests/proactor.c | 72 ++++++--
proton-c/src/tests/test_tools.h | 13 ++
7 files changed, 261 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index a7d8863..370d2e8 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -472,6 +472,7 @@ int main(int argc, char **argv) {
pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
+ fflush(stdout);
if (url) pn_url_free(url);
if (b.threads <= 0) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index de2d6aa..126ae4f 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv) {
app.proactor = pn_proactor();
pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s'\n", host, port);
+ fflush(stdout);
if (url) pn_url_free(url);
do {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 7d6f3fc..f6a6562 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -53,23 +53,13 @@ class CExampleTest(BrokerTestCase):
r = self.proc(["receive", "-a", self.addr, "-m3"])
self.assertEqual(receive_expect(3), r.wait_out())
- def retry(self, args, max=10):
- """Run until output does not contain "connection refused", up to max retries"""
- while True:
- try:
- return self.proc(args).wait_out()
- except ProcError, e:
- if "connection refused" in e.args[0] and max > 0:
- max -= 1
- continue
- raise
-
def test_send_direct(self):
"""Send to direct server"""
with bind0() as sock:
addr = "127.0.0.1:%s/examples" % sock.port()
d = self.proc(["direct", "-a", addr])
- self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+ d.wait_re("listening")
+ self.assertEqual("100 messages sent and acknowledged\n", self.proc(["send", "-a", addr]).wait_out())
self.assertIn(receive_expect(100), d.wait_out())
def test_receive_direct(self):
@@ -77,7 +67,8 @@ class CExampleTest(BrokerTestCase):
with bind0() as sock:
addr = "127.0.0.1:%s/examples" % sock.port()
d = self.proc(["direct", "-a", addr])
- self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+ d.wait_re("listenin")
+ self.assertEqual(receive_expect(100), self.proc(["receive", "-a", addr]).wait_out())
self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index 8dbd1ed..546b426 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -104,6 +104,20 @@ class Proc(Popen):
raise ProcError(self)
return self.out
+ def wait_re(self, regexp, timeout=10):
+ """
+ Wait for regexp to appear in the output, returns the re.search match result.
+ The target process should flush() important output to ensure it appears.
+ """
+ if timeout:
+ deadline = time.time() + timeout
+ while timeout is None or time.time() < deadline:
+ match = re.search(regexp, self.out)
+ if match:
+ return match
+ time.sleep(0.01) # Not very efficient
+ raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
+
# Work-around older python unittest that lacks setUpClass.
if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'):
TestCase = unittest.TestCase
@@ -144,21 +158,6 @@ class ExampleTestCase(TestCase):
self.procs.append(p)
return p
-def wait_port(port, timeout=10):
- """Wait up to timeout for port to be connectable."""
- if timeout:
- deadline = time.time() + timeout
- while (timeout is None or time.time() < deadline):
- try:
- s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
- s.close()
- return
- except socket.error, e:
- if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
- raise
- raise socket.timeout("waiting for port %s" % port)
-
-
class BrokerTestCase(ExampleTestCase):
"""
ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
@@ -171,13 +170,14 @@ class BrokerTestCase(ExampleTestCase):
cls.port = sock.port()
cls.addr = "127.0.0.1:%s/examples" % (cls.port)
cls.broker = None # In case Proc throws, create the attribute.
- cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+ cls.broker = Proc(cls.broker_exe + ["-a", cls.addr], bufsize=0)
try:
- wait_port(cls.port)
+ cls.broker.wait_re("listening")
except Exception, e:
cls.broker.kill()
- raise ProcError(cls.broker, "timed out waiting for port")
- sock.close()
+ raise
+ finally:
+ sock.close()
@classmethod
def tearDownClass(cls):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8e2ff05..322f353 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -44,7 +44,7 @@
/*
libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
- "wakeup" that can wake the uv_loop from another thread.
+ call that we use to make uv_run() return.
To provide concurrency proactor uses a "leader-worker-follower" model, threads take
turns at the roles:
@@ -97,6 +97,9 @@ typedef enum {
typedef struct psocket_t {
/* Immutable */
pn_proactor_t *proactor;
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+ bool is_conn;
/* Protected by proactor.lock */
struct psocket_t* next;
@@ -106,9 +109,6 @@ typedef struct psocket_t {
/* Only used by leader thread when it owns the psocket */
uv_tcp_t tcp;
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- bool is_conn;
} psocket_t;
/* Special value for psocket.next pointer when socket is not on any any list. */
@@ -165,7 +165,6 @@ struct pn_listener_t {
pn_record_t *attachments;
void *context;
size_t backlog;
- bool closing; /* close requested or closed by error */
/* Only used in leader thread */
size_t connections; /* number of connections waiting to be accepted */
@@ -201,15 +200,18 @@ struct pn_proactor_t {
};
/* Push ps to back of q. Must not be on a different queue */
-static void push_lh(queue *q, psocket_t *ps) {
- assert(ps->next == &UNLISTED);
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
+static bool push_lh(queue *q, psocket_t *ps) {
+ if (ps->next == &UNLISTED) {
+ ps->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ps;
+ } else {
+ q->back->next = ps;
+ q->back = ps;
+ }
+ return true;
}
+ return false;
}
/* Pop returns front of q or NULL if empty */
@@ -222,23 +224,33 @@ static psocket_t* pop_lh(queue *q) {
return ps;
}
+/* Notify the leader thread that there is something to do outside of uv_run() */
+static inline void notify(pn_proactor_t* p) {
+ uv_async_send(&p->async);
+}
+
+static void to_leader_lh(psocket_t *ps) {
+ if (push_lh(&ps->proactor->leader_q, ps)) {
+ ps->state = ON_LEADER;
+ }
+}
+
/* Queue an action for the leader thread */
static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
ps->action = action;
- if (ps->next == &UNLISTED) {
- ps->state = ON_LEADER;
- push_lh(&ps->proactor->leader_q, ps);
- }
+ to_leader_lh(ps);
uv_mutex_unlock(&ps->proactor->lock);
- uv_async_send(&ps->proactor->async); /* Wake leader */
+ notify(ps->proactor);
}
/* Push to the worker thread */
static void to_worker(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- ps->state = ON_WORKER;
- push_lh(&ps->proactor->worker_q, ps);
+ if (push_lh(&ps->proactor->worker_q, ps)) {
+ ps->state = ON_WORKER;
+ }
+ notify(ps->proactor);
uv_mutex_unlock(&ps->proactor->lock);
}
@@ -260,8 +272,8 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
push_lh(&ps->proactor->leader_q, ps);
ps->state = ON_LEADER; /* Otherwise notify the leader */
}
- uv_mutex_unlock(&ps->proactor->lock);
uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
}
static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -311,7 +323,7 @@ static void leader_count(pn_proactor_t *p, int change) {
p->count += change;
if (p->count == 0) {
p->inactive = true;
- uv_async_send(&p->async); /* Wake leader */
+ notify(p);
}
uv_mutex_unlock(&p->lock);
}
@@ -409,10 +421,8 @@ static int leader_init(psocket_t *ps) {
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
- if (!err) {
+ if (!pconnection_error(pc, err, "on connect to")) {
pconnection_to_worker(pc);
- } else {
- pconnection_error(pc, err, "on connect to");
}
}
@@ -552,7 +562,7 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
static void pconnection_to_uv(pconnection_t *pc) {
to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */
if (pn_connection_driver_finished(&pc->driver)) {
- if (!uv_is_closing((uv_handle_t*)&pc->psocket)) {
+ if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
}
return;
@@ -653,8 +663,64 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
return &p->batch;
}
+void leader_wake_connection(psocket_t *ps) {
+ assert(ps->state == ON_LEADER);
+ pconnection_t *pc = as_pconnection(ps);
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ pconnection_to_worker(pc);
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+ /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+ if (h->type == UV_TCP && !uv_is_closing(h)) {
+ uv_close(h, on_close_psocket);
+ }
+}
+
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+ if (e) {
+ pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+ }
+ return e;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+ pn_listener_t *l = batch_listener(batch);
+ assert(l->psocket.state == ON_WORKER);
+ pn_event_t *prev = pn_collector_prev(l->collector);
+ if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+ l->err = UV_EOF;
+ }
+ return log_event(l, pn_collector_next(l->collector));
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+ pn_proactor_t *p = batch_proactor(batch);
+ assert(p->batch_working);
+ return log_event(p, pn_collector_next(p->collector));
+}
+
+static void pn_listener_free(pn_listener_t *l) {
+ /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
+ if (l) {
+ if (l->collector) pn_collector_free(l->collector);
+ if (l->condition) pn_condition_free(l->condition);
+ if (l->attachments) pn_free(l->attachments);
+ free(l);
+ }
+}
+
+void leader_listener_close(psocket_t *ps) {
+ assert(ps->state = ON_LEADER);
+ pn_listener_t *l = (pn_listener_t*)ps;
+ l->err = UV_EOF;
+ listener_to_uv(l);
+}
+
/* Return the next event batch or 0 if no events are available in the worker_q */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
+ /* FIXME aconway 2017-02-21: generate these in parallel? */
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
p->inactive = false;
@@ -680,55 +746,9 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
return 0;
}
-pn_listener_t *pn_event_listener(pn_event_t *e) {
- return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
-}
+/* Process the leader_q and the UV loop, in the leader thread */
+static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
- if (pn_event_class(e) == pn_proactor__class()) {
- return (pn_proactor_t*)pn_event_context(e);
- }
- pn_listener_t *l = pn_event_listener(e);
- if (l) {
- return l->psocket.proactor;
- }
- pn_connection_t *c = pn_event_connection(e);
- if (c) {
- return pn_connection_proactor(pn_event_connection(e));
- }
- return NULL;
-}
-
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
- pconnection_t *pc = batch_pconnection(batch);
- if (pc) {
- assert(pc->psocket.state == ON_WORKER);
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to leader */
- pconnection_to_worker(pc);
- } else {
- to_leader(&pc->psocket, psocket_to_uv);
- }
- return;
- }
- pn_listener_t *l = batch_listener(batch);
- if (l) {
- assert(l->psocket.state == ON_WORKER);
- to_leader(&l->psocket, psocket_to_uv);
- return;
- }
- pn_proactor_t *bp = batch_proactor(batch);
- if (bp == p) {
- uv_mutex_lock(&p->lock);
- p->batch_working = false;
- uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake leader */
- return;
- }
-}
-
-/* Process the leader_q, in the leader thread */
-static void leader_process_lh(pn_proactor_t *p) {
if (p->timeout_request) {
p->timeout_request = false;
if (p->timeout) {
@@ -738,7 +758,6 @@ static void leader_process_lh(pn_proactor_t *p) {
}
}
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- assert(ps->state == ON_LEADER);
if (ps->action) {
uv_mutex_unlock(&p->lock);
ps->action(ps);
@@ -750,68 +769,111 @@ static void leader_process_lh(pn_proactor_t *p) {
ps->wakeup = NULL;
uv_mutex_lock(&p->lock);
}
+ pn_event_batch_t *batch = get_batch_lh(p);
+ if (batch) return batch;
}
+ uv_mutex_unlock(&p->lock);
+ uv_run(&p->loop, mode);
+ uv_mutex_lock(&p->lock);
+ return get_batch_lh(p);
}
-/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+/* ==== public API ==== */
+
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
- /* Try to grab work immediately. */
pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL) {
- /* No work available, follow the leader */
- while (p->has_leader) {
- uv_cond_wait(&p->cond, &p->lock);
- }
- /* Lead till there is work to do. */
+ if (batch == NULL && !p->has_leader) {
+ /* Try a non-blocking lead to generate some work */
p->has_leader = true;
- while (batch == NULL) {
- leader_process_lh(p);
- batch = get_batch_lh(p);
- if (batch == NULL) {
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- }
- }
- /* Signal the next leader and go to work */
+ batch = leader_lead_lh(p, UV_RUN_NOWAIT);
p->has_leader = false;
- uv_cond_signal(&p->cond);
+ uv_cond_signal(&p->cond); /* Notify next leader */
}
uv_mutex_unlock(&p->lock);
return batch;
}
-pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL && !p->has_leader) {
- /* If there is no leader, try a non-waiting lead to generate some work */
- p->has_leader = true;
- leader_process_lh(p);
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_NOWAIT);
- uv_mutex_lock(&p->lock);
+ while (!batch && p->has_leader) {
+ uv_cond_wait(&p->cond, &p->lock); /* Follow the leader */
batch = get_batch_lh(p);
+ }
+ if (!batch) { /* Become leader */
+ assert(!p->has_leader); /* Implied by loop condition */
+ p->has_leader = true;
+ do {
+ batch = leader_lead_lh(p, UV_RUN_ONCE);
+ } while (!batch);
p->has_leader = false;
+ uv_cond_signal(&p->cond);
}
uv_mutex_unlock(&p->lock);
return batch;
}
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) {
+ assert(pc->psocket.state == ON_WORKER);
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ /* Process all events before going back to leader */
+ pconnection_to_worker(pc);
+ } else {
+ to_leader(&pc->psocket, psocket_to_uv);
+ }
+ return;
+ }
+ pn_listener_t *l = batch_listener(batch);
+ if (l) {
+ assert(l->psocket.state == ON_WORKER);
+ to_leader(&l->psocket, psocket_to_uv);
+ return;
+ }
+ pn_proactor_t *bp = batch_proactor(batch);
+ if (bp == p) {
+ uv_mutex_lock(&p->lock);
+ p->batch_working = false;
+ notify(p);
+ uv_mutex_unlock(&p->lock);
+ return;
+ }
+}
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+ return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+ if (pn_event_class(e) == pn_proactor__class()) {
+ return (pn_proactor_t*)pn_event_context(e);
+ }
+ pn_listener_t *l = pn_event_listener(e);
+ if (l) {
+ return l->psocket.proactor;
+ }
+ pn_connection_t *c = pn_event_connection(e);
+ if (c) {
+ return pn_connection_proactor(pn_event_connection(e));
+ }
+ return NULL;
+}
+
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
+ notify(p);
uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake the UV loop */
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_request = true;
+ notify(p);
uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake the UV loop */
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -836,14 +898,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
return pc ? pc->psocket.proactor : NULL;
}
-void leader_wake_connection(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- pconnection_to_worker(pc);
-}
-
void pn_connection_wake(pn_connection_t* c) {
/* May be called from any thread */
wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
@@ -858,18 +912,11 @@ pn_proactor_t *pn_proactor() {
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
uv_async_init(&p->loop, &p->async, NULL);
- uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+ uv_timer_init(&p->loop, &p->timer);
p->timer.data = p;
return p;
}
-static void on_stopping(uv_handle_t* h, void* v) {
- /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
- if (h->type == UV_TCP && !uv_is_closing(h)) {
- uv_close(h, on_close_psocket);
- }
-}
-
void pn_proactor_free(pn_proactor_t *p) {
uv_timer_stop(&p->timer);
uv_close((uv_handle_t*)&p->timer, NULL);
@@ -885,39 +932,6 @@ void pn_proactor_free(pn_proactor_t *p) {
free(p);
}
-static pn_event_t *log_event(void* p, pn_event_t *e) {
- if (e) {
- pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
- }
- return e;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
- pn_listener_t *l = batch_listener(batch);
- assert(l->psocket.state == ON_WORKER);
- pn_event_t *prev = pn_collector_prev(l->collector);
- if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
- l->err = UV_EOF;
- }
- return log_event(l, pn_collector_next(l->collector));
-}
-
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
- pn_proactor_t *p = batch_proactor(batch);
- assert(p->batch_working);
- return log_event(p, pn_collector_next(p->collector));
-}
-
-static void pn_listener_free(pn_listener_t *l) {
- /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
- if (l) {
- if (l->collector) pn_collector_free(l->collector);
- if (l->condition) pn_condition_free(l->condition);
- if (l->attachments) pn_free(l->attachments);
- free(l);
- }
-}
-
pn_listener_t *pn_listener(void) {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
@@ -933,13 +947,6 @@ pn_listener_t *pn_listener(void) {
return l;
}
-void leader_listener_close(psocket_t *ps) {
- assert(ps->state = ON_LEADER);
- pn_listener_t *l = (pn_listener_t*)ps;
- l->err = UV_EOF;
- listener_to_uv(l);
-}
-
void pn_listener_close(pn_listener_t* l) {
/* This can be called from any thread, not just the owner of l */
wakeup(&l->psocket, leader_listener_close);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6b95c11..6595a0b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -98,7 +98,7 @@ static void proactor_test_init(proactor_test_t *pts, size_t n) {
pn_proactor_get. Continue till all handlers return H_FINISHED (and return 0) or one
returns H_FAILED (and return non-0)
*/
-int proactor_test_run(proactor_test_t *pts, size_t n) {
+static int proactor_test_run(proactor_test_t *pts, size_t n) {
/* Make sure pts are initialized */
proactor_test_init(pts, n);
size_t finished = 0;
@@ -127,17 +127,11 @@ int proactor_test_run(proactor_test_t *pts, size_t n) {
/* Handler for test_listen_connect, does both sides of the connection */
-handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
pn_connection_t *c = pn_event_connection(e);
pn_listener_t *l = pn_event_listener(e);
switch (pn_event_type(e)) {
- /* Ignore these events */
- case PN_CONNECTION_LOCAL_OPEN:
- case PN_CONNECTION_BOUND:
- case PN_CONNECTION_INIT:
- return H_CONTINUE;
-
/* Act on these events */
case PN_LISTENER_ACCEPT: {
pn_connection_t *accepted = pn_connection();
@@ -149,20 +143,22 @@ handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
case PN_CONNECTION_REMOTE_OPEN:
if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
pn_connection_close(c);
- return H_FINISHED;
} else { /* Server returns the open */
pn_connection_open(c);
}
+ return H_CONTINUE;
case PN_CONNECTION_REMOTE_CLOSE:
if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
pn_connection_close(c); /* Return the close */
}
+ return H_CONTINUE;
+
+ case PN_TRANSPORT_CLOSED:
return H_FINISHED;
default:
- TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
- return H_FAILED;
+ return H_CONTINUE;
break;
}
}
@@ -172,13 +168,13 @@ static void test_early_error(test_t *t) {
pn_proactor_t *p = pn_proactor();
pn_proactor_set_timeout(p, timeout); /* In case of hang */
pn_connection_t *c = pn_connection();
- pn_proactor_connect(p, c, "badhost", "amqp");
+ pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
pn_listener_t *l = pn_listener();
- pn_proactor_listen(p, l, "badhost", "amqp", 1);
+ pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
etype = wait_for(p, PN_LISTENER_CLOSE);
TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
@@ -204,6 +200,45 @@ static void test_listen_connect(test_t *t) {
pn_proactor_free(server);
}
+static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
+ pn_connection_t *c = pn_event_connection(e);
+ switch (pn_event_type(e)) {
+
+ case PN_CONNECTION_REMOTE_OPEN:
+ if (pn_connection_state(c) | PN_LOCAL_UNINIT) {
+ pn_connection_open(c); /* Server returns the open */
+ }
+ return H_FINISHED; /* Finish when open at both ends */
+
+ default:
+ /* Otherwise same as listen_connect_handler */
+ return listen_connect_handler(t, e);
+ }
+}
+
+/* Test waking up a connection that is idle */
+static void test_connection_wakeup(test_t *t) {
+ proactor_test_t pts[] = { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
+ proactor_test_init(pts, 2);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+ pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+ pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+ if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+ sock_close(port.sock);
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ proactor_test_run(pts, 2); /* Will finish when client is connected */
+ TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
+ pn_connection_wake(c);
+ etype = wait_next(client);
+ /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
+ TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
+ }
+ pn_proactor_free(client);
+ pn_proactor_free(server);
+}
+
/* Test that INACTIVE event is generated when last connections/listeners closes. */
static void test_inactive(test_t *t) {
proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler }};
@@ -226,11 +261,12 @@ static void test_inactive(test_t *t) {
pn_proactor_free(server);
}
-int main(int argv, char** argc) {
+int main(int argc, char **argv) {
int failed = 0;
- RUN_TEST(failed, t, test_inactive(&t));
- RUN_TEST(failed, t, test_interrupt_timeout(&t));
- RUN_TEST(failed, t, test_early_error(&t));
- RUN_TEST(failed, t, test_listen_connect(&t));
+ RUN_ARGV_TEST(failed, t, test_inactive(&t));
+ RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
+ RUN_ARGV_TEST(failed, t, test_early_error(&t));
+ RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
+ RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 2663dcf..7006334 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -104,6 +104,19 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
} \
} while(0)
+/* Like RUN_TEST but only if one of the argv strings is found in the test EXPR */
+#define RUN_ARGV_TEST(FAILED, T, EXPR) do { \
+ if (argc == 1) { \
+ RUN_TEST(FAILED, T, EXPR); \
+ } else { \
+ for (int i = 1; i < argc; ++i) { \
+ if (strstr(#EXPR, argv[i])) { \
+ RUN_TEST(FAILED, T, EXPR); \
+ break; \
+ } \
+ } \
+ } \
+ } while(0)
/* Some very simple platform-secifics to acquire an unused socket */
#if defined(WIN32)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/6] qpid-proton git commit: PROTON-1419: c libuv proactor log all
events
Posted by ac...@apache.org.
PROTON-1419: c libuv proactor log all events
Use the pn_log() function to log listener and proactor events when PN_TRACE_LOG is set.
The current logging features are not ideal, but this fix is consistent with the
status quo, so if/when we upgrade the logging framework this will be included.
Add missing function definition for pn_connection_driver_logf
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/54e8099e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/54e8099e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/54e8099e
Branch: refs/heads/master
Commit: 54e8099e274938db1a3d458eff18c374af149945
Parents: 57e6aba
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 11:30:37 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:30:37 2017 -0500
----------------------------------------------------------------------
proton-c/include/proton/connection_driver.h | 2 +-
proton-c/src/core/connection_driver.c | 9 ++++++---
proton-c/src/core/log_private.h | 6 +++---
proton-c/src/proactor/libuv.c | 13 +++++++++++--
4 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54e8099e/proton-c/include/proton/connection_driver.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
index 53a9c56..5632050 100644
--- a/proton-c/include/proton/connection_driver.h
+++ b/proton-c/include/proton/connection_driver.h
@@ -227,7 +227,7 @@ PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *m
/**
* Log a printf formatted message using the connection's transport log.
*/
-PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...);
+PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, const char *fmt, ...);
/**
* Log a printf formatted message using the connection's transport log.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54e8099e/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index 3393e64..b4d8178 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -153,12 +153,15 @@ void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) {
pn_transport_log(d->transport, msg);
}
-void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+void pn_connection_driver_logf(pn_connection_driver_t *d, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
pn_transport_vlogf(d->transport, fmt, ap);
+ va_end(ap);
}
-void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) {
- pn_transport_log(d->transport, msg);
+void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+ pn_transport_vlogf(d->transport, fmt, ap);
}
pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54e8099e/proton-c/src/core/log_private.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/log_private.h b/proton-c/src/core/log_private.h
index 4725045..828e189 100644
--- a/proton-c/src/core/log_private.h
+++ b/proton-c/src/core/log_private.h
@@ -42,12 +42,12 @@
} while(0)
/** Return true if logging is enabled. */
-bool pn_log_enabled(void);
+PN_EXTERN bool pn_log_enabled(void);
/**@internal*/
-void pn_logf_impl(const char* fmt, ...);
+PN_EXTERN void pn_logf_impl(const char* fmt, ...);
/**@internal*/
-void pn_vlogf_impl(const char *fmt, va_list ap);
+PN_EXTERN void pn_vlogf_impl(const char *fmt, va_list ap);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54e8099e/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 61239ac..8e2ff05 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -19,6 +19,8 @@
*
*/
+#include "../core/log_private.h"
+
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
@@ -883,6 +885,13 @@ void pn_proactor_free(pn_proactor_t *p) {
free(p);
}
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+ if (e) {
+ pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
+ }
+ return e;
+}
+
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
assert(l->psocket.state == ON_WORKER);
@@ -890,13 +899,13 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
l->err = UV_EOF;
}
- return pn_collector_next(l->collector);
+ return log_event(l, pn_collector_next(l->collector));
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
pn_proactor_t *p = batch_proactor(batch);
assert(p->batch_working);
- return pn_collector_next(p->collector);
+ return log_event(p, pn_collector_next(p->collector));
}
static void pn_listener_free(pn_listener_t *l) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/6] qpid-proton git commit: NO-JIRA: Fix unreliable timeout test in
engine.py
Posted by ac...@apache.org.
NO-JIRA: Fix unreliable timeout test in engine.py
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a6b41641
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a6b41641
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a6b41641
Branch: refs/heads/master
Commit: a6b416411e2e87334e61fb9cbfe3d9840fef2c4a
Parents: 8558520
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Feb 24 01:48:44 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:33:13 2017 -0500
----------------------------------------------------------------------
tests/python/proton_tests/engine.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a6b41641/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 05e9f8f..7bb5f78 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -2603,9 +2603,9 @@ class IdleTimeoutEventTest(PeerTest):
self.transport.idle_timeout = self.delay
self.connection.open()
self.half_pump()
- self.transport.tick(time())
- sleep(self.delay*2)
- self.transport.tick(time())
+ t = time()
+ self.transport.tick(t)
+ self.transport.tick(t + self.delay*4)
self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/6] qpid-proton git commit: PROTON-1413: c proactor fix assertion
errors, simplify code
Posted by ac...@apache.org.
PROTON-1413: c proactor fix assertion errors, simplify code
- expanded & improved tests/proactor.c tests and tests/test_tools.h framework
- drop wakeup/action callbacks
- simpler listening logic using locks for concurrent leader/worker access
- centralize logic for socket processing and error handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2dae68d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2dae68d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2dae68d6
Branch: refs/heads/master
Commit: 2dae68d6a2a98f457ca7691f74d56296431de866
Parents: 105b939
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 12:54:15 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 12:55:12 2017 -0500
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 713 +++++++++++++++++------------------
proton-c/src/tests/proactor.c | 348 +++++++++--------
proton-c/src/tests/test_tools.h | 99 +++--
3 files changed, 584 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 322f353..2fafbb3 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -43,34 +43,25 @@
#include <string.h>
/*
- libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
- call that we use to make uv_run() return.
+ libuv functions are thread unsafe; we use a "leader-worker-follower" model as follows:
- To provide concurrency proactor uses a "leader-worker-follower" model, threads take
- turns at the roles:
+ - At most one thread at a time is the "leader". The leader runs the UV loop till there
+ are events to process and then becomes a "worker".
- - a single "leader" thread uses libuv, it runs the uv_loop the in short bursts to
- generate work. Once there is work it becomes becomes a "worker" thread, another thread
- takes over as leader.
+ - Concurrent "worker" threads process events for separate connections or listeners.
+ When they run out of work they become "followers"
- - "workers" handle events for separate connections or listeners concurrently. They do as
- much work as they can, when none is left they become "followers"
+ - A "follower" is idle, waiting for work. When the leader becomes a worker, one follower
+ takes over as the new leader.
- - "followers" wait for the leader to generate work. One follower becomes the new leader,
- the others become workers or continue to follow till they can get work.
-
- Any thread in a pool can take on any role necessary at run-time. All the work generated
- by an IO wake-up for a single connection can be processed in a single single worker
- thread to minimize context switching.
+ Any thread that calls pn_proactor_wait() or pn_proactor_get() can take on any of the
+ roles as required at run-time. Monitored sockets (connections or listeners) are passed
+ between threads on thread-safe queues.
Function naming:
- - on_* - called in leader thread via uv_run().
- - leader_* - called in leader thread (either leader_q processing or from an on_ function)
+ - on_*() - libuv callbacks, called in leader thread via uv_run().
+ - leader_* - only called in the leader thread
- *_lh - called with the relevant lock held
-
- LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
- UV handles have received a close callback. Freeing resources is initiated by uv_close()
- of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
*/
const char *AMQP_PORT = "5672";
@@ -86,13 +77,6 @@ PN_HANDLE(PN_PROACTOR)
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-/* A psocket (connection or listener) has the following mutually exclusive states. */
-typedef enum {
- ON_WORKER, /* On worker_q or in use by user code in worker thread */
- ON_LEADER, /* On leader_q or in use the leader loop */
- ON_UV /* Scheduled for a UV event, or in use by leader thread in on_ handler*/
-} psocket_state_t;
-
/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
@@ -103,14 +87,15 @@ typedef struct psocket_t {
/* Protected by proactor.lock */
struct psocket_t* next;
- psocket_state_t state;
+ bool working; /* Owned by a worker thread */
void (*action)(struct psocket_t*); /* deferred action for leader */
- void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
/* Only used by leader thread when it owns the psocket */
uv_tcp_t tcp;
} psocket_t;
+typedef struct queue { psocket_t *front, *back; } queue;
+
/* Special value for psocket.next pointer when socket is not on any any list. */
psocket_t UNLISTED;
@@ -118,8 +103,8 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
ps->proactor = p;
ps->next = &UNLISTED;
ps->is_conn = is_conn;
- ps->tcp.data = ps;
- ps->state = ON_WORKER;
+ ps->tcp.data = NULL; /* Set in leader_init */
+ ps->working = true;
/* For platforms that don't know about "amqp" and "amqps" service names. */
if (port && strcmp(port, AMQP_PORT_NAME) == 0)
@@ -136,7 +121,7 @@ static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
-/* Holds a psocket and a pn_connection_driver */
+/* a connection socket */
typedef struct pconnection_t {
psocket_t psocket;
@@ -149,31 +134,33 @@ typedef struct pconnection_t {
uv_write_t write;
uv_shutdown_t shutdown;
size_t writing; /* size of pending write request, 0 if none pending */
- bool server; /* accepting not connecting */
+
+ /* Locked for thread-safe access */
+ uv_mutex_t lock;
+ bool wake; /* pn_connection_wake() was called */
} pconnection_t;
-/* pn_listener_t with a psocket_t */
+/* a listener socket */
struct pn_listener_t {
psocket_t psocket;
/* Only used by owner thread */
- pconnection_t *accepting; /* set in worker, used in UV loop for accept */
- pn_condition_t *condition;
- pn_collector_t *collector;
pn_event_batch_t batch;
pn_record_t *attachments;
void *context;
size_t backlog;
- /* Only used in leader thread */
- size_t connections; /* number of connections waiting to be accepted */
- int err; /* uv error code, 0 = OK, UV_EOF = closed */
- const char *what; /* static description string */
+ /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
+ * detach a listener from the UV loop to prevent concurrent access.
+ */
+ uv_mutex_t lock;
+ pn_condition_t *condition;
+ pn_collector_t *collector;
+ queue accept; /* pconnection_t for uv_accept() */
+ bool closed;
};
-typedef struct queue { psocket_t *front, *back; } queue;
-
struct pn_proactor_t {
/* Leader thread */
uv_cond_t cond;
@@ -199,19 +186,15 @@ struct pn_proactor_t {
bool batch_working; /* batch is being processed in a worker thread */
};
-/* Push ps to back of q. Must not be on a different queue */
-static bool push_lh(queue *q, psocket_t *ps) {
- if (ps->next == &UNLISTED) {
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
- }
- return true;
+static void push_lh(queue *q, psocket_t *ps) {
+ assert(ps->next == &UNLISTED);
+ ps->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ps;
+ } else {
+ q->back->next = ps;
+ q->back = ps;
}
- return false;
}
/* Pop returns front of q or NULL if empty */
@@ -229,51 +212,26 @@ static inline void notify(pn_proactor_t* p) {
uv_async_send(&p->async);
}
-static void to_leader_lh(psocket_t *ps) {
- if (push_lh(&ps->proactor->leader_q, ps)) {
- ps->state = ON_LEADER;
- }
-}
-
-/* Queue an action for the leader thread */
-static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- ps->action = action;
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
- notify(ps->proactor);
-}
-
-/* Push to the worker thread */
-static void to_worker(psocket_t *ps) {
+/* Notify that this socket needs attention from the leader at the next opportunity */
+static void psocket_notify(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- if (push_lh(&ps->proactor->worker_q, ps)) {
- ps->state = ON_WORKER;
- }
- notify(ps->proactor);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set state to ON_UV */
-static void to_uv(psocket_t *ps) {
- uv_mutex_lock(&ps->proactor->lock);
- if (ps->next == &UNLISTED) {
- ps->state = ON_UV;
+ /* Only queue if not working and not already queued */
+ if (!ps->working && ps->next == &UNLISTED) {
+ push_lh(&ps->proactor->leader_q, ps);
+ notify(ps->proactor);
}
uv_mutex_unlock(&ps->proactor->lock);
}
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+/* Notify the leader of a newly-created socket */
+static void psocket_start(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- ps->wakeup = action;
- /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
- if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+ if (ps->next == &UNLISTED) { /* No-op if already queued */
+ ps->working = false;
push_lh(&ps->proactor->leader_q, ps);
- ps->state = ON_LEADER; /* Otherwise notify the leader */
+ notify(ps->proactor);
+ uv_mutex_unlock(&ps->proactor->lock);
}
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
}
static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -318,6 +276,14 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
}
+static inline psocket_t *batch_psocket(pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) return &pc->psocket;
+ pn_listener_t *l = batch_listener(batch);
+ if (l) return &l->psocket;
+ return NULL;
+}
+
static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_lock(&p->lock);
p->count += change;
@@ -340,6 +306,12 @@ static void on_close_pconnection_final(uv_handle_t *h) {
pconnection_free((pconnection_t*)h->data);
}
+static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
+ if (!uv_is_closing(h)) {
+ uv_close(h, cb);
+ }
+}
+
/* Close event for uv_tcp_t of a psocket_t */
static void on_close_psocket(uv_handle_t *h) {
psocket_t *ps = (psocket_t*)h->data;
@@ -348,7 +320,7 @@ static void on_close_psocket(uv_handle_t *h) {
pconnection_t *pc = as_pconnection(ps);
uv_timer_stop(&pc->timer);
/* Delay the free till the timer handle is also closed */
- uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+ uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
} else {
pn_listener_free(as_listener(ps));
}
@@ -362,48 +334,49 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
-static void pconnection_to_worker(pconnection_t *pc);
-static void listener_to_worker(pn_listener_t *l);
-
-int pconnection_error(pconnection_t *pc, int err, const char* what) {
- if (err) {
- pn_connection_driver_t *driver = &pc->driver;
- pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+static void pconnection_error(pconnection_t *pc, int err, const char* what) {
+ assert(err);
+ pn_connection_driver_t *driver = &pc->driver;
+ pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+ if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
uv_strerror(err));
- pn_connection_driver_close(driver);
- pconnection_to_worker(pc);
}
- return err;
+ pn_connection_driver_close(driver);
}
-static int listener_error(pn_listener_t *l, int err, const char* what) {
- if (err) {
- l->err = err;
- l->what = what;
- listener_to_worker(l);
+static void listener_error(pn_listener_t *l, int err, const char* what) {
+ assert(err);
+ uv_mutex_lock(&l->lock);
+ if (!pn_condition_is_set(l->condition)) {
+ pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
+ what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+ uv_strerror(err));
}
- return err;
+ uv_mutex_unlock(&l->lock);
+ pn_listener_close(l);
}
-static int psocket_error(psocket_t *ps, int err, const char* what) {
- if (err) {
- if (ps->is_conn) {
- pconnection_error(as_pconnection(ps), err, "initialization");
- } else {
- listener_error(as_listener(ps), err, "initialization");
- }
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+ if (ps->is_conn) {
+ pconnection_error(as_pconnection(ps), err, "initialization");
+ } else {
+ listener_error(as_listener(ps), err, "initialization");
}
- return err;
}
+/* FIXME aconway 2017-02-25: split socket/queue */
+
/* psocket uv-initialization */
static int leader_init(psocket_t *ps) {
- ps->state = ON_LEADER;
+ ps->working = false;
+ ps->tcp.data = ps;
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
- if (!err) {
+ if (err) {
+ psocket_error(ps, err, "initialization");
+ } else {
pconnection_t *pc = as_pconnection(ps);
if (pc) {
pc->connect.data = ps;
@@ -412,50 +385,63 @@ static int leader_init(psocket_t *ps) {
pc->timer.data = ps;
}
}
- } else {
- psocket_error(ps, err, "initialization");
}
return err;
}
+/* Check if a pconnection has work for a worker thread. Called by owning thread. */
+static bool pconnection_needs_work(pconnection_t *pc) {
+ if (!pc->writing) { /* Can't detach for work while write is pending */
+ /* Check for wake requests */
+ uv_mutex_lock(&pc->lock);
+ bool wake = pc->wake;
+ pc->wake = false;
+ uv_mutex_unlock(&pc->lock);
+ if (wake) {
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ }
+ return pn_connection_driver_has_event(&pc->driver);
+ }
+ return false;
+}
+
+/* Detach a connection from the UV loop so it can be used safely by a worker */
+void pconnection_detach(pconnection_t *pc) {
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ uv_timer_stop(&pc->timer);
+ psocket_notify(&pc->psocket);
+}
+
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
- if (!pconnection_error(pc, err, "on connect to")) {
- pconnection_to_worker(pc);
- }
+ assert(!pc->psocket.working);
+ if (err) pconnection_error(pc, err, "on connect to");
+ pconnection_detach(pc); /* FIXME aconway 2017-02-25: detach AFTER error or vv */
}
/* Incoming connection ready to be accepted */
static void on_connection(uv_stream_t* server, int err) {
- /* Unlike most on_* functions, this one can be called by the leader thread when the
- * listener is ON_WORKER, because there's no way to stop libuv from calling
- * on_connection(). Just increase a counter and generate events in to_worker.
+ /* Unlike most on_* functions, this can be called by the leader thread when the listener
+ * is ON_WORKER or ON_LEADER, because there's no way to stop libuv from calling
+ * on_connection(). Update the state of the listener and queue it for leader attention.
*/
pn_listener_t *l = (pn_listener_t*) server->data;
- l->err = err;
- if (!err) ++l->connections;
- listener_to_worker(l); /* If already ON_WORKER it will stay there */
-}
-
-static void leader_accept(pn_listener_t * l) {
- assert(l->accepting);
- pconnection_t *pc = l->accepting;
- l->accepting = NULL;
- int err = leader_init(&pc->psocket);
- if (!err) {
- err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
- }
- if (!err) {
- pconnection_to_worker(pc);
+ if (err) {
+ listener_error(l, err, "on incoming connection");
} else {
- pconnection_error(pc, err, "accepting from");
- listener_error(l, err, "accepting from");
+ uv_mutex_lock(&l->lock);
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
}
}
// #error FIXME REVIEW UPWARDS FROM HERE ^^^^
+
+/* Common address resolution for leader_listen and leader_connect */
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
- assert(ps->state == ON_LEADER);
int err = leader_init(ps);
struct addrinfo hints = { 0 };
if (server) hints.ai_flags = AI_PASSIVE;
@@ -465,27 +451,23 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
return err;
}
-static void leader_connect(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
+static void leader_connect(pconnection_t *pc) {
uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, false);
+ int err = leader_resolve(&pc->psocket, &info, false);
if (!err) {
err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
uv_freeaddrinfo(info.addrinfo);
}
- if (!err) {
- ps->state = ON_UV;
- } else {
+ if (err) {
pconnection_error(pc, err, "connecting to");
+ } else {
+ pn_connection_open(pc->driver.connection);
}
}
-static void leader_listen(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pn_listener_t *l = as_listener(ps);
+static void leader_listen(pn_listener_t *l) {
uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, true);
+ int err = leader_resolve(&l->psocket, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
uv_freeaddrinfo(info.addrinfo);
@@ -493,17 +475,53 @@ static void leader_listen(psocket_t *ps) {
if (!err) {
err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
}
- if (!err) {
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
- listener_to_worker(l); /* Let worker see the OPEN event */
+ uv_mutex_lock(&l->lock);
+ /* Always put an OPEN event for symmetry, even if we immediately close with err */
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+ uv_mutex_unlock(&l->lock);
+ if (err) listener_error(l, err, "listening on");
+}
+
+static bool listener_needs_work(pn_listener_t *l) {
+ uv_mutex_lock(&l->lock);
+ bool needs_work = pn_collector_peek(l->collector);
+ uv_mutex_unlock(&l->lock);
+ return needs_work;
+}
+
+static bool listener_finished_lh(pn_listener_t *l) {
+ return l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
+}
+
+static bool leader_process_listener(pn_listener_t * l) {
+ if (l->psocket.tcp.data == NULL) {
+ leader_listen(l);
} else {
- listener_error(l, err, "listening on");
+ uv_mutex_lock(&l->lock);
+ pconnection_t *pc;
+ while (!listener_finished_lh(l) && (pc = (pconnection_t*)pop_lh(&l->accept))) {
+ uv_mutex_unlock(&l->lock);
+ int err = leader_init(&pc->psocket);
+ if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+ if (err) {
+ listener_error(l, err, "accepting from");
+ psocket_notify(&l->psocket);
+ pconnection_error(pc, err, "accepting from");
+ }
+ psocket_start(&pc->psocket);
+ uv_mutex_lock(&l->lock);
+ }
+ if (listener_finished_lh(l)) {
+ uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
+ }
+ uv_mutex_unlock(&l->lock);
}
+ return listener_needs_work(l);
}
/* Generate tick events and return millis till next tick or 0 if no tick is required */
static pn_millis_t leader_tick(pconnection_t *pc) {
- assert(pc->psocket.state != ON_WORKER);
+ assert(!pc->psocket.working);
uint64_t now = uv_now(pc->timer.loop);
uint64_t next = pn_transport_tick(pc->driver.transport, now);
return next ? next - now : 0;
@@ -511,38 +529,36 @@ static pn_millis_t leader_tick(pconnection_t *pc) {
static void on_tick(uv_timer_t *timer) {
pconnection_t *pc = (pconnection_t*)timer->data;
- pn_millis_t next = leader_tick(pc); /* May generate events */
- if (pn_connection_driver_has_event(&pc->driver)) {
- pconnection_to_worker(pc);
- } else if (next) {
- uv_timer_start(&pc->timer, on_tick, next, 0);
- }
+ assert(!pc->psocket.working);
+ leader_tick(pc);
+ pconnection_detach(pc);
+ /* FIXME aconway 2017-02-25: optimize - don't detach if no work. Need to check for finished? */
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
+ assert(!pc->psocket.working);
if (nread >= 0) {
pn_connection_driver_read_done(&pc->driver, nread);
- pconnection_to_worker(pc);
} else if (nread == UV_EOF) { /* hangup */
pn_connection_driver_read_close(&pc->driver);
- pconnection_to_worker(pc);
} else {
pconnection_error(pc, nread, "on read from");
}
+ pconnection_detach(pc);
}
static void on_write(uv_write_t* write, int err) {
pconnection_t *pc = (pconnection_t*)write->data;
- if (err == 0) {
- pn_connection_driver_write_done(&pc->driver, pc->writing);
- pconnection_to_worker(pc);
- } else if (err == UV_ECANCELED) {
- pconnection_to_worker(pc);
- } else {
+ assert(!pc->psocket.working);
+ size_t size = pc->writing;
+ pc->writing = 0;
+ if (err) {
pconnection_error(pc, err, "on write to");
+ } else {
+ pn_connection_driver_write_done(&pc->driver, size);
}
- pc->writing = 0;
+ pconnection_detach(pc);
}
static void on_timeout(uv_timer_t *timer) {
@@ -552,110 +568,13 @@ static void on_timeout(uv_timer_t *timer) {
uv_mutex_unlock(&p->lock);
}
-// Read buffer allocation function for uv, just returns the transports read buffer.
+/* Read buffer allocation function for uv, just returns the transports read buffer. */
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
-static void pconnection_to_uv(pconnection_t *pc) {
- to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */
- if (pn_connection_driver_finished(&pc->driver)) {
- if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
- uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
- }
- return;
- }
- pn_millis_t next_tick = leader_tick(pc);
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
- if (pn_connection_driver_has_event(&pc->driver)) {
- to_worker(&pc->psocket); /* Ticks/buffer checks generated events */
- return;
- }
- if (next_tick &&
- pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
- return;
- }
- if (wbuf.size > 0) {
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- if (pconnection_error(
- pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
- return;
- pc->writing = wbuf.size;
- } else if (pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = &pc->psocket;
- if (pconnection_error(
- pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
- return;
- }
- if (rbuf.size > 0) {
- if (pconnection_error(
- pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
- return;
- }
-}
-
-static void listener_to_uv(pn_listener_t *l) {
- to_uv(&l->psocket); /* Assume we're going to UV unless sent elsewhere */
- if (l->err) {
- if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
- uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
- }
- } else {
- if (l->accepting) {
- leader_accept(l);
- }
- if (l->connections) {
- listener_to_worker(l);
- }
- }
-}
-
-/* Monitor a psocket_t in the UV loop */
-static void psocket_to_uv(psocket_t *ps) {
- if (ps->is_conn) {
- pconnection_to_uv(as_pconnection(ps));
- } else {
- listener_to_uv(as_listener(ps));
- }
-}
-
-/* Detach a connection from IO and put it on the worker queue */
-static void pconnection_to_worker(pconnection_t *pc) {
- /* Can't go to worker if a write is outstanding or the batch is empty */
- if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- uv_timer_stop(&pc->timer);
- }
- to_worker(&pc->psocket);
-}
-
-/* Can't really detach a listener, as on_connection can always be called.
- Generate events here safely.
-*/
-static void listener_to_worker(pn_listener_t *l) {
- if (pn_collector_peek(l->collector)) { /* Already have events */
- to_worker(&l->psocket);
- } else if (l->err) {
- if (l->err != UV_EOF) {
- pn_condition_format(l->condition, uv_err_name(l->err), "%s %s:%s: %s",
- l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
- uv_strerror(l->err));
- }
- l->err = 0;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- to_worker(&l->psocket);
- } else if (l->connections) { /* Generate accept events one at a time */
- --l->connections;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- to_worker(&l->psocket);
- } else {
- listener_to_uv(l);
- }
-}
-
/* Set the event in the proactor's batch */
static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, pn_proactor__class(), p, t);
@@ -663,36 +582,27 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
return &p->batch;
}
-void leader_wake_connection(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- pconnection_to_worker(pc);
-}
-
static void on_stopping(uv_handle_t* h, void* v) {
/* Close all the TCP handles. on_close_psocket will close any other handles if needed */
- if (h->type == UV_TCP && !uv_is_closing(h)) {
- uv_close(h, on_close_psocket);
+ if (h->type == UV_TCP) {
+ uv_safe_close(h, on_close_psocket);
}
}
static pn_event_t *log_event(void* p, pn_event_t *e) {
if (e) {
- pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+ pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
}
return e;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
- assert(l->psocket.state == ON_WORKER);
- pn_event_t *prev = pn_collector_prev(l->collector);
- if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
- l->err = UV_EOF;
- }
- return log_event(l, pn_collector_next(l->collector));
+ assert(l->psocket.working);
+ uv_mutex_lock(&l->lock);
+ pn_event_t *e = pn_collector_next(l->collector);
+ uv_mutex_unlock(&l->lock);
+ return log_event(l, e);
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
@@ -702,25 +612,18 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
}
static void pn_listener_free(pn_listener_t *l) {
- /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
+ /* No assert(l->psocket.working); can be called during shutdown */
if (l) {
if (l->collector) pn_collector_free(l->collector);
if (l->condition) pn_condition_free(l->condition);
if (l->attachments) pn_free(l->attachments);
+ assert(!l->accept.front);
free(l);
}
}
-void leader_listener_close(psocket_t *ps) {
- assert(ps->state = ON_LEADER);
- pn_listener_t *l = (pn_listener_t*)ps;
- l->err = UV_EOF;
- listener_to_uv(l);
-}
-
-/* Return the next event batch or 0 if no events are available in the worker_q */
+/* Return the next event batch or NULL if no events are available */
static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
- /* FIXME aconway 2017-02-21: generate these in parallel? */
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
p->inactive = false;
@@ -736,49 +639,99 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
}
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
- assert(ps->state == ON_WORKER);
+ assert(ps->working);
if (ps->is_conn) {
return &as_pconnection(ps)->driver.batch;
} else { /* Listener */
return &as_listener(ps)->batch;
}
}
- return 0;
+ return NULL;
+}
+
+/* Process a pconnection, return true if it has work */
+static bool leader_process_pconnection(pconnection_t *pc) {
+ if (pc->psocket.tcp.data == NULL) {
+ leader_connect(pc);
+ }
+ pn_millis_t next_tick = leader_tick(pc);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ if (pconnection_needs_work(pc)) {
+ return true; /* Don't wait on IO while there is pending work */
+ }
+ if (pn_connection_driver_finished(&pc->driver)) {
+ uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
+ return false;
+ }
+ /* Issue async IO requests */
+ int err = 0;
+ const char *what = NULL;
+
+ if (!err && next_tick) {
+ what = "connection timer start";
+ err = uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+ }
+ if (!err && !pc->writing) {
+ what = "write";
+ if (wbuf.size > 0) {
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+ if (!err) {
+ pc->writing = wbuf.size;
+ }
+ } else if (pn_connection_driver_write_closed(&pc->driver)) {
+ uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+ }
+ }
+ if (!err && rbuf.size > 0) {
+ what = "read";
+ err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+ }
+ if (err) {
+ pconnection_detach(pc);
+ pconnection_error(pc, err, what);
+ return true;
+ }
+ return false;
}
/* Process the leader_q and the UV loop, in the leader thread */
static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
+ pn_event_batch_t *batch = NULL;
+ for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+ assert(!ps->working);
- if (p->timeout_request) {
- p->timeout_request = false;
- if (p->timeout) {
- uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
- } else {
- uv_timer_stop(&p->timer);
+ uv_mutex_unlock(&p->lock); /* Unlock to process each item, may add more items to leader_q */
+ bool needs_work = ps->is_conn ?
+ leader_process_pconnection(as_pconnection(ps)) :
+ leader_process_listener(as_listener(ps));
+ uv_mutex_lock(&p->lock);
+
+ if (needs_work && !ps->working && ps->next == &UNLISTED) {
+ ps->working = true;
+ push_lh(&p->worker_q, ps);
}
}
- for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- if (ps->action) {
- uv_mutex_unlock(&p->lock);
- ps->action(ps);
- ps->action = NULL;
- uv_mutex_lock(&p->lock);
- } else if (ps->wakeup) {
- uv_mutex_unlock(&p->lock);
- ps->wakeup(ps);
- ps->wakeup = NULL;
- uv_mutex_lock(&p->lock);
+ batch = get_batch_lh(p); /* Check for work */
+ if (!batch) { /* No work, run the UV loop */
+ /* Set timeout timer before uv_run */
+ if (p->timeout_request) {
+ p->timeout_request = false;
+ uv_timer_stop(&p->timer);
+ if (p->timeout) {
+ uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+ }
}
- pn_event_batch_t *batch = get_batch_lh(p);
- if (batch) return batch;
+ uv_mutex_unlock(&p->lock); /* Unlock to run UV loop */
+ uv_run(&p->loop, mode);
+ uv_mutex_lock(&p->lock);
+ batch = get_batch_lh(p);
}
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, mode);
- uv_mutex_lock(&p->lock);
- return get_batch_lh(p);
+ return batch;
}
-/* ==== public API ==== */
+/**** public API ****/
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
@@ -788,7 +741,7 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
p->has_leader = true;
batch = leader_lead_lh(p, UV_RUN_NOWAIT);
p->has_leader = false;
- uv_cond_signal(&p->cond); /* Notify next leader */
+ uv_cond_broadcast(&p->cond); /* Signal followers for possible work */
}
uv_mutex_unlock(&p->lock);
return batch;
@@ -802,44 +755,32 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
batch = get_batch_lh(p);
}
if (!batch) { /* Become leader */
- assert(!p->has_leader); /* Implied by loop condition */
p->has_leader = true;
do {
batch = leader_lead_lh(p, UV_RUN_ONCE);
} while (!batch);
p->has_leader = false;
- uv_cond_signal(&p->cond);
+ uv_cond_broadcast(&p->cond); /* Signal followers. One takes over, many can work. */
}
uv_mutex_unlock(&p->lock);
return batch;
}
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
- pconnection_t *pc = batch_pconnection(batch);
- if (pc) {
- assert(pc->psocket.state == ON_WORKER);
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to leader */
- pconnection_to_worker(pc);
- } else {
- to_leader(&pc->psocket, psocket_to_uv);
- }
- return;
- }
- pn_listener_t *l = batch_listener(batch);
- if (l) {
- assert(l->psocket.state == ON_WORKER);
- to_leader(&l->psocket, psocket_to_uv);
- return;
+ uv_mutex_lock(&p->lock);
+ psocket_t *ps = batch_psocket(batch); /* FIXME aconway 2017-02-26: replace with switch? */
+ if (ps) {
+ assert(ps->working);
+ assert(ps->next == &UNLISTED);
+ ps->working = false;
+ push_lh(&p->leader_q, ps);
}
- pn_proactor_t *bp = batch_proactor(batch);
+ pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */
if (bp == p) {
- uv_mutex_lock(&p->lock);
p->batch_working = false;
- notify(p);
- uv_mutex_unlock(&p->lock);
- return;
}
+ uv_mutex_unlock(&p->lock);
+ notify(p);
}
pn_listener_t *pn_event_listener(pn_event_t *e) {
@@ -864,16 +805,16 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
- notify(p);
uv_mutex_unlock(&p->lock);
+ notify(p);
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_request = true;
- notify(p);
uv_mutex_unlock(&p->lock);
+ notify(p);
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -881,28 +822,19 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- to_leader(&pc->psocket, leader_connect);
+ psocket_start(&pc->psocket);
return 0;
}
int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
{
+ assert(!l->closed);
psocket_init(&l->psocket, p, false, host, port);
l->backlog = backlog;
- to_leader(&l->psocket, leader_listen);
+ psocket_start(&l->psocket);
return 0;
}
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconnection_t *pc = get_pconnection(c);
- return pc ? pc->psocket.proactor : NULL;
-}
-
-void pn_connection_wake(pn_connection_t* c) {
- /* May be called from any thread */
- wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
-}
-
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
p->collector = pn_collector();
@@ -919,8 +851,8 @@ pn_proactor_t *pn_proactor() {
void pn_proactor_free(pn_proactor_t *p) {
uv_timer_stop(&p->timer);
- uv_close((uv_handle_t*)&p->timer, NULL);
- uv_close((uv_handle_t*)&p->async, NULL);
+ uv_safe_close((uv_handle_t*)&p->timer, NULL);
+ uv_safe_close((uv_handle_t*)&p->async, NULL);
uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
while (uv_loop_alive(&p->loop)) {
uv_run(&p->loop, UV_RUN_ONCE); /* Run till all handles closed */
@@ -929,9 +861,28 @@ void pn_proactor_free(pn_proactor_t *p) {
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
pn_collector_free(p->collector);
+ /* FIXME aconway 2017-02-25: restore */
+ /* assert(!p->worker_q.front); */
+ /* assert(!p->leader_q.front); */
free(p);
}
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+ pconnection_t *pc = get_pconnection(c);
+ return pc ? pc->psocket.proactor : NULL;
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+ /* May be called from any thread */
+ pconnection_t *pc = get_pconnection(c);
+ if (pc) {
+ uv_mutex_lock(&pc->lock);
+ pc->wake = true;
+ uv_mutex_unlock(&pc->lock);
+ psocket_notify(&pc->psocket);
+ }
+}
+
pn_listener_t *pn_listener(void) {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
@@ -948,8 +899,14 @@ pn_listener_t *pn_listener(void) {
}
void pn_listener_close(pn_listener_t* l) {
- /* This can be called from any thread, not just the owner of l */
- wakeup(&l->psocket, leader_listener_close);
+ /* May be called from any thread */
+ uv_mutex_lock(&l->lock);
+ if (!l->closed) {
+ l->closed = true;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+ }
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -973,13 +930,15 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
}
int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
- assert(l->psocket.state == ON_WORKER);
- if (l->accepting) {
- return PN_STATE_ERR; /* Only one at a time */
- }
- l->accepting = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
- if (!l->accepting) {
- return UV_ENOMEM;
+ assert(l->psocket.working);
+ assert(!l->closed);
+ uv_mutex_lock(&l->lock);
+ pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ if (!pc) {
+ return PN_OUT_OF_MEMORY;
}
+ push_lh(&l->accept, &pc->psocket);
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6595a0b..a0ddcda 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -28,245 +28,259 @@
#include <stdlib.h>
#include <string.h>
-static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
static const char *localhost = "127.0.0.1"; /* host for connect/listen */
-struct test_events {
- pn_proactor_t *proactor;
- pn_event_batch_t *events;
-};
-
-/* Wait for the next single event, return its type */
-static pn_event_type_t wait_next(pn_proactor_t *proactor) {
- pn_event_batch_t *events = pn_proactor_wait(proactor);
- pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
- pn_proactor_done(proactor, events);
- return etype;
-}
-
-/* Get events until an event of `type` or a PN_TRANSPORT_CLOSED/PN_PROACTOR_TIMEOUT */
-static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
- while (true) {
- pn_event_type_t t = wait_next(proactor);
- if (t == etype || t == PN_PROACTOR_TIMEOUT) {
- return t;
- }
- }
-}
-
-/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
-static void test_interrupt_timeout(test_t *t) {
- pn_proactor_t *p = pn_proactor();
- pn_proactor_interrupt(p);
- pn_event_type_t etype = wait_next(p);
- TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
- pn_proactor_set_timeout(p, 1); /* very short timeout */
- etype = wait_next(p);
- TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
- pn_proactor_free(p);
-}
-
-/* Test handler return value */
-typedef enum {
- H_CONTINUE, /**@<< handler wants more events */
- H_FINISHED, /**@<< handler completed without error */
- H_FAILED /**@<< handler hit an error and cannot continue */
-} handler_state_t;
-
-typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+typedef int (*test_handler_fn)(test_t *, pn_event_t*);
/* Proactor and handler that take part in a test */
typedef struct proactor_test_t {
- test_t *t;
test_handler_fn handler;
+ test_t *t;
pn_proactor_t *proactor;
- handler_state_t state; /* Result of last handler call */
} proactor_test_t;
/* Initialize an array of proactor_test_t */
-static void proactor_test_init(proactor_test_t *pts, size_t n) {
+static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ if (!pt->t) pt->t = t;
if (!pt->proactor) pt->proactor = pn_proactor();
pn_proactor_set_timeout(pt->proactor, timeout);
- pt->state = H_CONTINUE;
}
}
-/* Iterate over an array of proactors, draining or handling events with the non-blocking
- pn_proactor_get. Continue till all handlers return H_FINISHED (and return 0) or one
- returns H_FAILED (and return non-0)
-*/
+#define PROACTOR_TEST_INIT(A, T) proactor_test_init(A, sizeof(A)/sizeof(*A), (T))
+
+static void proactor_test_free(proactor_test_t *pts, size_t n) {
+ for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ pn_proactor_free(pt->proactor);
+ }
+}
+
+#define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
+
+/* Run an array of proactors till a handler returns non-0 */
static int proactor_test_run(proactor_test_t *pts, size_t n) {
- /* Make sure pts are initialized */
- proactor_test_init(pts, n);
- size_t finished = 0;
- do {
- finished = 0;
+ int ret = 0;
+ while (!ret) {
for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
pn_event_batch_t *events = pn_proactor_get(pt->proactor);
if (events) {
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- if (pt->state == H_CONTINUE) {
- pt->state = pt->handler(pt->t, e);
- }
+ pn_event_t *e = pn_event_batch_next(events);
+ TEST_CHECKF(pts->t, e, "empty batch");
+ while (e && !ret) {
+ if (!(ret = pt->handler(pt->t, e)))
+ e = pn_event_batch_next(events);
}
pn_proactor_done(pt->proactor, events);
}
- switch (pt->state) {
- case H_CONTINUE: break;
- case H_FINISHED: ++finished; break;
- case H_FAILED: return 1;
- }
}
- } while (finished < n);
- return 0;
+ }
+ return ret;
}
+#define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A))
+
+/* Wait for the next single event, return its type */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+ pn_proactor_done(proactor, events);
+ return etype;
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+ pn_proactor_t *p = pn_proactor();
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+ pn_proactor_interrupt(p);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+ pn_proactor_set_timeout(p, 1); /* very short timeout */
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ pn_proactor_free(p);
+}
-/* Handler for test_listen_connect, does both sides of the connection */
-static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+/* Common handler for simple client/server interactions, */
+static int common_handler(test_t *t, pn_event_t *e) {
pn_connection_t *c = pn_event_connection(e);
pn_listener_t *l = pn_event_listener(e);
switch (pn_event_type(e)) {
- /* Act on these events */
- case PN_LISTENER_ACCEPT: {
- pn_connection_t *accepted = pn_connection();
- pn_connection_open(accepted);
- pn_listener_accept(l, accepted); /* Listener takes ownership of accepted */
- return H_CONTINUE;
- }
+
+ /* Stop on these events */
+ case PN_LISTENER_OPEN:
+ case PN_PROACTOR_TIMEOUT:
+ case PN_TRANSPORT_CLOSED:
+ case PN_PROACTOR_INACTIVE:
+ case PN_LISTENER_CLOSE:
+ return pn_event_type(e);
+
+ case PN_LISTENER_ACCEPT:
+ pn_listener_accept(l, pn_connection());
+ return 0;
case PN_CONNECTION_REMOTE_OPEN:
- if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
- pn_connection_close(c);
- } else { /* Server returns the open */
- pn_connection_open(c);
- }
- return H_CONTINUE;
+ pn_connection_open(c); /* Return the open (no-op if already open) */
+ return 0;
case PN_CONNECTION_REMOTE_CLOSE:
- if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
- pn_connection_close(c); /* Return the close */
- }
- return H_CONTINUE;
-
- case PN_TRANSPORT_CLOSED:
- return H_FINISHED;
+ pn_connection_close(c); /* Return the close */
+ return 0;
+
+ /* Ignore these events */
+ case PN_CONNECTION_INIT:
+ case PN_CONNECTION_BOUND:
+ case PN_CONNECTION_LOCAL_OPEN:
+ case PN_CONNECTION_LOCAL_CLOSE:
+ case PN_TRANSPORT:
+ case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_HEAD_CLOSED:
+ case PN_TRANSPORT_TAIL_CLOSED:
+ return 0;
default:
- return H_CONTINUE;
- break;
+ TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+ return 0; /* Fail the test but keep going */
}
}
-/* Test bad-address error handling for listen and connect */
-static void test_early_error(test_t *t) {
- pn_proactor_t *p = pn_proactor();
- pn_proactor_set_timeout(p, timeout); /* In case of hang */
- pn_connection_t *c = pn_connection();
- pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
- pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
- TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
- TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
-
- pn_listener_t *l = pn_listener();
- pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
- etype = wait_for(p, PN_LISTENER_CLOSE);
- TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
- TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
-
- pn_proactor_free(p);
+/* close a connection when it is remote open */
+static int open_close_handler(test_t *t, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ pn_connection_close(pn_event_connection(e));
+ return 0; /* common_handler will finish on TRANSPORT_CLOSED */
+ default:
+ return common_handler(t, e);
+ }
}
-/* Simplest client/server interaction with 2 proactors */
-static void test_listen_connect(test_t *t) {
- proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler } };
- proactor_test_init(pts, 2);
+/* Simple client/server connection with 2 proactors */
+static void test_client_server(test_t *t) {
+ proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port(); /* Hold a port */
-
+ test_port_t port = test_port();
pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_proactor_connect(client, pn_connection(), localhost, port.str);
- proactor_test_run(pts, 2);
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_proactor_connect(client, pn_connection(), localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
-static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
- pn_connection_t *c = pn_event_connection(e);
+/* Return on connection open, close and return on wake */
+static int open_wake_handler(test_t *t, pn_event_t *e) {
switch (pn_event_type(e)) {
-
case PN_CONNECTION_REMOTE_OPEN:
- if (pn_connection_state(c) | PN_LOCAL_UNINIT) {
- pn_connection_open(c); /* Server returns the open */
- }
- return H_FINISHED; /* Finish when open at both ends */
-
+ return pn_event_type(e);
+ case PN_CONNECTION_WAKE:
+ pn_connection_close(pn_event_connection(e));
+ return pn_event_type(e);
default:
- /* Otherwise same as listen_connect_handler */
- return listen_connect_handler(t, e);
+ return common_handler(t, e);
}
}
/* Test waking up a connection that is idle */
-static void test_connection_wakeup(test_t *t) {
- proactor_test_t pts[] = { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
- proactor_test_init(pts, 2);
+static void test_connection_wake(test_t *t) {
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(); /* Hold a port */
pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_connection_t *c = pn_connection();
- pn_proactor_connect(client, c, localhost, port.str);
- proactor_test_run(pts, 2); /* Will finish when client is connected */
- TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
- pn_connection_wake(c);
- etype = wait_next(client);
- /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
- TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
/* Test that INACTIVE event is generated when last connections/listeners closes. */
static void test_inactive(test_t *t) {
- proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler }};
- proactor_test_init(pts, 2);
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(); /* Hold a port */
pn_listener_t *l = pn_listener();
pn_proactor_listen(server, l, localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_proactor_connect(client, pn_connection(), localhost, port.str);
- proactor_test_run(pts, 2);
- etype = wait_for(client, PN_PROACTOR_INACTIVE);
- pn_listener_close(l);
- etype = wait_for(server, PN_PROACTOR_INACTIVE);
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+ /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+ /* server won't be INACTIVE until listener is closed */
+ TEST_CHECK(t, pn_proactor_get(server) == NULL);
+ pn_listener_close(l);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
+}
+
+#define TEST_CHECK_ERROR(T, WANT, COND) do { \
+ TEST_CHECKF((T), pn_condition_is_set(COND), "expecting error"); \
+ const char* description = pn_condition_get_description(COND); \
+ if (!strstr(description, (WANT))) { \
+ TEST_ERRORF((T), "bad error, expected '%s' in '%s'", (WANT), description); \
+ } \
+ } while(0)
+
+/* Tests for error handling */
+static void test_errors(test_t *t) {
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+
+ /* Invalid connect/listen parameters */
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, "xxx");
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, localhost, "xxx", 1);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ /* Connect with no listener */
+ c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
+ TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
+
int main(int argc, char **argv) {
int failed = 0;
RUN_ARGV_TEST(failed, t, test_inactive(&t));
RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
- RUN_ARGV_TEST(failed, t, test_early_error(&t));
- RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
- RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
+ RUN_ARGV_TEST(failed, t, test_errors(&t));
+ RUN_ARGV_TEST(failed, t, test_client_server(&t));
+ RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 7006334..97dac3f 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -21,6 +21,7 @@
*/
#include <proton/type_compat.h>
+#include <proton/event.h>
#include <errno.h>
#include <stdarg.h>
@@ -28,24 +29,46 @@
#include <stdlib.h>
#include <string.h>
-/*
- All output from test marcros goes to stdout not stderr, error messages are normal for a test.
- Some errno handling functions are thread-unsafe
- */
-
+/* A struct to collect the results of a test, created by RUN_TEST macro. */
+typedef struct test_t {
+ const char* name;
+ int errors;
+ uintptr_t data; /* Test can store some non-error data here */
+} test_t;
-/* Call via TEST_ASSERT macros */
-static void assert_fail_(const char* cond, const char* file, int line, const char *fmt, ...) {
- printf("%s:%d: Assertion failed: %s", file, line, cond);
+/* Internal, use macros. Print error message and increase the t->errors count.
+ All output from test macros goes to stdout not stderr, error messages are normal for a test.
+*/
+static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
+ const char* file, int line, const char *fmt, va_list ap)
+{
+ printf("%s:%d", file, line);
+ if (prefix && *prefix) printf(": %s", prefix);
+ if (expr && *expr) printf(": %s", expr);
if (fmt && *fmt) {
- va_list ap;
- va_start(ap, fmt);
- printf(" - ");
+ printf(": ");
vprintf(fmt, ap);
- printf("\n");
- fflush(stdout);
- va_end(ap);
}
+ if (t) printf(" [%s]", t->name);
+ printf("\n");
+ fflush(stdout);
+}
+
+static void test_errorf_(test_t *t, const char *prefix, const char* expr,
+ const char* file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ ++t->errors;
+ test_vlogf_(t, prefix, expr, file, line, fmt, ap);
+ va_end(ap);
+}
+
+/* Call via TEST_ASSERT macros */
+static void assert_fail_(const char* expr, const char* file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ test_vlogf_(NULL, "assertion failed", expr, file, line, fmt, ap);
+ va_end(ap);
abort();
}
@@ -63,32 +86,42 @@ static void assert_fail_(const char* cond, const char* file, int line, const cha
TEST_ASSERTF((expr), "%s", strerror(err))
-/* A struct to collect the results of a test.
- * Declare and initialize with TEST_START(t) where t will be declared as a test_t
- */
-typedef struct test_t {
- const char* name;
- int errors;
-} test_t;
-
-/* if !expr print the printf-style error and increment t->errors. Use via macros. Returns expr. */
+/* Internal, use macros */
static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
if (!expr) {
+ ++t->errors;
va_list ap;
va_start(ap, fmt);
- printf("%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
- if (fmt && *fmt) {
- printf(" - ");
- vprintf(fmt, ap);
- }
- printf("\n");
- fflush(stderr);
- ++t->errors;
+ test_vlogf_(t, "check failed", sexpr, file, line, fmt, ap);
+ va_end(ap);
}
return expr;
}
-#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+/* Print a message but don't mark the test as having an error */
+#define TEST_LOGF(TEST, ...) \
+ test_logf_((TEST), "info", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* Print an error with printf-style message, increment TEST->errors */
+#define TEST_ERRORF(TEST, ...) \
+ test_errorf_((TEST), "error", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print and record an error for t */
+#define TEST_CHECKF(TEST, EXPR, ...) \
+ test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print and record an error for t including EXPR */
+#define TEST_CHECK(TEST, EXPR) \
+ test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, "")
+
+static inline bool test_etype_equal_(test_t *t, int want, int got, const char *file, int line) {
+ return test_check_(t, want == got, NULL, file, line, "want %s got %s",
+ pn_event_type_name((pn_event_type_t)want),
+ pn_event_type_name((pn_event_type_t)got));
+}
+
+#define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
+ test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
/* T is name of a test_t variable, EXPR is the test expression (which should update T)
FAILED is incremented if the test has errors
@@ -166,12 +199,14 @@ static int sock_port(sock_t sock) {
return ntohs(port);
}
+/* Combines a sock_t with the int and char* versions of the port for convenience */
typedef struct test_port_t {
sock_t sock;
int port;
char str[256];
} test_port_t;
+/* Create a test_port_t */
static inline test_port_t test_port(void) {
test_port_t tp = {0};
tp.sock = sock_bind0();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/6] qpid-proton git commit: PROTON-1403: c example test framework
fixes
Posted by ac...@apache.org.
PROTON-1403: c example test framework fixes
- start broker for each test case
- grep for "listening" in output instead of trying to connect to port
(faster and avoids confusing "no proton header" errors)
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/105b939f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/105b939f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/105b939f
Branch: refs/heads/master
Commit: 105b939f2d361577d23b374a6f85fbf00e20f8da
Parents: a6b4164
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 12:53:40 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 12:53:40 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/test.py | 48 ++++++++++++++++++++++++++++------------
examples/exampletest.py | 40 +++++----------------------------
2 files changed, 39 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/105b939f/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index f6a6562..692e4be 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -29,29 +29,49 @@ def python_cmd(name):
def receive_expect(n):
return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
-class CExampleTest(BrokerTestCase):
- broker_exe = ["broker"]
+class Broker(object):
+ def __init__(self, test):
+ self.test = test
+
+ def __enter__(self):
+ with bind0() as sock:
+ self.addr = "127.0.0.1:%s/examples" % (sock.port())
+ self.proc = self.test.proc(["broker", "-a", self.addr])
+ self.proc.wait_re("listening")
+ return self
+
+ def __exit__(self, *args):
+ b = getattr(self, "proc")
+ if b:
+ if b.poll() != None: # Broker crashed
+ raise ProcError(b, "broker crash")
+ b.kill()
+
+class CExampleTest(ExampleTestCase):
def test_send_receive(self):
"""Send first then receive"""
- s = self.proc(["send", "-a", self.addr])
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["receive", "-a", self.addr])
- self.assertEqual(receive_expect(100), r.wait_out())
+ with Broker(self) as b:
+ s = self.proc(["send", "-a", b.addr])
+ self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+ r = self.proc(["receive", "-a", b.addr])
+ self.assertEqual(receive_expect(100), r.wait_out())
def test_receive_send(self):
"""Start receiving first, then send."""
- r = self.proc(["receive", "-a", self.addr]);
- s = self.proc(["send", "-a", self.addr]);
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- self.assertEqual(receive_expect(100), r.wait_out())
+ with Broker(self) as b:
+ r = self.proc(["receive", "-a", b.addr]);
+ s = self.proc(["send", "-a", b.addr]);
+ self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+ self.assertEqual(receive_expect(100), r.wait_out())
def test_timed_send(self):
"""Send with timed delay"""
- s = self.proc(["send", "-a", self.addr, "-d100", "-m3"])
- self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["receive", "-a", self.addr, "-m3"])
- self.assertEqual(receive_expect(3), r.wait_out())
+ with Broker(self) as b:
+ s = self.proc(["send", "-a", b.addr, "-d100", "-m3"])
+ self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
+ r = self.proc(["receive", "-a", b.addr, "-m3"])
+ self.assertEqual(receive_expect(3), r.wait_out())
def test_send_direct(self):
"""Send to direct server"""
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/105b939f/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index 546b426..5c820fd 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -29,10 +29,12 @@ from copy import copy
import platform
from os.path import dirname as dirname
+DEFAULT_TIMEOUT=10
+
def bind0():
"""Bind a socket with bind(0) and SO_REUSEADDR to get a free port to listen on"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 0))
return sock
@@ -92,7 +94,7 @@ class Proc(Popen):
pass # Already exited.
return self.out
- def wait_out(self, timeout=10, expect=0):
+ def wait_out(self, timeout=DEFAULT_TIMEOUT, expect=0):
"""Wait for process to exit, return output. Raise ProcError on failure."""
t = threading.Thread(target=self.wait)
t.start()
@@ -104,7 +106,7 @@ class Proc(Popen):
raise ProcError(self)
return self.out
- def wait_re(self, regexp, timeout=10):
+ def wait_re(self, regexp, timeout=DEFAULT_TIMEOUT):
"""
Wait for regexp to appear in the output, returns the re.search match result.
The target process should flush() important output to ensure it appears.
@@ -158,37 +160,5 @@ class ExampleTestCase(TestCase):
self.procs.append(p)
return p
-class BrokerTestCase(ExampleTestCase):
- """
- ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
- Subclass must set `broker_exe` class variable with the name of the broker executable.
- """
-
- @classmethod
- def setUpClass(cls):
- sock = bind0()
- cls.port = sock.port()
- cls.addr = "127.0.0.1:%s/examples" % (cls.port)
- cls.broker = None # In case Proc throws, create the attribute.
- cls.broker = Proc(cls.broker_exe + ["-a", cls.addr], bufsize=0)
- try:
- cls.broker.wait_re("listening")
- except Exception, e:
- cls.broker.kill()
- raise
- finally:
- sock.close()
-
- @classmethod
- def tearDownClass(cls):
- if cls.broker: cls.broker.kill()
-
- def tearDown(self):
- b = type(self).broker
- if b and b.poll() != None: # Broker crashed
- type(self).setUpClass() # Start another for the next test.
- raise ProcError(b, "broker crash")
- super(BrokerTestCase, self).tearDown()
-
if __name__ == "__main__":
unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org