You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/14 00:32:45 UTC
qpid-proton git commit: added pn_event_reactor
Repository: qpid-proton
Updated Branches:
refs/heads/master 99df0a33d -> fe31dcea7
added pn_event_reactor
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fe31dcea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fe31dcea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fe31dcea
Branch: refs/heads/master
Commit: fe31dcea7033eea2faf2700d287ae9f94d78abda
Parents: 99df0a3
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Tue Jan 13 18:32:29 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Tue Jan 13 18:32:29 2015 -0500
----------------------------------------------------------------------
proton-c/include/proton/reactor.h | 2 +
proton-c/src/reactor/acceptor.c | 1 +
proton-c/src/reactor/reactor.c | 77 ++++++++++++++++++++++++++++++++--
proton-c/src/reactor/reactor.h | 2 +
proton-c/src/tests/reactor.c | 27 +++++++-----
5 files changed, 94 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index e5d953f..1b21195 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -85,6 +85,8 @@ PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
+PN_EXTERN pn_reactor_t *pn_event_reactor(pn_event_t *event); /* Reactor associated with the event's context; NULL when the context's class is not one the implementation recognises. */
+
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index 889016c..34446a2 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -63,6 +63,7 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons
pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port);
pni_selectable_set_fd(sel, socket);
pni_selectable_set_context(sel, reactor);
+ pni_record_init_reactor(pn_selectable_attachments(sel), reactor);
pni_record_init_handler(pn_selectable_attachments(sel), handler);
pn_reactor_update(reactor, sel);
return (pn_acceptor_t *) sel;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 3857dd7..82b903d 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -23,15 +23,16 @@
#include <proton/io.h>
#include <proton/selector.h>
#include <proton/event.h>
-#include <proton/reactor.h>
#include <proton/transport.h>
#include <proton/connection.h>
#include <proton/session.h>
#include <proton/link.h>
+#include <proton/delivery.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
+#include "reactor.h"
#include "selectable.h"
#include "platform.h"
@@ -173,8 +174,21 @@ void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
-static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) {
+static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) { /* Reactor bookkeeping that pn_reactor_process runs before the event is handed to its handler. */
assert(reactor);
+ assert(event);
+ switch (pn_event_type(event)) {
+ case PN_CONNECTION_INIT:
+ pni_record_init_reactor(pn_connection_attachments(pn_event_connection(event)), reactor); /* store the reactor on the connection's record so pn_event_reactor can read it back */
+ break;
+ default:
+ break; /* no pre-dispatch work for other event types */
+ }
+}
+
+static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) {
+ assert(reactor);
+ assert(event);
switch (pn_event_type(event)) {
case PN_TRANSPORT:
pni_handle_transport(reactor, event);
@@ -202,6 +216,58 @@ void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler) {
pn_record_set(record, PN_HANDLER, handler);
}
+static void *pni_reactor = NULL; /* in the code shown here only the address of this variable is used, never its value */
+#define PN_REACTOR ((pn_handle_t) &pni_reactor) /* record key under which a reactor pointer is stored and looked up */
+
+pn_reactor_t *pni_record_get_reactor(pn_record_t *record) { /* Return whatever is stored in the record under the PN_REACTOR key, cast to a reactor pointer. */
+ return (pn_reactor_t *) pn_record_get(record, PN_REACTOR);
+}
+
+void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor) { /* Define the PN_REACTOR slot on the record and store the reactor in it. */
+ pn_record_def(record, PN_REACTOR, PN_WEAKREF); /* NOTE(review): PN_WEAKREF presumably means the record holds no reference on the reactor - confirm against pn_record_def */
+ pn_record_set(record, PN_REACTOR, reactor);
+}
+
+static pn_connection_t *pni_object_connection(const pn_class_t *clazz, void *object) { /* Map an object of the given class to its connection; NULL for classes not listed below. */
+ switch (pn_class_id(clazz)) {
+ case CID_pn_delivery:
+ return pn_session_connection(pn_link_session(pn_delivery_link((pn_delivery_t *) object))); /* delivery -> link -> session -> connection */
+ case CID_pn_link:
+ return pn_session_connection(pn_link_session((pn_link_t *) object)); /* link -> session -> connection */
+ case CID_pn_session:
+ return pn_session_connection((pn_session_t *) object);
+ case CID_pn_connection:
+ return (pn_connection_t *) object; /* the object already is the connection */
+ case CID_pn_transport:
+ return pn_transport_connection((pn_transport_t *) object); /* NOTE(review): presumably NULL when the transport has no connection - confirm */
+ default:
+ return NULL;
+ }
+}
+
+pn_reactor_t *pn_event_reactor(pn_event_t *event) { /* Return the reactor associated with the event's context, or NULL if none can be determined. */
+  const pn_class_t *clazz = pn_event_class(event);
+  void *context = pn_event_context(event);
+  switch (pn_class_id(clazz)) {
+  case CID_pn_reactor:
+    return (pn_reactor_t *) context; /* the context is the reactor itself */
+  case CID_pn_task:
+    return pni_record_get_reactor(pn_task_attachments((pn_task_t *) context)); /* stored on the task by pn_reactor_schedule */
+  case CID_pn_delivery:
+  case CID_pn_link:
+  case CID_pn_session:
+  case CID_pn_connection:
+  case CID_pn_transport:
+    {
+      pn_connection_t *conn = pni_object_connection(clazz, context); /* walk up to the owning connection */
+      if (!conn) { return NULL; } /* no connection resolved (e.g. pn_transport_connection returned NULL): nothing to read attachments from */
+      return pni_record_get_reactor(pn_connection_attachments(conn));
+    }
+  default:
+    return NULL; /* context class carries no reactor association known to this function */
+  }
+}
+
pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) {
pn_handler_t *handler = NULL;
pn_link_t *link = pn_event_link(event);
@@ -229,7 +295,9 @@ pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler)
pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) { /* Schedule a timer task at reactor->now + delay, attach the reactor and handler to it, and return the task. */
pn_timer_t *timer = pni_timer(reactor->timer);
pn_task_t *task = pn_timer_schedule(timer, reactor->now + delay);
- pni_record_init_handler(pn_task_attachments(task), handler);
+ pn_record_t *record = pn_task_attachments(task);
+ pni_record_init_reactor(record, reactor); /* lets pn_event_reactor resolve the task's events to this reactor */
+ pni_record_init_handler(record, handler);
pn_reactor_update(reactor, reactor->timer);
return task;
}
@@ -238,9 +306,10 @@ void pn_reactor_process(pn_reactor_t *reactor) {
assert(reactor);
pn_event_t *event;
while ((event = pn_collector_peek(reactor->collector))) {
+ pni_reactor_dispatch_pre(reactor, event);
pn_handler_t *handler = pn_event_handler(event, reactor->handler);
pn_handler_dispatch(handler, event);
- pni_reactor_dispatch(reactor, event);
+ pni_reactor_dispatch_post(reactor, event);
pn_collector_pop(reactor->collector);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h
index e21274d..398eb8b 100644
--- a/proton-c/src/reactor/reactor.h
+++ b/proton-c/src/reactor/reactor.h
@@ -26,5 +26,7 @@
pn_handler_t *pni_record_get_handler(pn_record_t *record);
void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler);
+void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor); /* Store the reactor on the given attachments record. */
+
#endif /* src/reactor.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index 4eff533..ce8c5cf 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -46,6 +46,7 @@ static void test_reactor_run(void) {
}
typedef struct { /* Per-handler state used by the test handler. */
+ pn_reactor_t *reactor; /* reactor that test_dispatch asserts every event resolves to */
pn_list_t *events; /* list that test_dispatch appends each event type to */
} pni_test_handler_t;
@@ -54,11 +55,15 @@ pni_test_handler_t *thmem(pn_handler_t *handler) {
}
void test_dispatch(pn_handler_t *handler, pn_event_t *event) { /* Test handler callback: checks the event's reactor and records the event type. */
- pn_list_add(thmem(handler)->events, (void *) pn_event_type(event));
+ pni_test_handler_t *th = thmem(handler);
+ pn_reactor_t *reactor = pn_event_reactor(event);
+ assert(reactor == th->reactor); /* every dispatched event must resolve to the reactor given to test_handler */
+ pn_list_add(th->events, (void *) pn_event_type(event)); /* the event type is stored as a pointer-sized value in the list */
}
-pn_handler_t *test_handler(pn_list_t *events) {
+pn_handler_t *test_handler(pn_reactor_t *reactor, pn_list_t *events) { /* Create a handler that dispatches to test_dispatch, with the expected reactor and the event list stored in its memory. */
pn_handler_t *handler = pn_handler_new(test_dispatch, sizeof(pni_test_handler_t), NULL);
+ thmem(handler)->reactor = reactor; /* compared against pn_event_reactor(event) in test_dispatch */
thmem(handler)->events = events;
return handler;
}
@@ -89,7 +94,7 @@ static void test_reactor_handler(void) {
pn_handler_t *handler = pn_reactor_handler(reactor);
assert(handler);
pn_list_t *events = pn_list(PN_VOID, 0);
- pn_handler_t *th = test_handler(events);
+ pn_handler_t *th = test_handler(reactor, events);
pn_handler_add(handler, th);
pn_decref(th);
pn_free(reactor);
@@ -103,7 +108,7 @@ static void test_reactor_handler_free(void) {
pn_handler_t *handler = pn_reactor_handler(reactor);
assert(handler);
pn_list_t *events = pn_list(PN_VOID, 0);
- pn_handler_add(handler, test_handler(events));
+ pn_handler_add(handler, test_handler(reactor, events));
pn_reactor_free(reactor);
expect(events, END);
pn_free(events);
@@ -115,7 +120,7 @@ static void test_reactor_handler_run(void) {
pn_handler_t *handler = pn_reactor_handler(reactor);
assert(handler);
pn_list_t *events = pn_list(PN_VOID, 0);
- pn_handler_t *th = test_handler(events);
+ pn_handler_t *th = test_handler(reactor, events);
pn_handler_add(handler, th);
pn_reactor_run(reactor);
expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
@@ -130,7 +135,7 @@ static void test_reactor_handler_run_free(void) {
pn_handler_t *handler = pn_reactor_handler(reactor);
assert(handler);
pn_list_t *events = pn_list(PN_VOID, 0);
- pn_handler_add(handler, test_handler(events));
+ pn_handler_add(handler, test_handler(reactor, events));
pn_reactor_run(reactor);
expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
pn_reactor_free(reactor);
@@ -141,12 +146,12 @@ static void test_reactor_connection(void) {
pn_reactor_t *reactor = pn_reactor();
assert(reactor);
pn_list_t *cevents = pn_list(PN_VOID, 0);
- pn_handler_t *tch = test_handler(cevents);
+ pn_handler_t *tch = test_handler(reactor, cevents);
pn_connection_t *connection = pn_reactor_connection(reactor, tch);
assert(connection);
pn_handler_t *root = pn_reactor_handler(reactor);
pn_list_t *revents = pn_list(PN_VOID, 0);
- pn_handler_add(root, test_handler(revents));
+ pn_handler_add(root, test_handler(reactor, revents));
pn_reactor_run(reactor);
expect(revents, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
expect(cevents, PN_CONNECTION_INIT, END);
@@ -394,7 +399,7 @@ static void test_reactor_schedule(void) {
pn_reactor_t *reactor = pn_reactor();
pn_handler_t *root = pn_reactor_handler(reactor);
pn_list_t *events = pn_list(PN_VOID, 0);
- pn_handler_add(root, test_handler(events));
+ pn_handler_add(root, test_handler(reactor, events));
pn_reactor_schedule(reactor, 0, NULL);
pn_reactor_run(reactor);
pn_reactor_free(reactor);
@@ -407,8 +412,8 @@ static void test_reactor_schedule_handler(void) {
pn_handler_t *root = pn_reactor_handler(reactor);
pn_list_t *events = pn_list(PN_VOID, 0);
pn_list_t *tevents = pn_list(PN_VOID, 0);
- pn_handler_add(root, test_handler(events));
- pn_handler_t *th = test_handler(tevents);
+ pn_handler_add(root, test_handler(reactor, events));
+ pn_handler_t *th = test_handler(reactor, tevents);
pn_reactor_schedule(reactor, 0, th);
pn_reactor_run(reactor);
pn_reactor_free(reactor);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org