You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/21 18:19:45 UTC

qpid-proton git commit: factored I/O code out into a separate handler and added bindings to allow pure python I/O

Repository: qpid-proton
Updated Branches:
  refs/heads/master 95d04400d -> f023c63b3


factored I/O code out into a separate handler and added bindings to allow pure python I/O


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f023c63b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f023c63b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f023c63b

Branch: refs/heads/master
Commit: f023c63b3af26b27506fd2219811946e65a03e44
Parents: 95d0440
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Wed Jan 21 12:19:31 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Wed Jan 21 12:19:31 2015 -0500

----------------------------------------------------------------------
 proton-c/CMakeLists.txt                     |  1 +
 proton-c/bindings/python/proton/__init__.py | 25 +++---
 proton-c/bindings/python/proton/reactors.py | 40 ++++++++--
 proton-c/include/proton/handlers.h          |  2 +
 proton-c/include/proton/reactor.h           |  4 +
 proton-c/src/handlers/iohandler.c           | 96 ++++++++++++++++++++++++
 proton-c/src/reactor/reactor.c              | 94 ++++++++++-------------
 proton-c/src/tests/reactor.c                |  5 +-
 8 files changed, 193 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 6a7feed..3dd5b86 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -311,6 +311,7 @@ set (qpid-proton-core
   src/reactor/timer.c
 
   src/handlers/handshaker.c
+  src/handlers/iohandler.c
   src/handlers/flowcontroller.c
 
   src/messenger/messenger.c

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 22ffa6d..17cc670 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -33,7 +33,7 @@ The proton APIs consist of the following classes:
 from cproton import *
 from wrapper import Wrapper
 
-import weakref, re, socket
+import weakref, re, socket, sys
 try:
   import uuid
 except ImportError:
@@ -3448,15 +3448,22 @@ class Handler(object):
 
 class _cadapter:
 
-  def __init__(self, handler):
+  def __init__(self, handler, errors=None):
     self.handler = handler
+    self.errors = errors
 
   def __call__(self, cevent):
     ev = Event.wrap(cevent)
-    ev.dispatch(self.handler)
-    if hasattr(self.handler, "handlers"):
-      for h in self.handler.handlers:
-        ev.dispatch(h)
+    try:
+      ev.dispatch(self.handler)
+      if hasattr(self.handler, "handlers"):
+        for h in self.handler.handlers:
+          ev.dispatch(h)
+    except:
+      if self.errors is None:
+        raise
+      else:
+        self.errors.append(sys.exc_info())
 
 class WrappedHandler(Wrapper):
 
@@ -3464,11 +3471,11 @@ class WrappedHandler(Wrapper):
     Wrapper.__init__(self, impl_or_constructor)
 
   def add(self, handler):
-    impl = _chandler(handler)
+    impl = _chandler(handler, getattr(self, "errors", None))
     pn_handler_add(self._impl, impl)
     pn_decref(impl)
 
-def _chandler(obj):
+def _chandler(obj, errors=None):
   if obj is None:
     return None
   elif isinstance(obj, WrappedHandler):
@@ -3476,7 +3483,7 @@ def _chandler(obj):
     pn_incref(impl)
     return impl
   else:
-    return pn_pyhandler(_cadapter(obj))
+    return pn_pyhandler(_cadapter(obj, errors))
 
 ###
 # Driver

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 612c38b..493af71 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -835,7 +835,8 @@ class Container(object):
     def do_work(self, timeout=None):
         return self.loop.do_work(timeout)
 
-from proton import WrappedHandler, _chandler, Connection, secs2millis, Selectable
+import traceback
+from proton import WrappedHandler, _chandler, Connection, secs2millis, millis2secs, Selectable
 from wrapper import Wrapper
 from cproton import *
 
@@ -862,6 +863,11 @@ class Acceptor(Wrapper):
     def close(self):
         pn_acceptor_close(self._impl)
 
+def _wrap_handler(reactor, impl):
+    wrapped = WrappedHandler(impl)
+    wrapped.__dict__["errors"] = reactor.errors
+    return wrapped
+
 class Reactor(Wrapper):
 
     @staticmethod
@@ -877,31 +883,51 @@ class Reactor(Wrapper):
             self.handler.add(h)
 
     def _init(self):
-        pass
+        self.errors = []
+
+    def global_(self, handler):
+        impl = _chandler(handler, self.errors)
+        pn_reactor_global(self._impl, impl)
+        pn_decref(impl)
+
+    @property
+    def timeout(self):
+        return millis2secs(pn_reactor_timeout(self._impl))
+
+    def yield_(self):
+        pn_reactor_yield(self._impl)
+
+    def mark(self):
+        pn_reactor_mark(self._impl)
 
     @property
     def handler(self):
-        return WrappedHandler(pn_reactor_handler(self._impl))
+        return _wrap_handler(self, pn_reactor_handler(self._impl))
 
     def run(self):
         pn_reactor_start(self._impl)
-        while pn_reactor_work(self._impl, 3142): pass
+        while pn_reactor_work(self._impl, 3142):
+            if self.errors:
+                for exc, value, tb in self.errors[:-1]:
+                    traceback.print_exception(exc, value, tb)
+                exc, value, tb = self.errors[-1]
+                raise exc, value, tb
         pn_reactor_stop(self._impl)
 
     def schedule(self, delay, task):
-        impl = _chandler(task)
+        impl = _chandler(task, self.errors)
         task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
         pn_decref(impl)
         return task
 
     def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler)
+        impl = _chandler(handler, self.errors)
         result = Acceptor(pn_reactor_acceptor(self._impl, host, port, impl))
         pn_decref(impl)
         return result
 
     def connection(self, handler=None):
-        impl = _chandler(handler)
+        impl = _chandler(handler, self.errors)
         result = Connection.wrap(pn_reactor_connection(self._impl, impl))
         pn_decref(impl)
         return result

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/handlers.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h
index 99242d1..304d0e6 100644
--- a/proton-c/include/proton/handlers.h
+++ b/proton-c/include/proton/handlers.h
@@ -41,9 +41,11 @@ extern "C" {
  */
 
 typedef pn_handler_t pn_handshaker_t;
+typedef pn_handler_t pn_iohandler_t;
 typedef pn_handler_t pn_flowcontroller_t;
 
 PN_EXTERN pn_handshaker_t *pn_handshaker(void);
+PN_EXTERN pn_iohandler_t *pn_iohandler(void);
 PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window);
 
 /** @}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index e04ba6d..8447355 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -57,8 +57,12 @@ PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event);
 
 PN_EXTERN pn_reactor_t *pn_reactor(void);
 PN_EXTERN pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor);
+PN_EXTERN int pn_reactor_timeout(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_mark(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_yield(pn_reactor_t *reactor);
 PN_EXTERN void pn_reactor_free(pn_reactor_t *reactor);
 PN_EXTERN pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler);
 PN_EXTERN pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor);
 PN_EXTERN pn_io_t *pn_reactor_io(pn_reactor_t *reactor);
 PN_EXTERN pn_list_t *pn_reactor_children(pn_reactor_t *reactor);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/handlers/iohandler.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/iohandler.c b/proton-c/src/handlers/iohandler.c
new file mode 100644
index 0000000..02e0d0b
--- /dev/null
+++ b/proton-c/src/handlers/iohandler.c
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/handlers.h>
+#include <proton/selector.h>
+#include <assert.h>
+
+static void *pni_selector_handle = NULL;
+
+#define PN_SELECTOR ((pn_handle_t) &pni_selector_handle)
+
+void pni_handle_quiesced(pn_reactor_t *reactor, pn_selector_t *selector) {
+  pn_selector_select(selector, pn_reactor_timeout(reactor));
+  pn_selectable_t *sel;
+  int events;
+  pn_reactor_mark(reactor);
+  while ((sel = pn_selector_next(selector, &events))) {
+    if (events & PN_READABLE) {
+      pn_selectable_readable(sel);
+    }
+    if (events & PN_WRITABLE) {
+      pn_selectable_writable(sel);
+    }
+    if (events & PN_EXPIRED) {
+      pn_selectable_expired(sel);
+    }
+  }
+  pn_reactor_yield(reactor);
+}
+
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
+
+static void pn_iodispatch(pn_iohandler_t *handler, pn_event_t *event) {
+  pn_reactor_t *reactor = pn_event_reactor(event);
+  pn_record_t *record = pn_reactor_attachments(reactor);
+  pn_selector_t *selector = (pn_selector_t *) pn_record_get(record, PN_SELECTOR);
+  if (!selector) {
+    selector = pn_io_selector(pn_reactor_io(reactor));
+    pn_record_def(record, PN_SELECTOR, PN_OBJECT);
+    pn_record_set(record, PN_SELECTOR, selector);
+    pn_decref(selector);
+  }
+  switch (pn_event_type(event)) {
+  case PN_SELECTABLE_INIT:
+    {
+      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+      pn_selector_add(selector, sel);
+    }
+    break;
+  case PN_SELECTABLE_UPDATED:
+    {
+      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+      if (pn_selectable_is_terminal(sel)) {
+        pn_selector_remove(selector, sel);
+        pn_selectable_release(sel);
+      } else {
+        pn_selector_update(selector, sel);
+      }
+    }
+    break;
+  case PN_CONNECTION_LOCAL_OPEN:
+    pni_handle_open(reactor, event);
+    break;
+  case PN_TRANSPORT:
+    pni_handle_transport(reactor, event);
+    break;
+  case PN_REACTOR_QUIESCED:
+    pni_handle_quiesced(reactor, selector);
+    break;
+  default:
+    break;
+  }
+}
+
+pn_iohandler_t *pn_iohandler(void) {
+  return pn_handler(pn_iodispatch);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 4f90865..536bb11 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -20,8 +20,8 @@
  */
 
 #include <proton/object.h>
+#include <proton/handlers.h>
 #include <proton/io.h>
-#include <proton/selector.h>
 #include <proton/event.h>
 #include <proton/transport.h>
 #include <proton/connection.h>
@@ -39,8 +39,8 @@
 struct pn_reactor_t {
   pn_record_t *attachments;
   pn_io_t *io;
-  pn_selector_t *selector;
   pn_collector_t *collector;
+  pn_handler_t *global;
   pn_handler_t *handler;
   pn_list_t *children;
   pn_timer_t *timer;
@@ -49,9 +49,10 @@ struct pn_reactor_t {
   pn_timestamp_t now;
   int selectables;
   int timeout;
+  bool yield;
 };
 
-static void pn_reactor_mark(pn_reactor_t *reactor) {
+void pn_reactor_mark(pn_reactor_t *reactor) {
   assert(reactor);
   reactor->now = pn_i_now();
 }
@@ -59,8 +60,8 @@ static void pn_reactor_mark(pn_reactor_t *reactor) {
 static void pn_reactor_initialize(pn_reactor_t *reactor) {
   reactor->attachments = pn_record();
   reactor->io = pn_io();
-  reactor->selector = pn_io_selector(reactor->io);
   reactor->collector = pn_collector();
+  reactor->global = pn_iohandler();
   reactor->handler = pn_handler(NULL);
   reactor->children = pn_list(PN_OBJECT, 0);
   reactor->timer = pn_timer(reactor->collector);
@@ -68,16 +69,17 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
   reactor->previous = PN_EVENT_NONE;
   reactor->selectables = 0;
   reactor->timeout = 0;
+  reactor->yield = false;
   pn_reactor_mark(reactor);
 }
 
 static void pn_reactor_finalize(pn_reactor_t *reactor) {
   pn_decref(reactor->attachments);
   pn_decref(reactor->collector);
+  pn_decref(reactor->global);
   pn_decref(reactor->handler);
   pn_decref(reactor->children);
   pn_decref(reactor->timer);
-  pn_decref(reactor->selector);
   pn_decref(reactor->io);
 }
 
@@ -96,6 +98,11 @@ pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) {
   return reactor->attachments;
 }
 
+int pn_reactor_timeout(pn_reactor_t *reactor) {
+  assert(reactor);
+  return reactor->timeout;
+}
+
 void pn_reactor_free(pn_reactor_t *reactor) {
   if (reactor) {
     pn_collector_release(reactor->collector);
@@ -105,6 +112,13 @@ void pn_reactor_free(pn_reactor_t *reactor) {
   }
 }
 
+void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler) {
+  assert(reactor);
+  pn_decref(reactor->global);
+  reactor->global = handler;
+  pn_incref(reactor->global);
+}
+
 pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) {
   assert(reactor);
   return reactor->handler;
@@ -150,28 +164,6 @@ void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) {
   pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED);
 }
 
-void pni_handle_quiesced(pn_reactor_t *reactor) {
-  pn_selector_select(reactor->selector, reactor->timeout);
-  pn_selectable_t *sel;
-  int events;
-  pn_reactor_mark(reactor);
-  while ((sel = pn_selector_next(reactor->selector, &events))) {
-    if (events & PN_READABLE) {
-      pn_selectable_readable(sel);
-    }
-    if (events & PN_WRITABLE) {
-      pn_selectable_writable(sel);
-    }
-    if (events & PN_EXPIRED) {
-      pn_selectable_expired(sel);
-    }
-  }
-}
-
-void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
-void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
-void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
-
 static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) {
   assert(reactor);
   assert(event);
@@ -184,39 +176,15 @@ static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) {
   }
 }
 
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
+
 static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) {
   assert(reactor);
   assert(event);
   switch (pn_event_type(event)) {
-  case PN_SELECTABLE_INIT:
-    {
-      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
-      pn_selector_add(reactor->selector, sel);
-    }
-    break;
-  case PN_SELECTABLE_UPDATED:
-    {
-      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
-      if (pn_selectable_is_terminal(sel)) {
-        pn_selector_remove(reactor->selector, sel);
-        pn_selectable_release(sel);
-      } else {
-        pn_selector_update(reactor->selector, sel);
-      }
-    }
-    break;
-  case PN_TRANSPORT:
-    pni_handle_transport(reactor, event);
-    break;
-  case PN_CONNECTION_LOCAL_OPEN:
-    pni_handle_open(reactor, event);
-    break;
   case PN_CONNECTION_FINAL:
     pni_handle_final(reactor, event);
     break;
-  case PN_REACTOR_QUIESCED:
-    pni_handle_quiesced(reactor);
-    break;
   default:
     break;
   }
@@ -344,22 +312,34 @@ bool pni_reactor_more(pn_reactor_t *reactor) {
   return pn_timer_tasks(reactor->timer) || reactor->selectables > 1;
 }
 
+void pn_reactor_yield(pn_reactor_t *reactor) {
+  assert(reactor);
+  reactor->yield = true;
+}
+
 bool pn_reactor_process(pn_reactor_t *reactor) {
   assert(reactor);
   pn_reactor_mark(reactor);
+  pn_event_type_t previous = PN_EVENT_NONE;
   while (true) {
     pn_event_t *event = pn_collector_peek(reactor->collector);
-    //    pni_event_print(event);
+    //pni_event_print(event);
     if (event) {
+      if (reactor->yield) {
+        reactor->yield = false;
+        return true;
+      }
+      reactor->yield = false;
       pni_reactor_dispatch_pre(reactor, event);
       pn_handler_t *handler = pn_event_handler(event, reactor->handler);
       pn_handler_dispatch(handler, event);
+      pn_handler_dispatch(reactor->global, event);
       pni_reactor_dispatch_post(reactor, event);
-      reactor->previous = pn_event_type(event);
+      previous = reactor->previous = pn_event_type(event);
       pn_collector_pop(reactor->collector);
     } else {
       if (pni_reactor_more(reactor)) {
-        if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) {
+        if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) {
           pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED);
         } else {
           return true;
@@ -380,6 +360,8 @@ bool pn_reactor_process(pn_reactor_t *reactor) {
 static void pni_timer_expired(pn_selectable_t *sel) {
   pn_reactor_t *reactor = pni_reactor(sel);
   pn_timer_tick(reactor->timer, reactor->now);
+  pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer));
+  pn_reactor_update(reactor, sel);
 }
 
 pn_selectable_t *pni_timer_selectable(pn_reactor_t *reactor) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index ff83fa7..5c30b1e 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -420,7 +420,8 @@ static void test_reactor_schedule(void) {
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);
   expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED,
-         PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+         PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL,
+         END);
   pn_free(events);
 }
 
@@ -436,7 +437,7 @@ static void test_reactor_schedule_handler(void) {
   pn_reactor_free(reactor);
   pn_handler_free(th);
   expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED,
-         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+         PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   expect(tevents, PN_TIMER_TASK, END);
   pn_free(events);
   pn_free(tevents);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org