You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/27 18:04:51 UTC
[3/6] qpid-proton git commit: PROTON-1403: c libuv proactor fixes and
tests
PROTON-1403: c libuv proactor fixes and tests
- c-proactor-tests run to transport closed
- c-proactor-tests allows running selected tests
- c-proactor-tests added more tests
- fix casting error, race conditions
- re-organize get/wait loops
- log proactor and listener events via PN_TRACE_LOG
exampletest.py: grep for "listening" in output instead of retrying connects:
faster than dummy connects and avoids confusing "no protocol header" errors.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85585202
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85585202
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85585202
Branch: refs/heads/master
Commit: 855852029e1b4d8b5d4c939f5b9c616808701813
Parents: 54e8099
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 21 10:55:32 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:32:13 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/broker.c | 1 +
examples/c/proactor/direct.c | 1 +
examples/c/proactor/test.py | 17 +-
examples/exampletest.py | 38 ++--
proton-c/src/proactor/libuv.c | 331 ++++++++++++++++++-----------------
proton-c/src/tests/proactor.c | 72 ++++++--
proton-c/src/tests/test_tools.h | 13 ++
7 files changed, 261 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index a7d8863..370d2e8 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -472,6 +472,7 @@ int main(int argc, char **argv) {
pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
+ fflush(stdout);
if (url) pn_url_free(url);
if (b.threads <= 0) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index de2d6aa..126ae4f 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv) {
app.proactor = pn_proactor();
pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s'\n", host, port);
+ fflush(stdout);
if (url) pn_url_free(url);
do {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 7d6f3fc..f6a6562 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -53,23 +53,13 @@ class CExampleTest(BrokerTestCase):
r = self.proc(["receive", "-a", self.addr, "-m3"])
self.assertEqual(receive_expect(3), r.wait_out())
- def retry(self, args, max=10):
- """Run until output does not contain "connection refused", up to max retries"""
- while True:
- try:
- return self.proc(args).wait_out()
- except ProcError, e:
- if "connection refused" in e.args[0] and max > 0:
- max -= 1
- continue
- raise
-
def test_send_direct(self):
"""Send to direct server"""
with bind0() as sock:
addr = "127.0.0.1:%s/examples" % sock.port()
d = self.proc(["direct", "-a", addr])
- self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+ d.wait_re("listening")
+ self.assertEqual("100 messages sent and acknowledged\n", self.proc(["send", "-a", addr]).wait_out())
self.assertIn(receive_expect(100), d.wait_out())
def test_receive_direct(self):
@@ -77,7 +67,8 @@ class CExampleTest(BrokerTestCase):
with bind0() as sock:
addr = "127.0.0.1:%s/examples" % sock.port()
d = self.proc(["direct", "-a", addr])
- self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+ d.wait_re("listenin")
+ self.assertEqual(receive_expect(100), self.proc(["receive", "-a", addr]).wait_out())
self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index 8dbd1ed..546b426 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -104,6 +104,20 @@ class Proc(Popen):
raise ProcError(self)
return self.out
+ def wait_re(self, regexp, timeout=10):
+ """
+ Wait for regexp to appear in the output, returns the re.search match result.
+ The target process should flush() important output to ensure it appears.
+ """
+ if timeout:
+ deadline = time.time() + timeout
+ while timeout is None or time.time() < deadline:
+ match = re.search(regexp, self.out)
+ if match:
+ return match
+ time.sleep(0.01) # Not very efficient
+ raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
+
# Work-around older python unittest that lacks setUpClass.
if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'):
TestCase = unittest.TestCase
@@ -144,21 +158,6 @@ class ExampleTestCase(TestCase):
self.procs.append(p)
return p
-def wait_port(port, timeout=10):
- """Wait up to timeout for port to be connectable."""
- if timeout:
- deadline = time.time() + timeout
- while (timeout is None or time.time() < deadline):
- try:
- s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
- s.close()
- return
- except socket.error, e:
- if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
- raise
- raise socket.timeout("waiting for port %s" % port)
-
-
class BrokerTestCase(ExampleTestCase):
"""
ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
@@ -171,13 +170,14 @@ class BrokerTestCase(ExampleTestCase):
cls.port = sock.port()
cls.addr = "127.0.0.1:%s/examples" % (cls.port)
cls.broker = None # In case Proc throws, create the attribute.
- cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+ cls.broker = Proc(cls.broker_exe + ["-a", cls.addr], bufsize=0)
try:
- wait_port(cls.port)
+ cls.broker.wait_re("listening")
except Exception, e:
cls.broker.kill()
- raise ProcError(cls.broker, "timed out waiting for port")
- sock.close()
+ raise
+ finally:
+ sock.close()
@classmethod
def tearDownClass(cls):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8e2ff05..322f353 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -44,7 +44,7 @@
/*
libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
- "wakeup" that can wake the uv_loop from another thread.
+ call that we use to make uv_run() return.
To provide concurrency proactor uses a "leader-worker-follower" model, threads take
turns at the roles:
@@ -97,6 +97,9 @@ typedef enum {
typedef struct psocket_t {
/* Immutable */
pn_proactor_t *proactor;
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+ bool is_conn;
/* Protected by proactor.lock */
struct psocket_t* next;
@@ -106,9 +109,6 @@ typedef struct psocket_t {
/* Only used by leader thread when it owns the psocket */
uv_tcp_t tcp;
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- bool is_conn;
} psocket_t;
/* Special value for psocket.next pointer when socket is not on any any list. */
@@ -165,7 +165,6 @@ struct pn_listener_t {
pn_record_t *attachments;
void *context;
size_t backlog;
- bool closing; /* close requested or closed by error */
/* Only used in leader thread */
size_t connections; /* number of connections waiting to be accepted */
@@ -201,15 +200,18 @@ struct pn_proactor_t {
};
/* Push ps to back of q. Must not be on a different queue */
-static void push_lh(queue *q, psocket_t *ps) {
- assert(ps->next == &UNLISTED);
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
+static bool push_lh(queue *q, psocket_t *ps) {
+ if (ps->next == &UNLISTED) {
+ ps->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ps;
+ } else {
+ q->back->next = ps;
+ q->back = ps;
+ }
+ return true;
}
+ return false;
}
/* Pop returns front of q or NULL if empty */
@@ -222,23 +224,33 @@ static psocket_t* pop_lh(queue *q) {
return ps;
}
+/* Notify the leader thread that there is something to do outside of uv_run() */
+static inline void notify(pn_proactor_t* p) {
+ uv_async_send(&p->async);
+}
+
+static void to_leader_lh(psocket_t *ps) {
+ if (push_lh(&ps->proactor->leader_q, ps)) {
+ ps->state = ON_LEADER;
+ }
+}
+
/* Queue an action for the leader thread */
static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
ps->action = action;
- if (ps->next == &UNLISTED) {
- ps->state = ON_LEADER;
- push_lh(&ps->proactor->leader_q, ps);
- }
+ to_leader_lh(ps);
uv_mutex_unlock(&ps->proactor->lock);
- uv_async_send(&ps->proactor->async); /* Wake leader */
+ notify(ps->proactor);
}
/* Push to the worker thread */
static void to_worker(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- ps->state = ON_WORKER;
- push_lh(&ps->proactor->worker_q, ps);
+ if (push_lh(&ps->proactor->worker_q, ps)) {
+ ps->state = ON_WORKER;
+ }
+ notify(ps->proactor);
uv_mutex_unlock(&ps->proactor->lock);
}
@@ -260,8 +272,8 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
push_lh(&ps->proactor->leader_q, ps);
ps->state = ON_LEADER; /* Otherwise notify the leader */
}
- uv_mutex_unlock(&ps->proactor->lock);
uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
}
static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -311,7 +323,7 @@ static void leader_count(pn_proactor_t *p, int change) {
p->count += change;
if (p->count == 0) {
p->inactive = true;
- uv_async_send(&p->async); /* Wake leader */
+ notify(p);
}
uv_mutex_unlock(&p->lock);
}
@@ -409,10 +421,8 @@ static int leader_init(psocket_t *ps) {
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
- if (!err) {
+ if (!pconnection_error(pc, err, "on connect to")) {
pconnection_to_worker(pc);
- } else {
- pconnection_error(pc, err, "on connect to");
}
}
@@ -552,7 +562,7 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
static void pconnection_to_uv(pconnection_t *pc) {
to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */
if (pn_connection_driver_finished(&pc->driver)) {
- if (!uv_is_closing((uv_handle_t*)&pc->psocket)) {
+ if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
}
return;
@@ -653,8 +663,64 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
return &p->batch;
}
+void leader_wake_connection(psocket_t *ps) {
+ assert(ps->state == ON_LEADER);
+ pconnection_t *pc = as_pconnection(ps);
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ pconnection_to_worker(pc);
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+ /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+ if (h->type == UV_TCP && !uv_is_closing(h)) {
+ uv_close(h, on_close_psocket);
+ }
+}
+
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+ if (e) {
+ pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+ }
+ return e;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+ pn_listener_t *l = batch_listener(batch);
+ assert(l->psocket.state == ON_WORKER);
+ pn_event_t *prev = pn_collector_prev(l->collector);
+ if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+ l->err = UV_EOF;
+ }
+ return log_event(l, pn_collector_next(l->collector));
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+ pn_proactor_t *p = batch_proactor(batch);
+ assert(p->batch_working);
+ return log_event(p, pn_collector_next(p->collector));
+}
+
+static void pn_listener_free(pn_listener_t *l) {
+ /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
+ if (l) {
+ if (l->collector) pn_collector_free(l->collector);
+ if (l->condition) pn_condition_free(l->condition);
+ if (l->attachments) pn_free(l->attachments);
+ free(l);
+ }
+}
+
+void leader_listener_close(psocket_t *ps) {
+ assert(ps->state = ON_LEADER);
+ pn_listener_t *l = (pn_listener_t*)ps;
+ l->err = UV_EOF;
+ listener_to_uv(l);
+}
+
/* Return the next event batch or 0 if no events are available in the worker_q */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
+ /* FIXME aconway 2017-02-21: generate these in parallel? */
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
p->inactive = false;
@@ -680,55 +746,9 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
return 0;
}
-pn_listener_t *pn_event_listener(pn_event_t *e) {
- return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
-}
+/* Process the leader_q and the UV loop, in the leader thread */
+static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
- if (pn_event_class(e) == pn_proactor__class()) {
- return (pn_proactor_t*)pn_event_context(e);
- }
- pn_listener_t *l = pn_event_listener(e);
- if (l) {
- return l->psocket.proactor;
- }
- pn_connection_t *c = pn_event_connection(e);
- if (c) {
- return pn_connection_proactor(pn_event_connection(e));
- }
- return NULL;
-}
-
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
- pconnection_t *pc = batch_pconnection(batch);
- if (pc) {
- assert(pc->psocket.state == ON_WORKER);
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to leader */
- pconnection_to_worker(pc);
- } else {
- to_leader(&pc->psocket, psocket_to_uv);
- }
- return;
- }
- pn_listener_t *l = batch_listener(batch);
- if (l) {
- assert(l->psocket.state == ON_WORKER);
- to_leader(&l->psocket, psocket_to_uv);
- return;
- }
- pn_proactor_t *bp = batch_proactor(batch);
- if (bp == p) {
- uv_mutex_lock(&p->lock);
- p->batch_working = false;
- uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake leader */
- return;
- }
-}
-
-/* Process the leader_q, in the leader thread */
-static void leader_process_lh(pn_proactor_t *p) {
if (p->timeout_request) {
p->timeout_request = false;
if (p->timeout) {
@@ -738,7 +758,6 @@ static void leader_process_lh(pn_proactor_t *p) {
}
}
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- assert(ps->state == ON_LEADER);
if (ps->action) {
uv_mutex_unlock(&p->lock);
ps->action(ps);
@@ -750,68 +769,111 @@ static void leader_process_lh(pn_proactor_t *p) {
ps->wakeup = NULL;
uv_mutex_lock(&p->lock);
}
+ pn_event_batch_t *batch = get_batch_lh(p);
+ if (batch) return batch;
}
+ uv_mutex_unlock(&p->lock);
+ uv_run(&p->loop, mode);
+ uv_mutex_lock(&p->lock);
+ return get_batch_lh(p);
}
-/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+/* ==== public API ==== */
+
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
- /* Try to grab work immediately. */
pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL) {
- /* No work available, follow the leader */
- while (p->has_leader) {
- uv_cond_wait(&p->cond, &p->lock);
- }
- /* Lead till there is work to do. */
+ if (batch == NULL && !p->has_leader) {
+ /* Try a non-blocking lead to generate some work */
p->has_leader = true;
- while (batch == NULL) {
- leader_process_lh(p);
- batch = get_batch_lh(p);
- if (batch == NULL) {
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- }
- }
- /* Signal the next leader and go to work */
+ batch = leader_lead_lh(p, UV_RUN_NOWAIT);
p->has_leader = false;
- uv_cond_signal(&p->cond);
+ uv_cond_signal(&p->cond); /* Notify next leader */
}
uv_mutex_unlock(&p->lock);
return batch;
}
-pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL && !p->has_leader) {
- /* If there is no leader, try a non-waiting lead to generate some work */
- p->has_leader = true;
- leader_process_lh(p);
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_NOWAIT);
- uv_mutex_lock(&p->lock);
+ while (!batch && p->has_leader) {
+ uv_cond_wait(&p->cond, &p->lock); /* Follow the leader */
batch = get_batch_lh(p);
+ }
+ if (!batch) { /* Become leader */
+ assert(!p->has_leader); /* Implied by loop condition */
+ p->has_leader = true;
+ do {
+ batch = leader_lead_lh(p, UV_RUN_ONCE);
+ } while (!batch);
p->has_leader = false;
+ uv_cond_signal(&p->cond);
}
uv_mutex_unlock(&p->lock);
return batch;
}
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) {
+ assert(pc->psocket.state == ON_WORKER);
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ /* Process all events before going back to leader */
+ pconnection_to_worker(pc);
+ } else {
+ to_leader(&pc->psocket, psocket_to_uv);
+ }
+ return;
+ }
+ pn_listener_t *l = batch_listener(batch);
+ if (l) {
+ assert(l->psocket.state == ON_WORKER);
+ to_leader(&l->psocket, psocket_to_uv);
+ return;
+ }
+ pn_proactor_t *bp = batch_proactor(batch);
+ if (bp == p) {
+ uv_mutex_lock(&p->lock);
+ p->batch_working = false;
+ notify(p);
+ uv_mutex_unlock(&p->lock);
+ return;
+ }
+}
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+ return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+ if (pn_event_class(e) == pn_proactor__class()) {
+ return (pn_proactor_t*)pn_event_context(e);
+ }
+ pn_listener_t *l = pn_event_listener(e);
+ if (l) {
+ return l->psocket.proactor;
+ }
+ pn_connection_t *c = pn_event_connection(e);
+ if (c) {
+ return pn_connection_proactor(pn_event_connection(e));
+ }
+ return NULL;
+}
+
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
+ notify(p);
uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake the UV loop */
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_request = true;
+ notify(p);
uv_mutex_unlock(&p->lock);
- uv_async_send(&p->async); /* Wake the UV loop */
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -836,14 +898,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
return pc ? pc->psocket.proactor : NULL;
}
-void leader_wake_connection(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- pconnection_to_worker(pc);
-}
-
void pn_connection_wake(pn_connection_t* c) {
/* May be called from any thread */
wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
@@ -858,18 +912,11 @@ pn_proactor_t *pn_proactor() {
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
uv_async_init(&p->loop, &p->async, NULL);
- uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+ uv_timer_init(&p->loop, &p->timer);
p->timer.data = p;
return p;
}
-static void on_stopping(uv_handle_t* h, void* v) {
- /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
- if (h->type == UV_TCP && !uv_is_closing(h)) {
- uv_close(h, on_close_psocket);
- }
-}
-
void pn_proactor_free(pn_proactor_t *p) {
uv_timer_stop(&p->timer);
uv_close((uv_handle_t*)&p->timer, NULL);
@@ -885,39 +932,6 @@ void pn_proactor_free(pn_proactor_t *p) {
free(p);
}
-static pn_event_t *log_event(void* p, pn_event_t *e) {
- if (e) {
- pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
- }
- return e;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
- pn_listener_t *l = batch_listener(batch);
- assert(l->psocket.state == ON_WORKER);
- pn_event_t *prev = pn_collector_prev(l->collector);
- if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
- l->err = UV_EOF;
- }
- return log_event(l, pn_collector_next(l->collector));
-}
-
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
- pn_proactor_t *p = batch_proactor(batch);
- assert(p->batch_working);
- return log_event(p, pn_collector_next(p->collector));
-}
-
-static void pn_listener_free(pn_listener_t *l) {
- /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
- if (l) {
- if (l->collector) pn_collector_free(l->collector);
- if (l->condition) pn_condition_free(l->condition);
- if (l->attachments) pn_free(l->attachments);
- free(l);
- }
-}
-
pn_listener_t *pn_listener(void) {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
@@ -933,13 +947,6 @@ pn_listener_t *pn_listener(void) {
return l;
}
-void leader_listener_close(psocket_t *ps) {
- assert(ps->state = ON_LEADER);
- pn_listener_t *l = (pn_listener_t*)ps;
- l->err = UV_EOF;
- listener_to_uv(l);
-}
-
void pn_listener_close(pn_listener_t* l) {
/* This can be called from any thread, not just the owner of l */
wakeup(&l->psocket, leader_listener_close);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6b95c11..6595a0b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -98,7 +98,7 @@ static void proactor_test_init(proactor_test_t *pts, size_t n) {
pn_proactor_get. Continue till all handlers return H_FINISHED (and return 0) or one
returns H_FAILED (and return non-0)
*/
-int proactor_test_run(proactor_test_t *pts, size_t n) {
+static int proactor_test_run(proactor_test_t *pts, size_t n) {
/* Make sure pts are initialized */
proactor_test_init(pts, n);
size_t finished = 0;
@@ -127,17 +127,11 @@ int proactor_test_run(proactor_test_t *pts, size_t n) {
/* Handler for test_listen_connect, does both sides of the connection */
-handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
pn_connection_t *c = pn_event_connection(e);
pn_listener_t *l = pn_event_listener(e);
switch (pn_event_type(e)) {
- /* Ignore these events */
- case PN_CONNECTION_LOCAL_OPEN:
- case PN_CONNECTION_BOUND:
- case PN_CONNECTION_INIT:
- return H_CONTINUE;
-
/* Act on these events */
case PN_LISTENER_ACCEPT: {
pn_connection_t *accepted = pn_connection();
@@ -149,20 +143,22 @@ handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
case PN_CONNECTION_REMOTE_OPEN:
if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
pn_connection_close(c);
- return H_FINISHED;
} else { /* Server returns the open */
pn_connection_open(c);
}
+ return H_CONTINUE;
case PN_CONNECTION_REMOTE_CLOSE:
if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
pn_connection_close(c); /* Return the close */
}
+ return H_CONTINUE;
+
+ case PN_TRANSPORT_CLOSED:
return H_FINISHED;
default:
- TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
- return H_FAILED;
+ return H_CONTINUE;
break;
}
}
@@ -172,13 +168,13 @@ static void test_early_error(test_t *t) {
pn_proactor_t *p = pn_proactor();
pn_proactor_set_timeout(p, timeout); /* In case of hang */
pn_connection_t *c = pn_connection();
- pn_proactor_connect(p, c, "badhost", "amqp");
+ pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
pn_listener_t *l = pn_listener();
- pn_proactor_listen(p, l, "badhost", "amqp", 1);
+ pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
etype = wait_for(p, PN_LISTENER_CLOSE);
TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
@@ -204,6 +200,45 @@ static void test_listen_connect(test_t *t) {
pn_proactor_free(server);
}
+static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
+ pn_connection_t *c = pn_event_connection(e);
+ switch (pn_event_type(e)) {
+
+ case PN_CONNECTION_REMOTE_OPEN:
+ if (pn_connection_state(c) | PN_LOCAL_UNINIT) {
+ pn_connection_open(c); /* Server returns the open */
+ }
+ return H_FINISHED; /* Finish when open at both ends */
+
+ default:
+ /* Otherwise same as listen_connect_handler */
+ return listen_connect_handler(t, e);
+ }
+}
+
+/* Test waking up a connection that is idle */
+static void test_connection_wakeup(test_t *t) {
+ proactor_test_t pts[] = { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
+ proactor_test_init(pts, 2);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+ pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+ pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+ if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+ sock_close(port.sock);
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ proactor_test_run(pts, 2); /* Will finish when client is connected */
+ TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
+ pn_connection_wake(c);
+ etype = wait_next(client);
+ /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
+ TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
+ }
+ pn_proactor_free(client);
+ pn_proactor_free(server);
+}
+
/* Test that INACTIVE event is generated when last connections/listeners closes. */
static void test_inactive(test_t *t) {
proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler }};
@@ -226,11 +261,12 @@ static void test_inactive(test_t *t) {
pn_proactor_free(server);
}
-int main(int argv, char** argc) {
+int main(int argc, char **argv) {
int failed = 0;
- RUN_TEST(failed, t, test_inactive(&t));
- RUN_TEST(failed, t, test_interrupt_timeout(&t));
- RUN_TEST(failed, t, test_early_error(&t));
- RUN_TEST(failed, t, test_listen_connect(&t));
+ RUN_ARGV_TEST(failed, t, test_inactive(&t));
+ RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
+ RUN_ARGV_TEST(failed, t, test_early_error(&t));
+ RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
+ RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 2663dcf..7006334 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -104,6 +104,19 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
} \
} while(0)
+/* Like RUN_TEST but only if one of the argv strings is found in the test EXPR */
+#define RUN_ARGV_TEST(FAILED, T, EXPR) do { \
+ if (argc == 1) { \
+ RUN_TEST(FAILED, T, EXPR); \
+ } else { \
+ for (int i = 1; i < argc; ++i) { \
+ if (strstr(#EXPR, argv[i])) { \
+ RUN_TEST(FAILED, T, EXPR); \
+ break; \
+ } \
+ } \
+ } \
+ } while(0)
/* Some very simple platform-secifics to acquire an unused socket */
#if defined(WIN32)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org