You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/04/22 20:49:49 UTC
qpid-proton git commit: PROTON-1460: epoll test fixes: log_event,
listener setup, EOS detection
Repository: qpid-proton
Updated Branches:
refs/heads/master d16794ba0 -> 80def6f9a
PROTON-1460: epoll test fixes: log_event, listener setup, EOS detection
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/80def6f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/80def6f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/80def6f9
Branch: refs/heads/master
Commit: 80def6f9a5ae4c37b69d5da2b6db650ec1b63821
Parents: d16794b
Author: Clifford Jansen <cl...@apache.org>
Authored: Sat Apr 22 13:49:22 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Sat Apr 22 13:49:22 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 53 +++++++++++++++++++++++++++++---------
1 file changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/80def6f9/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7e2ee2c..126a7e2 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -232,11 +232,16 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
* connection, each listener, the proactor itself. The serialization
* is presented to the application via each associated event batch.
*
+ * Multiple threads can be trying to do work on a single context
+ * (e.g. socket IO is ready and a wakeup arrives at the same time). Mutexes are used
+ * to manage contention. Some vars are only ever touched by one
+ * "working" thread and are accessed without holding the mutex.
+ *
* Currently internal wakeups (via wake()/wake_notify()) are used to
* force a context to check if it has work to do. To minimize trips
- * through the kernel, wake() is a no-op if the context is working.
- * Conversely, a context must never stop working without checking if
- * it has newly arrived work.
+ * through the kernel, wake() is a no-op if the context has a working
+ * thread. Conversely, a thread must never stop working without
+ * checking if it has newly arrived work.
*
* External wake operations, like pn_connection_wake() and
* pn_proactor_interrupt(), are built on top of the internal wake
@@ -506,6 +511,13 @@ static inline bool proactor_has_event(pn_proactor_t *p) {
return (p->cached_event || (p->cached_event = pn_collector_next(p->collector)));
}
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+ if (e) {
+ pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
+ }
+ return e;
+}
+
static void psocket_error(psocket_t *ps, int err, const char* what) {
if (ps->is_conn) {
pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
@@ -622,6 +634,7 @@ void pconnection_begin_close(pconnection_t *pc) {
pc->psocket.closing = true;
pc->read_closed = pc->write_closed = true;
stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
+ pc->current_arm = 0;
pn_connection_driver_close(&pc->driver);
ptimer_set(&pc->timer, 0);
}
@@ -856,7 +869,11 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
if (!pc->read_closed) {
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- if (rbuf.size > 0 && (!pc->read_blocked)) {
+ if (rbuf.size == 0) {
+ if (pn_connection_driver_read_closed(&pc->driver))
+ pc->read_closed = true;
+ }
+ else if (!pc->read_blocked) {
ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
if (n > 0) {
@@ -932,6 +949,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
goto retry; // TODO: get rid of goto without adding more locking
pc->context.working = false;
+ pc->hog_count = 0;
bool rearm = pconnection_rearm_check(pc);
unlock(&pc->context.mutex);
@@ -971,6 +989,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
// TODO: check case of proactor shutting down
lock(&pc->context.mutex);
proactor_add(&pc->psocket);
+ pn_connection_open(pc->driver.connection); /* Auto-open */
struct addrinfo *ai = NULL;
int fd = -1;
@@ -1026,6 +1045,12 @@ void pn_connection_wake(pn_connection_t* c) {
if (notify) wake_notify(&pc->context);
}
+void pn_proactor_release_connection(pn_connection_t *c) {
+ pconnection_t *pc = get_pconnection(c);
+ if (pc) {
+ pn_connection_driver_release_connection(&pc->driver);
+ }
+}
// ========================================================================
// listener
@@ -1046,6 +1071,8 @@ pn_listener_t *pn_listener() {
pn_listener_free(l);
return NULL;
}
+ pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen
+ pcontext_init(&l->context, LISTENER, unknown, l);
}
return l;
}
@@ -1056,12 +1083,15 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
assert(buf); // TODO: memory safety
char *scheme, *user, *pass, *host, *port, *path;
pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path);
- pcontext_init(&l->context, LISTENER, p, l);
+ // TODO: check listener not already listening for this or another proactor
+ lock(&l->context.mutex);
+ l->context.proactor = p;;
psocket_init(&l->psocket, p, false, host, port);
l->backlog = backlog;
proactor_add(&l->psocket);
/* Always put an OPEN event for symmetry, even if we immediately close with err */
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+ bool notify = wake(&l->context);
struct addrinfo *ai = NULL;
int fd = -1;
@@ -1076,6 +1106,8 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
l->psocket.epoll_io.fd = fd;
l->psocket.epoll_io.wanted = EPOLLIN;
start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd); // TODO: check for error
+ unlock(&l->context.mutex);
+ if (notify) wake_notify(&l->context);
free(buf);
return;
}
@@ -1083,8 +1115,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
}
psocket_error(&l->psocket, errno, "listen on");
+ unlock(&l->context.mutex);
+ if (notify) wake_notify(&l->context);
if (ai) freeaddrinfo(ai);
- wake(&l->context);
free(buf);
return;
}
@@ -1190,7 +1223,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
l->close_dispatched = true;
unlock(&l->context.mutex);
- return e;
+ return log_event(l, e);
}
static void listener_done(pn_listener_t *l) {
@@ -1396,7 +1429,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
e = p->cached_event;
unlock(&p->context.mutex);
p->cached_event = NULL;
- return e;
+ return log_event(p, e);
}
static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
@@ -1711,7 +1744,3 @@ size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, si
}
}
-/* FIXME aconway 2017-04-21: dummy to make test link, needs implementation */
-void pn_proactor_release_connection(pn_connection_t *connection) {
- abort();
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org