You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/21 18:19:45 UTC
qpid-proton git commit: factored I/O code out into a separate handler
and added bindings to allow pure python I/O
Repository: qpid-proton
Updated Branches:
refs/heads/master 95d04400d -> f023c63b3
factored I/O code out into a separate handler and added bindings to allow pure python I/O
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f023c63b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f023c63b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f023c63b
Branch: refs/heads/master
Commit: f023c63b3af26b27506fd2219811946e65a03e44
Parents: 95d0440
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Wed Jan 21 12:19:31 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Wed Jan 21 12:19:31 2015 -0500
----------------------------------------------------------------------
proton-c/CMakeLists.txt | 1 +
proton-c/bindings/python/proton/__init__.py | 25 +++---
proton-c/bindings/python/proton/reactors.py | 40 ++++++++--
proton-c/include/proton/handlers.h | 2 +
proton-c/include/proton/reactor.h | 4 +
proton-c/src/handlers/iohandler.c | 96 ++++++++++++++++++++++++
proton-c/src/reactor/reactor.c | 94 ++++++++++-------------
proton-c/src/tests/reactor.c | 5 +-
8 files changed, 193 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 6a7feed..3dd5b86 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -311,6 +311,7 @@ set (qpid-proton-core
src/reactor/timer.c
src/handlers/handshaker.c
+ src/handlers/iohandler.c
src/handlers/flowcontroller.c
src/messenger/messenger.c
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 22ffa6d..17cc670 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -33,7 +33,7 @@ The proton APIs consist of the following classes:
from cproton import *
from wrapper import Wrapper
-import weakref, re, socket
+import weakref, re, socket, sys
try:
import uuid
except ImportError:
@@ -3448,15 +3448,22 @@ class Handler(object):
class _cadapter:
- def __init__(self, handler):
+ def __init__(self, handler, errors=None):
self.handler = handler
+ self.errors = errors
def __call__(self, cevent):
ev = Event.wrap(cevent)
- ev.dispatch(self.handler)
- if hasattr(self.handler, "handlers"):
- for h in self.handler.handlers:
- ev.dispatch(h)
+ try:
+ ev.dispatch(self.handler)
+ if hasattr(self.handler, "handlers"):
+ for h in self.handler.handlers:
+ ev.dispatch(h)
+ except:
+ if self.errors is None:
+ raise
+ else:
+ self.errors.append(sys.exc_info())
class WrappedHandler(Wrapper):
@@ -3464,11 +3471,11 @@ class WrappedHandler(Wrapper):
Wrapper.__init__(self, impl_or_constructor)
def add(self, handler):
- impl = _chandler(handler)
+ impl = _chandler(handler, getattr(self, "errors", None))
pn_handler_add(self._impl, impl)
pn_decref(impl)
-def _chandler(obj):
+def _chandler(obj, errors=None):
if obj is None:
return None
elif isinstance(obj, WrappedHandler):
@@ -3476,7 +3483,7 @@ def _chandler(obj):
pn_incref(impl)
return impl
else:
- return pn_pyhandler(_cadapter(obj))
+ return pn_pyhandler(_cadapter(obj, errors))
###
# Driver
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 612c38b..493af71 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -835,7 +835,8 @@ class Container(object):
def do_work(self, timeout=None):
return self.loop.do_work(timeout)
-from proton import WrappedHandler, _chandler, Connection, secs2millis, Selectable
+import traceback
+from proton import WrappedHandler, _chandler, Connection, secs2millis, millis2secs, Selectable
from wrapper import Wrapper
from cproton import *
@@ -862,6 +863,11 @@ class Acceptor(Wrapper):
def close(self):
pn_acceptor_close(self._impl)
+def _wrap_handler(reactor, impl):
+ wrapped = WrappedHandler(impl)
+ wrapped.__dict__["errors"] = reactor.errors
+ return wrapped
+
class Reactor(Wrapper):
@staticmethod
@@ -877,31 +883,51 @@ class Reactor(Wrapper):
self.handler.add(h)
def _init(self):
- pass
+ self.errors = []
+
+ def global_(self, handler):
+ impl = _chandler(handler, self.errors)
+ pn_reactor_global(self._impl, impl)
+ pn_decref(impl)
+
+ @property
+ def timeout(self):
+ return millis2secs(pn_reactor_timeout(self._impl))
+
+ def yield_(self):
+ pn_reactor_yield(self._impl)
+
+ def mark(self):
+ pn_reactor_mark(self._impl)
@property
def handler(self):
- return WrappedHandler(pn_reactor_handler(self._impl))
+ return _wrap_handler(self, pn_reactor_handler(self._impl))
def run(self):
pn_reactor_start(self._impl)
- while pn_reactor_work(self._impl, 3142): pass
+ while pn_reactor_work(self._impl, 3142):
+ if self.errors:
+ for exc, value, tb in self.errors[:-1]:
+ traceback.print_exception(exc, value, tb)
+ exc, value, tb = self.errors[-1]
+ raise exc, value, tb
pn_reactor_stop(self._impl)
def schedule(self, delay, task):
- impl = _chandler(task)
+ impl = _chandler(task, self.errors)
task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
pn_decref(impl)
return task
def acceptor(self, host, port, handler=None):
- impl = _chandler(handler)
+ impl = _chandler(handler, self.errors)
result = Acceptor(pn_reactor_acceptor(self._impl, host, port, impl))
pn_decref(impl)
return result
def connection(self, handler=None):
- impl = _chandler(handler)
+ impl = _chandler(handler, self.errors)
result = Connection.wrap(pn_reactor_connection(self._impl, impl))
pn_decref(impl)
return result
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/handlers.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h
index 99242d1..304d0e6 100644
--- a/proton-c/include/proton/handlers.h
+++ b/proton-c/include/proton/handlers.h
@@ -41,9 +41,11 @@ extern "C" {
*/
typedef pn_handler_t pn_handshaker_t;
+typedef pn_handler_t pn_iohandler_t;
typedef pn_handler_t pn_flowcontroller_t;
PN_EXTERN pn_handshaker_t *pn_handshaker(void);
+PN_EXTERN pn_iohandler_t *pn_iohandler(void);
PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window);
/** @}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index e04ba6d..8447355 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -57,8 +57,12 @@ PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event);
PN_EXTERN pn_reactor_t *pn_reactor(void);
PN_EXTERN pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor);
+PN_EXTERN int pn_reactor_timeout(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_mark(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_yield(pn_reactor_t *reactor);
PN_EXTERN void pn_reactor_free(pn_reactor_t *reactor);
PN_EXTERN pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler);
PN_EXTERN pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor);
PN_EXTERN pn_io_t *pn_reactor_io(pn_reactor_t *reactor);
PN_EXTERN pn_list_t *pn_reactor_children(pn_reactor_t *reactor);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/handlers/iohandler.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/iohandler.c b/proton-c/src/handlers/iohandler.c
new file mode 100644
index 0000000..02e0d0b
--- /dev/null
+++ b/proton-c/src/handlers/iohandler.c
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/handlers.h>
+#include <proton/selector.h>
+#include <assert.h>
+
+static void *pni_selector_handle = NULL;
+
+#define PN_SELECTOR ((pn_handle_t) &pni_selector_handle)
+
+void pni_handle_quiesced(pn_reactor_t *reactor, pn_selector_t *selector) {
+ pn_selector_select(selector, pn_reactor_timeout(reactor));
+ pn_selectable_t *sel;
+ int events;
+ pn_reactor_mark(reactor);
+ while ((sel = pn_selector_next(selector, &events))) {
+ if (events & PN_READABLE) {
+ pn_selectable_readable(sel);
+ }
+ if (events & PN_WRITABLE) {
+ pn_selectable_writable(sel);
+ }
+ if (events & PN_EXPIRED) {
+ pn_selectable_expired(sel);
+ }
+ }
+ pn_reactor_yield(reactor);
+}
+
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
+
+static void pn_iodispatch(pn_iohandler_t *handler, pn_event_t *event) {
+ pn_reactor_t *reactor = pn_event_reactor(event);
+ pn_record_t *record = pn_reactor_attachments(reactor);
+ pn_selector_t *selector = (pn_selector_t *) pn_record_get(record, PN_SELECTOR);
+ if (!selector) {
+ selector = pn_io_selector(pn_reactor_io(reactor));
+ pn_record_def(record, PN_SELECTOR, PN_OBJECT);
+ pn_record_set(record, PN_SELECTOR, selector);
+ pn_decref(selector);
+ }
+ switch (pn_event_type(event)) {
+ case PN_SELECTABLE_INIT:
+ {
+ pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+ pn_selector_add(selector, sel);
+ }
+ break;
+ case PN_SELECTABLE_UPDATED:
+ {
+ pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+ if (pn_selectable_is_terminal(sel)) {
+ pn_selector_remove(selector, sel);
+ pn_selectable_release(sel);
+ } else {
+ pn_selector_update(selector, sel);
+ }
+ }
+ break;
+ case PN_CONNECTION_LOCAL_OPEN:
+ pni_handle_open(reactor, event);
+ break;
+ case PN_TRANSPORT:
+ pni_handle_transport(reactor, event);
+ break;
+ case PN_REACTOR_QUIESCED:
+ pni_handle_quiesced(reactor, selector);
+ break;
+ default:
+ break;
+ }
+}
+
+pn_iohandler_t *pn_iohandler(void) {
+ return pn_handler(pn_iodispatch);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 4f90865..536bb11 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -20,8 +20,8 @@
*/
#include <proton/object.h>
+#include <proton/handlers.h>
#include <proton/io.h>
-#include <proton/selector.h>
#include <proton/event.h>
#include <proton/transport.h>
#include <proton/connection.h>
@@ -39,8 +39,8 @@
struct pn_reactor_t {
pn_record_t *attachments;
pn_io_t *io;
- pn_selector_t *selector;
pn_collector_t *collector;
+ pn_handler_t *global;
pn_handler_t *handler;
pn_list_t *children;
pn_timer_t *timer;
@@ -49,9 +49,10 @@ struct pn_reactor_t {
pn_timestamp_t now;
int selectables;
int timeout;
+ bool yield;
};
-static void pn_reactor_mark(pn_reactor_t *reactor) {
+void pn_reactor_mark(pn_reactor_t *reactor) {
assert(reactor);
reactor->now = pn_i_now();
}
@@ -59,8 +60,8 @@ static void pn_reactor_mark(pn_reactor_t *reactor) {
static void pn_reactor_initialize(pn_reactor_t *reactor) {
reactor->attachments = pn_record();
reactor->io = pn_io();
- reactor->selector = pn_io_selector(reactor->io);
reactor->collector = pn_collector();
+ reactor->global = pn_iohandler();
reactor->handler = pn_handler(NULL);
reactor->children = pn_list(PN_OBJECT, 0);
reactor->timer = pn_timer(reactor->collector);
@@ -68,16 +69,17 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
reactor->previous = PN_EVENT_NONE;
reactor->selectables = 0;
reactor->timeout = 0;
+ reactor->yield = false;
pn_reactor_mark(reactor);
}
static void pn_reactor_finalize(pn_reactor_t *reactor) {
pn_decref(reactor->attachments);
pn_decref(reactor->collector);
+ pn_decref(reactor->global);
pn_decref(reactor->handler);
pn_decref(reactor->children);
pn_decref(reactor->timer);
- pn_decref(reactor->selector);
pn_decref(reactor->io);
}
@@ -96,6 +98,11 @@ pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) {
return reactor->attachments;
}
+int pn_reactor_timeout(pn_reactor_t *reactor) {
+ assert(reactor);
+ return reactor->timeout;
+}
+
void pn_reactor_free(pn_reactor_t *reactor) {
if (reactor) {
pn_collector_release(reactor->collector);
@@ -105,6 +112,13 @@ void pn_reactor_free(pn_reactor_t *reactor) {
}
}
+void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler) {
+ assert(reactor);
+ pn_decref(reactor->global);
+ reactor->global = handler;
+ pn_incref(reactor->global);
+}
+
pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) {
assert(reactor);
return reactor->handler;
@@ -150,28 +164,6 @@ void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) {
pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED);
}
-void pni_handle_quiesced(pn_reactor_t *reactor) {
- pn_selector_select(reactor->selector, reactor->timeout);
- pn_selectable_t *sel;
- int events;
- pn_reactor_mark(reactor);
- while ((sel = pn_selector_next(reactor->selector, &events))) {
- if (events & PN_READABLE) {
- pn_selectable_readable(sel);
- }
- if (events & PN_WRITABLE) {
- pn_selectable_writable(sel);
- }
- if (events & PN_EXPIRED) {
- pn_selectable_expired(sel);
- }
- }
-}
-
-void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
-void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
-void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
-
static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) {
assert(reactor);
assert(event);
@@ -184,39 +176,15 @@ static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) {
}
}
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
+
static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) {
assert(reactor);
assert(event);
switch (pn_event_type(event)) {
- case PN_SELECTABLE_INIT:
- {
- pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
- pn_selector_add(reactor->selector, sel);
- }
- break;
- case PN_SELECTABLE_UPDATED:
- {
- pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
- if (pn_selectable_is_terminal(sel)) {
- pn_selector_remove(reactor->selector, sel);
- pn_selectable_release(sel);
- } else {
- pn_selector_update(reactor->selector, sel);
- }
- }
- break;
- case PN_TRANSPORT:
- pni_handle_transport(reactor, event);
- break;
- case PN_CONNECTION_LOCAL_OPEN:
- pni_handle_open(reactor, event);
- break;
case PN_CONNECTION_FINAL:
pni_handle_final(reactor, event);
break;
- case PN_REACTOR_QUIESCED:
- pni_handle_quiesced(reactor);
- break;
default:
break;
}
@@ -344,22 +312,34 @@ bool pni_reactor_more(pn_reactor_t *reactor) {
return pn_timer_tasks(reactor->timer) || reactor->selectables > 1;
}
+void pn_reactor_yield(pn_reactor_t *reactor) {
+ assert(reactor);
+ reactor->yield = true;
+}
+
bool pn_reactor_process(pn_reactor_t *reactor) {
assert(reactor);
pn_reactor_mark(reactor);
+ pn_event_type_t previous = PN_EVENT_NONE;
while (true) {
pn_event_t *event = pn_collector_peek(reactor->collector);
- // pni_event_print(event);
+ //pni_event_print(event);
if (event) {
+ if (reactor->yield) {
+ reactor->yield = false;
+ return true;
+ }
+ reactor->yield = false;
pni_reactor_dispatch_pre(reactor, event);
pn_handler_t *handler = pn_event_handler(event, reactor->handler);
pn_handler_dispatch(handler, event);
+ pn_handler_dispatch(reactor->global, event);
pni_reactor_dispatch_post(reactor, event);
- reactor->previous = pn_event_type(event);
+ previous = reactor->previous = pn_event_type(event);
pn_collector_pop(reactor->collector);
} else {
if (pni_reactor_more(reactor)) {
- if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) {
+ if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) {
pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED);
} else {
return true;
@@ -380,6 +360,8 @@ bool pn_reactor_process(pn_reactor_t *reactor) {
static void pni_timer_expired(pn_selectable_t *sel) {
pn_reactor_t *reactor = pni_reactor(sel);
pn_timer_tick(reactor->timer, reactor->now);
+ pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer));
+ pn_reactor_update(reactor, sel);
}
pn_selectable_t *pni_timer_selectable(pn_reactor_t *reactor) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index ff83fa7..5c30b1e 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -420,7 +420,8 @@ static void test_reactor_schedule(void) {
pn_reactor_run(reactor);
pn_reactor_free(reactor);
expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED,
- PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+ PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL,
+ END);
pn_free(events);
}
@@ -436,7 +437,7 @@ static void test_reactor_schedule_handler(void) {
pn_reactor_free(reactor);
pn_handler_free(th);
expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED,
- PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+ PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
expect(tevents, PN_TIMER_TASK, END);
pn_free(events);
pn_free(tevents);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org