You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:25:51 UTC
[28/50] [abbrv] qpid-proton git commit: PROTON-1460: C epoll proactor,
deterministic socket IO callbacks on close
PROTON-1460: C epoll proactor, deterministic socket IO callbacks on close
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d25089bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d25089bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d25089bf
Branch: refs/heads/go1
Commit: d25089bf2834dc4f650697c39d09dfb95c5509a2
Parents: ec1d1a3
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed May 24 23:48:34 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed May 24 23:59:41 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 55 +++++++++++++++++++++++---------------
1 file changed, 34 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d25089bf/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 2f99cd9..c65fa44 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -682,6 +682,7 @@ static void pconnection_final_free(pconnection_t *pc) {
// call without lock, but only if pconnection_is_final() is true
static void pconnection_cleanup(pconnection_t *pc) {
+ stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
if (pc->psocket.sockfd != -1)
close(pc->psocket.sockfd);
stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
@@ -698,8 +699,16 @@ static void pconnection_cleanup(pconnection_t *pc) {
static void pconnection_begin_close(pconnection_t *pc) {
if (!pc->context.closing) {
pc->context.closing = true;
- stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
- pc->current_arm = 0;
+ if (pc->current_arm != 0 && !pc->new_events) {
+ // Force io callback via hangup
+ if (pc->current_arm != (EPOLLIN | EPOLLOUT)) {
+ pc->current_arm = (EPOLLIN | EPOLLOUT);
+ pc->psocket.epoll_io.wanted = pc->current_arm;;
+ rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
+ }
+ shutdown(pc->psocket.sockfd, SHUT_RDWR);
+ }
+
pn_connection_driver_close(&pc->driver);
if (ptimer_shutdown(&pc->timer, pc->timer_armed))
pc->timer_armed = false; // disarmed in the sense that the timer will never fire again
@@ -713,6 +722,8 @@ static void pconnection_begin_close(pconnection_t *pc) {
static void pconnection_forced_shutdown(pconnection_t *pc) {
// Called by proactor_free, no competing threads, no epoll activity.
+ pc->current_arm = 0;
+ pc->new_events = 0;
pconnection_begin_close(pc);
// pconnection_process will never be called again. Zero everything.
pc->timer_armed = false;
@@ -886,12 +897,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
// Confirmed as working thread. Review state and unlock ASAP.
- if (pc->context.closing && pconnection_is_final(pc)) {
- unlock(&pc->context.mutex);
- pconnection_cleanup(pc);
- return NULL;
- }
-
retry:
if (pc->queued_disconnect) { // From pn_proactor_disconnect()
@@ -919,28 +924,36 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
if (pc->new_events) {
- if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
- pconnection_maybe_connect_lh(pc);
- else
- pconnection_connected_lh(pc); /* Non error event means we are connected */
- if (pc->new_events & EPOLLOUT)
- pc->write_blocked = false;
- if (pc->new_events & EPOLLIN)
- pc->read_blocked = false;
+ if (!pc->context.closing) {
+ if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
+ pconnection_maybe_connect_lh(pc);
+ else
+ pconnection_connected_lh(pc); /* Non error event means we are connected */
+ if (pc->new_events & EPOLLOUT)
+ pc->write_blocked = false;
+ if (pc->new_events & EPOLLIN)
+ pc->read_blocked = false;
+ }
pc->current_arm = 0;
pc->new_events = 0;
}
bool unarmed = (pc->current_arm == 0);
- if (!pc->timer_armed) {
+
+ if (pc->context.closing && pconnection_is_final(pc)) {
+ unlock(&pc->context.mutex);
+ pconnection_cleanup(pc);
+ return NULL;
+ }
+
+ if (!pc->timer_armed && !pc->timer.shutting_down) {
pc->timer_armed = true; // about to rearm outside the lock
timer_unarmed = true; // so we remember
}
- bool timer_shutting_down = pc->timer.shutting_down;
unlock(&pc->context.mutex);
pc->hog_count++; // working context doing work
- if (timer_unarmed && !timer_shutting_down) {
+ if (timer_unarmed) {
rearm(pc->psocket.proactor, &pc->timer.epoll_io);
timer_unarmed = false;
}
@@ -1279,7 +1292,6 @@ void pn_listener_free(pn_listener_t *l) {
/* Note at this point either the listener has never been used (freed by user)
or it has been closed, so all its sockets are closed.
*/
- // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
if (l) {
bool can_free = true;
if (l->collector) pn_collector_free(l->collector);
@@ -1296,6 +1308,7 @@ void pn_listener_free(pn_listener_t *l) {
}
static void listener_begin_close(pn_listener_t* l) {
+ // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
if (!l->context.closing) {
l->context.closing = true;
/* Close all listening sockets */
@@ -1338,7 +1351,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
pn_listener_t *l = psocket_listener(ps);
lock(&l->context.mutex);
if (events) {
- l->armed = false;
+ l->armed = false; // TODO: armed logic should be per socket not per aggregate listener
if (events & EPOLLRDHUP) {
/* Calls listener_begin_close which closes all the listener's sockets */
psocket_error(ps, errno, "listener epoll");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org