You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/14 00:32:45 UTC

qpid-proton git commit: added pn_event_reactor

Repository: qpid-proton
Updated Branches:
  refs/heads/master 99df0a33d -> fe31dcea7


added pn_event_reactor


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fe31dcea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fe31dcea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fe31dcea

Branch: refs/heads/master
Commit: fe31dcea7033eea2faf2700d287ae9f94d78abda
Parents: 99df0a3
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Tue Jan 13 18:32:29 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Tue Jan 13 18:32:29 2015 -0500

----------------------------------------------------------------------
 proton-c/include/proton/reactor.h |  2 +
 proton-c/src/reactor/acceptor.c   |  1 +
 proton-c/src/reactor/reactor.c    | 77 ++++++++++++++++++++++++++++++++--
 proton-c/src/reactor/reactor.h    |  2 +
 proton-c/src/tests/reactor.c      | 27 +++++++-----
 5 files changed, 94 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index e5d953f..1b21195 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -85,6 +85,8 @@ PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
 
 PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
 
+PN_EXTERN pn_reactor_t *pn_event_reactor(pn_event_t *event);
+
 /** @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index 889016c..34446a2 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -63,6 +63,7 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons
   pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port);
   pni_selectable_set_fd(sel, socket);
   pni_selectable_set_context(sel, reactor);
+  pni_record_init_reactor(pn_selectable_attachments(sel), reactor);
   pni_record_init_handler(pn_selectable_attachments(sel), handler);
   pn_reactor_update(reactor, sel);
   return (pn_acceptor_t *) sel;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 3857dd7..82b903d 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -23,15 +23,16 @@
 #include <proton/io.h>
 #include <proton/selector.h>
 #include <proton/event.h>
-#include <proton/reactor.h>
 #include <proton/transport.h>
 #include <proton/connection.h>
 #include <proton/session.h>
 #include <proton/link.h>
+#include <proton/delivery.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <assert.h>
 
+#include "reactor.h"
 #include "selectable.h"
 #include "platform.h"
 
@@ -173,8 +174,21 @@ void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
 void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
 void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
 
-static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) {
+static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) {
   assert(reactor);
+  assert(event);
+  switch (pn_event_type(event)) {
+  case PN_CONNECTION_INIT:
+    pni_record_init_reactor(pn_connection_attachments(pn_event_connection(event)), reactor);
+    break;
+  default:
+    break;
+  }
+}
+
+static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
   switch (pn_event_type(event)) {
   case PN_TRANSPORT:
     pni_handle_transport(reactor, event);
@@ -202,6 +216,58 @@ void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler) {
   pn_record_set(record, PN_HANDLER, handler);
 }
 
+static void *pni_reactor = NULL;
+#define PN_REACTOR ((pn_handle_t) &pni_reactor)
+
+pn_reactor_t *pni_record_get_reactor(pn_record_t *record) {
+  return (pn_reactor_t *) pn_record_get(record, PN_REACTOR);
+}
+
+void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor) {
+  pn_record_def(record, PN_REACTOR, PN_WEAKREF);
+  pn_record_set(record, PN_REACTOR, reactor);
+}
+
+static pn_connection_t *pni_object_connection(const pn_class_t *clazz, void *object) {
+  switch (pn_class_id(clazz)) {
+  case CID_pn_delivery:
+    return pn_session_connection(pn_link_session(pn_delivery_link((pn_delivery_t *) object)));
+  case CID_pn_link:
+    return pn_session_connection(pn_link_session((pn_link_t *) object));
+  case CID_pn_session:
+    return pn_session_connection((pn_session_t *) object);
+  case CID_pn_connection:
+    return (pn_connection_t *) object;
+  case CID_pn_transport:
+    return pn_transport_connection((pn_transport_t *) object);
+  default:
+    return NULL;
+  }
+}
+
+pn_reactor_t *pn_event_reactor(pn_event_t *event) {
+  const pn_class_t *clazz = pn_event_class(event);
+  void *context = pn_event_context(event);
+  switch (pn_class_id(clazz)) {
+  case CID_pn_reactor:
+    return (pn_reactor_t *) context;
+  case CID_pn_task:
+    return pni_record_get_reactor(pn_task_attachments((pn_task_t *) context));
+  case CID_pn_delivery:
+  case CID_pn_link:
+  case CID_pn_session:
+  case CID_pn_connection:
+  case CID_pn_transport:
+    {
+      pn_connection_t *conn = pni_object_connection(pn_event_class(event), context);
+      pn_record_t *record = pn_connection_attachments(conn);
+      return pni_record_get_reactor(record);
+    }
+  default:
+    return NULL;
+  }
+}
+
 pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) {
   pn_handler_t *handler = NULL;
   pn_link_t *link = pn_event_link(event);
@@ -229,7 +295,9 @@ pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler)
 pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) {
   pn_timer_t *timer = pni_timer(reactor->timer);
   pn_task_t *task = pn_timer_schedule(timer, reactor->now + delay);
-  pni_record_init_handler(pn_task_attachments(task), handler);
+  pn_record_t *record = pn_task_attachments(task);
+  pni_record_init_reactor(record, reactor);
+  pni_record_init_handler(record, handler);
   pn_reactor_update(reactor, reactor->timer);
   return task;
 }
@@ -238,9 +306,10 @@ void pn_reactor_process(pn_reactor_t *reactor) {
   assert(reactor);
   pn_event_t *event;
   while ((event = pn_collector_peek(reactor->collector))) {
+    pni_reactor_dispatch_pre(reactor, event);
     pn_handler_t *handler = pn_event_handler(event, reactor->handler);
     pn_handler_dispatch(handler, event);
-    pni_reactor_dispatch(reactor, event);
+    pni_reactor_dispatch_post(reactor, event);
     pn_collector_pop(reactor->collector);
   }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h
index e21274d..398eb8b 100644
--- a/proton-c/src/reactor/reactor.h
+++ b/proton-c/src/reactor/reactor.h
@@ -26,5 +26,7 @@
 
 pn_handler_t *pni_record_get_handler(pn_record_t *record);
 void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler);
+void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor);
+
 
 #endif /* src/reactor.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index 4eff533..ce8c5cf 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -46,6 +46,7 @@ static void test_reactor_run(void) {
 }
 
 typedef struct {
+  pn_reactor_t *reactor;
   pn_list_t *events;
 } pni_test_handler_t;
 
@@ -54,11 +55,15 @@ pni_test_handler_t *thmem(pn_handler_t *handler) {
 }
 
 void test_dispatch(pn_handler_t *handler, pn_event_t *event) {
-  pn_list_add(thmem(handler)->events, (void *) pn_event_type(event));
+  pni_test_handler_t *th = thmem(handler);
+  pn_reactor_t *reactor = pn_event_reactor(event);
+  assert(reactor == th->reactor);
+  pn_list_add(th->events, (void *) pn_event_type(event));
 }
 
-pn_handler_t *test_handler(pn_list_t *events) {
+pn_handler_t *test_handler(pn_reactor_t *reactor, pn_list_t *events) {
   pn_handler_t *handler = pn_handler_new(test_dispatch, sizeof(pni_test_handler_t), NULL);
+  thmem(handler)->reactor = reactor;
   thmem(handler)->events = events;
   return handler;
 }
@@ -89,7 +94,7 @@ static void test_reactor_handler(void) {
   pn_handler_t *handler = pn_reactor_handler(reactor);
   assert(handler);
   pn_list_t *events = pn_list(PN_VOID, 0);
-  pn_handler_t *th = test_handler(events);
+  pn_handler_t *th = test_handler(reactor, events);
   pn_handler_add(handler, th);
   pn_decref(th);
   pn_free(reactor);
@@ -103,7 +108,7 @@ static void test_reactor_handler_free(void) {
   pn_handler_t *handler = pn_reactor_handler(reactor);
   assert(handler);
   pn_list_t *events = pn_list(PN_VOID, 0);
-  pn_handler_add(handler, test_handler(events));
+  pn_handler_add(handler, test_handler(reactor, events));
   pn_reactor_free(reactor);
   expect(events, END);
   pn_free(events);
@@ -115,7 +120,7 @@ static void test_reactor_handler_run(void) {
   pn_handler_t *handler = pn_reactor_handler(reactor);
   assert(handler);
   pn_list_t *events = pn_list(PN_VOID, 0);
-  pn_handler_t *th = test_handler(events);
+  pn_handler_t *th = test_handler(reactor, events);
   pn_handler_add(handler, th);
   pn_reactor_run(reactor);
   expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
@@ -130,7 +135,7 @@ static void test_reactor_handler_run_free(void) {
   pn_handler_t *handler = pn_reactor_handler(reactor);
   assert(handler);
   pn_list_t *events = pn_list(PN_VOID, 0);
-  pn_handler_add(handler, test_handler(events));
+  pn_handler_add(handler, test_handler(reactor, events));
   pn_reactor_run(reactor);
   expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
   pn_reactor_free(reactor);
@@ -141,12 +146,12 @@ static void test_reactor_connection(void) {
   pn_reactor_t *reactor = pn_reactor();
   assert(reactor);
   pn_list_t *cevents = pn_list(PN_VOID, 0);
-  pn_handler_t *tch = test_handler(cevents);
+  pn_handler_t *tch = test_handler(reactor, cevents);
   pn_connection_t *connection = pn_reactor_connection(reactor, tch);
   assert(connection);
   pn_handler_t *root = pn_reactor_handler(reactor);
   pn_list_t *revents = pn_list(PN_VOID, 0);
-  pn_handler_add(root, test_handler(revents));
+  pn_handler_add(root, test_handler(reactor, revents));
   pn_reactor_run(reactor);
   expect(revents, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
   expect(cevents, PN_CONNECTION_INIT, END);
@@ -394,7 +399,7 @@ static void test_reactor_schedule(void) {
   pn_reactor_t *reactor = pn_reactor();
   pn_handler_t *root = pn_reactor_handler(reactor);
   pn_list_t *events = pn_list(PN_VOID, 0);
-  pn_handler_add(root, test_handler(events));
+  pn_handler_add(root, test_handler(reactor, events));
   pn_reactor_schedule(reactor, 0, NULL);
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);
@@ -407,8 +412,8 @@ static void test_reactor_schedule_handler(void) {
   pn_handler_t *root = pn_reactor_handler(reactor);
   pn_list_t *events = pn_list(PN_VOID, 0);
   pn_list_t *tevents = pn_list(PN_VOID, 0);
-  pn_handler_add(root, test_handler(events));
-  pn_handler_t *th = test_handler(tevents);
+  pn_handler_add(root, test_handler(reactor, events));
+  pn_handler_t *th = test_handler(reactor, tevents);
   pn_reactor_schedule(reactor, 0, th);
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org