You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/05 04:40:35 UTC

[3/3] qpid-proton git commit: initial commit of C reactor

initial commit of C reactor


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ebe4e3d3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ebe4e3d3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ebe4e3d3

Branch: refs/heads/master
Commit: ebe4e3d3676d8fcceb3016d7e0b2abb2deff7927
Parents: 86e08ba
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sun Jan 4 22:38:28 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sun Jan 4 22:38:28 2015 -0500

----------------------------------------------------------------------
 proton-c/CMakeLists.txt                |   8 +
 proton-c/include/proton/cid.h          |   3 +
 proton-c/include/proton/event.h        |  11 +
 proton-c/include/proton/handlers.h     |  59 +++++
 proton-c/include/proton/reactor.h      |  83 ++++++++
 proton-c/src/events/event.c            |   4 +
 proton-c/src/handlers/flowcontroller.c |  65 ++++++
 proton-c/src/handlers/handshaker.c     | 103 +++++++++
 proton-c/src/messenger/messenger.c     |   2 +
 proton-c/src/reactor/acceptor.c        |  80 +++++++
 proton-c/src/reactor/connection.c      | 207 ++++++++++++++++++
 proton-c/src/reactor/handler.c         | 103 +++++++++
 proton-c/src/reactor/reactor.c         | 216 +++++++++++++++++++
 proton-c/src/tests/CMakeLists.txt      |   1 +
 proton-c/src/tests/reactor.c           | 320 ++++++++++++++++++++++++++++
 15 files changed, 1265 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 7e58b4c..35df99f 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -304,6 +304,14 @@ set (qpid-proton-core
   src/message/message.c
   src/sasl/sasl.c
 
+  src/reactor/reactor.c
+  src/reactor/handler.c
+  src/reactor/connection.c
+  src/reactor/acceptor.c
+
+  src/handlers/handshaker.c
+  src/handlers/flowcontroller.c
+
   src/messenger/messenger.c
   src/messenger/subscription.c
   src/messenger/store.c

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/cid.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h
index 5a06308..ca6172f 100644
--- a/proton-c/include/proton/cid.h
+++ b/proton-c/include/proton/cid.h
@@ -47,6 +47,9 @@ typedef enum {
 
   CID_pn_message,
 
+  CID_pn_reactor,
+  CID_pn_handler,
+
   CID_pn_io,
   CID_pn_selector,
   CID_pn_selectable,

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index c843fee..28c3313 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -24,6 +24,7 @@
 
 #include <proton/import_export.h>
 #include <proton/type_compat.h>
+#include <proton/object.h>
 #include <stddef.h>
 #include <sys/types.h>
 
@@ -87,6 +88,16 @@ typedef enum {
   PN_EVENT_NONE = 0,
 
   /**
+   * A reactor has been started. Events of this type point to the reactor.
+   */
+  PN_REACTOR_INIT,
+
+  /**
+   * A reactor has been stopped. Events of this type point to the reactor.
+   */
+  PN_REACTOR_FINAL,
+
+  /**
    * The connection has been created. This is the first event that
    * will ever be issued for a connection. Events of this type point
    * to the relevant connection.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/handlers.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h
new file mode 100644
index 0000000..d2d732b
--- /dev/null
+++ b/proton-c/include/proton/handlers.h
@@ -0,0 +1,59 @@
+#ifndef PROTON_HANDLERS_H
+#define PROTON_HANDLERS_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/type_compat.h>
+#include <proton/reactor.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * Stock reactor handlers for proton (handshaker and flow controller).
+ *
+ * @defgroup handlers Handlers
+ * @ingroup reactor
+ * @{
+ */
+
+typedef struct pn_handshaker_t pn_handshaker_t;
+typedef struct pn_flowcontroller_t pn_flowcontroller_t;
+
+PN_EXTERN pn_handshaker_t *pn_handshaker(void);
+PN_EXTERN pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker);
+
+PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window);
+PN_EXTERN pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller);
+
+/** @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* handlers.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
new file mode 100644
index 0000000..e4cd84a
--- /dev/null
+++ b/proton-c/include/proton/reactor.h
@@ -0,0 +1,83 @@
+#ifndef PROTON_REACTOR_H
+#define PROTON_REACTOR_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/type_compat.h>
+#include <proton/event.h>
+#include <proton/selectable.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * Reactor API for proton.
+ *
+ * @defgroup reactor Reactor
+ * @ingroup reactor
+ * @{
+ */
+
+typedef struct pn_handler_t pn_handler_t;
+typedef struct pn_reactor_t pn_reactor_t;
+typedef struct pn_acceptor_t pn_acceptor_t;
+
+PN_EXTERN pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *));
+PN_EXTERN pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *), size_t size,
+                                       void (*finalize)(pn_handler_t *));
+PN_EXTERN void pn_handler_free(pn_handler_t *handler);
+PN_EXTERN void *pn_handler_mem(pn_handler_t *handler);
+PN_EXTERN pn_handler_t *pn_handler_cast(void *mem);
+PN_EXTERN void pn_handler_add(pn_handler_t *handler, pn_handler_t *child);
+PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event);
+
+PN_EXTERN pn_reactor_t *pn_reactor(void);
+PN_EXTERN void pn_reactor_free(pn_reactor_t *reactor);
+PN_EXTERN pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor);
+PN_EXTERN pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor);
+PN_EXTERN pn_io_t *pn_reactor_io(pn_reactor_t *reactor);
+PN_EXTERN pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor);
+PN_EXTERN pn_list_t *pn_reactor_children(pn_reactor_t *reactor);
+PN_EXTERN pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor);
+PN_EXTERN void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable);
+PN_EXTERN pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port,
+                                             pn_handler_t *handler);
+PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler);
+PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor);
+
+PN_EXTERN void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor);
+
+PN_EXTERN extern void *pni_handler;
+#define PN_HANDLER ((pn_handle_t) &pni_handler)
+
+/** @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* reactor.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/events/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c
index 05518be..f90c2cd 100644
--- a/proton-c/src/events/event.c
+++ b/proton-c/src/events/event.c
@@ -232,6 +232,10 @@ const char *pn_event_type_name(pn_event_type_t type)
   switch (type) {
   case PN_EVENT_NONE:
     return "PN_EVENT_NONE";
+  case PN_REACTOR_INIT:
+    return "PN_REACTOR_INIT";
+  case PN_REACTOR_FINAL:
+    return "PN_REACTOR_FINAL";
   case PN_CONNECTION_INIT:
     return "PN_CONNECTION_INIT";
   case PN_CONNECTION_BOUND:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/handlers/flowcontroller.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/flowcontroller.c b/proton-c/src/handlers/flowcontroller.c
new file mode 100644
index 0000000..2a164fe
--- /dev/null
+++ b/proton-c/src/handlers/flowcontroller.c
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/link.h>
+#include <proton/handlers.h>
+#include <assert.h>
+
+struct pn_flowcontroller_t {
+  int window;
+};
+
+static void pni_topup(pn_link_t *link, int window) {
+  int delta = window - pn_link_credit(link);
+  pn_link_flow(link, delta);
+}
+
+static void pn_flowcontroller_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  pn_flowcontroller_t *fc = (pn_flowcontroller_t *) pn_handler_mem(handler);
+  int window = fc->window;
+
+  switch (pn_event_type(event)) {
+  case PN_LINK_LOCAL_OPEN:
+  case PN_LINK_REMOTE_OPEN:
+  case PN_LINK_FLOW:
+  case PN_DELIVERY:
+    {
+      pn_link_t *link = pn_event_link(event);
+      if (pn_link_is_receiver(link)) {
+        pni_topup(link, window);
+      }
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+pn_flowcontroller_t *pn_flowcontroller(int window) {
+  pn_handler_t *handler = pn_handler_new(pn_flowcontroller_dispatch, sizeof(pn_flowcontroller_t), NULL);
+  pn_flowcontroller_t *flowcontroller = (pn_flowcontroller_t *) pn_handler_mem(handler);
+  flowcontroller->window = window;
+  return flowcontroller;
+}
+
+pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller) {
+  return pn_handler_cast(flowcontroller);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/handlers/handshaker.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/handshaker.c b/proton-c/src/handlers/handshaker.c
new file mode 100644
index 0000000..647867a
--- /dev/null
+++ b/proton-c/src/handlers/handshaker.c
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/session.h>
+#include <proton/link.h>
+#include <proton/handlers.h>
+#include <assert.h>
+
+struct pn_handshaker_t {
+  pn_map_t *handlers;
+};
+
+static void pn_handshaker_finalize(pn_handler_t *handler) {
+  pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler);
+  pn_free(handshaker->handlers);
+}
+
+static void pn_handshaker_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  switch (pn_event_type(event)) {
+  case PN_CONNECTION_REMOTE_OPEN:
+    {
+      pn_connection_t *conn = pn_event_connection(event);
+      if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+        pn_connection_open(conn);
+      }
+    }
+    break;
+  case PN_SESSION_REMOTE_OPEN:
+    {
+      pn_session_t *ssn = pn_event_session(event);
+      if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
+        pn_session_open(ssn);
+      }
+    }
+    break;
+  case PN_LINK_REMOTE_OPEN:
+    {
+      pn_link_t *link = pn_event_link(event);
+      if (pn_link_state(link) & PN_LOCAL_UNINIT) {
+        pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
+        pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
+        pn_link_open(link);
+      }
+    }
+    break;
+  case PN_CONNECTION_REMOTE_CLOSE:
+    {
+      pn_connection_t *conn = pn_event_connection(event);
+      if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
+        pn_connection_close(conn);
+      }
+    }
+    break;
+  case PN_SESSION_REMOTE_CLOSE:
+    {
+      pn_session_t *ssn = pn_event_session(event);
+      if (!(pn_session_state(ssn) & PN_LOCAL_CLOSED)) {
+        pn_session_close(ssn);
+      }
+    }
+    break;
+  case PN_LINK_REMOTE_CLOSE:
+    {
+      pn_link_t *link = pn_event_link(event);
+      if (!(pn_link_state(link) & PN_LOCAL_CLOSED)) {
+        pn_link_close(link);
+      }
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+pn_handshaker_t *pn_handshaker(void) {
+  pn_handler_t *handler = pn_handler_new(pn_handshaker_dispatch, sizeof(pn_handshaker_t), pn_handshaker_finalize);
+  pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler);
+  handshaker->handlers = NULL;
+  return handshaker;
+}
+
+pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker) {
+  return pn_handler_cast(handshaker);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index 4cdd5e9..92b9003 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -1306,6 +1306,8 @@ int pn_messenger_process_events(pn_messenger_t *messenger)
       break;
     case PN_LINK_FINAL:
       break;
+    default:
+      break;
     }
     pn_collector_pop(messenger->collector);
   }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
new file mode 100644
index 0000000..8e49772
--- /dev/null
+++ b/proton-c/src/reactor/acceptor.c
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/io.h>
+#include <proton/reactor.h>
+#include <proton/sasl.h>
+#include <proton/selector.h>
+#include <proton/transport.h>
+#include "selectable.h"
+
+static ssize_t pni_acceptor_capacity(pn_selectable_t *sel) {
+  return 1;
+}
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+void pni_acceptor_readable(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  char name[1024];
+  pn_socket_t sock = pn_accept(pn_reactor_io(reactor), pn_selectable_fd(sel), name, 1024);
+  pn_record_t *record = pn_selectable_attachments(sel);
+  pn_handler_t *handler = (pn_handler_t *) pn_record_get(record, PN_HANDLER);
+  if (!handler) { handler = pn_reactor_handler(reactor); }
+  pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+  pn_transport_t *trans = pn_transport();
+  pn_transport_set_server(trans);
+  pn_sasl_t *sasl = pn_sasl(trans);
+  pn_sasl_allow_skip(sasl, true);
+  pn_sasl_mechanisms(sasl, "ANONYMOUS");
+  pn_sasl_done(sasl, PN_SASL_OK);
+  pn_transport_bind(trans, conn);
+  pn_reactor_selectable_transport(reactor, sock, trans);
+  pn_decref(trans);
+}
+
+void pni_acceptor_finalize(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_close(pn_reactor_io(reactor), pn_selectable_fd(sel));
+}
+
+pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_set_capacity(sel, pni_acceptor_capacity);
+  pn_selectable_set_readable(sel, pni_acceptor_readable);
+  pn_selectable_set_finalize(sel, pni_acceptor_finalize);
+  pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port);
+  pni_selectable_set_fd(sel, socket);
+  pni_selectable_set_context(sel, reactor);
+  pn_record_t *record = pn_selectable_attachments(sel);
+  pn_record_def(record, PN_HANDLER, PN_OBJECT);
+  pn_record_set(record, PN_HANDLER, handler);
+  pn_reactor_update(reactor, sel);
+  return (pn_acceptor_t *) sel;
+}
+
+void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor) {
+  pn_selectable_t *sel = (pn_selectable_t *) acceptor;
+  pn_socket_t socket = pn_selectable_fd(sel);
+  pn_close(pn_reactor_io(reactor), socket);
+  pni_selectable_set_fd(sel, PN_INVALID_SOCKET);
+  pn_selector_remove(pn_reactor_selector(reactor), sel);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
new file mode 100644
index 0000000..2757ca5
--- /dev/null
+++ b/proton-c/src/reactor/connection.c
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/object.h>
+#include <proton/reactor.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <assert.h>
+#include <stdio.h>
+#include <strings.h>
+#include "selectable.h"
+
+// XXX: one key is reused in both directions: selectable -> transport and transport -> selectable
+static void *pni_transportctx = NULL;
+#define PN_TRANCTX ((pn_handle_t) &pni_transportctx)
+
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  pn_transport_t *transport = pn_event_transport(event);
+  pn_record_t *record = pn_transport_attachments(transport);
+  pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(record, PN_TRANCTX);
+  if (sel && !pn_selectable_is_terminal(sel)) {
+    pn_reactor_update(reactor, sel);
+  }
+}
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+  pn_connection_t *conn = pn_event_connection(event);
+  pn_transport_t *transport = pn_transport();
+  pn_sasl_t *sasl = pn_sasl(transport);
+  pn_sasl_mechanisms(sasl, "ANONYMOUS");
+  pn_transport_bind(transport, conn);
+  const char *hostname = pn_connection_get_hostname(conn);
+  pn_string_t *str = pn_string(hostname);
+  char *host = pn_string_buffer(str);
+  const char *port = "5672";
+  char *colon = rindex(host, ':');
+  if (colon) {
+    port = colon + 1;
+    colon[0] = '\0';
+  }
+  pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port);
+  pn_free(str);
+  pn_reactor_selectable_transport(reactor, sock, transport);
+  pn_decref(transport);
+}
+
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+  pn_connection_t *conn = pn_event_connection(event);
+  pn_list_remove(pn_reactor_children(reactor), conn);
+}
+
+static pn_transport_t *pni_transport(pn_selectable_t *sel) {
+  pn_record_t *record = pn_selectable_attachments(sel);
+  return (pn_transport_t *) pn_record_get(record, PN_TRANCTX);
+}
+
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity < 0) {
+    if (pn_transport_closed(transport)) {
+      pni_selectable_set_terminal(sel, true);
+    }
+  }
+  return capacity;
+}
+
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending < 0) {
+    if (pn_transport_closed(transport)) {
+      pni_selectable_set_terminal(sel, true);
+    }
+  }
+  return pending;
+}
+
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+static void pni_connection_readable(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity > 0) {
+    ssize_t n = pn_recv(pn_reactor_io(reactor), pn_selectable_fd(sel),
+                        pn_transport_tail(transport), capacity);
+    if (n <= 0) {
+      if (n == 0 || !pn_wouldblock(pn_reactor_io(reactor))) {
+        if (n < 0) perror("recv");
+        pn_transport_close_tail(transport);
+        /*if (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
+          pn_error_report("CONNECTION", "connection aborted (remote)");
+          }*/
+      }
+    } else {
+      /*int err =*/ pn_transport_process(transport, (size_t)n);
+      /*if (err)
+        pn_error_copy(messenger->error, pn_transport_error(transport));*/
+    }
+  }
+
+  ssize_t newcap = pn_transport_capacity(transport);
+  if (newcap != capacity) {
+    pn_reactor_update(reactor, sel);
+  }
+}
+
+static void pni_connection_writable(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending > 0) {
+    ssize_t n = pn_send(pn_reactor_io(reactor), pn_selectable_fd(sel),
+                        pn_transport_head(transport), pending);
+    if (n < 0) {
+      if (!pn_wouldblock(pn_reactor_io(reactor))) {
+        perror("send");
+        pn_transport_close_head(transport);
+      }
+    } else {
+      pn_transport_pop(transport, n);
+    }
+  }
+
+  ssize_t newpending = pn_transport_pending(transport);
+  if (newpending != pending) {
+    pn_reactor_update(reactor, sel);
+  }
+}
+
+static void pni_connection_expired(pn_selectable_t *sel) {}
+
+static void pni_connection_finalize(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *transport = pni_transport(sel);
+  pn_record_t *record = pn_transport_attachments(transport);
+  pn_record_set(record, PN_TRANCTX, NULL);
+  pn_socket_t fd = pn_selectable_fd(sel);
+  pn_close(pn_reactor_io(reactor), fd);
+}
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) {
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_set_capacity(sel, pni_connection_capacity);
+  pn_selectable_set_pending(sel, pni_connection_pending);
+  pn_selectable_set_deadline(sel, pni_connection_deadline);
+  pn_selectable_set_readable(sel, pni_connection_readable);
+  pn_selectable_set_writable(sel, pni_connection_writable);
+  pn_selectable_set_expired(sel, pni_connection_expired);
+  pn_selectable_set_finalize(sel, pni_connection_finalize);
+  pni_selectable_set_fd(sel, sock);
+  pni_selectable_set_context(sel, reactor);
+  pn_record_t *record = pn_selectable_attachments(sel);
+  pn_record_def(record, PN_TRANCTX, PN_OBJECT);
+  pn_record_set(record, PN_TRANCTX, transport);
+  pn_record_t *tr = pn_transport_attachments(transport);
+  pn_record_def(tr, PN_TRANCTX, PN_WEAKREF);
+  pn_record_set(tr, PN_TRANCTX, sel);
+  pn_reactor_update(reactor, sel);
+  return sel;
+}
+
+pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) {
+  assert(reactor);
+  pn_connection_t *connection = pn_connection();
+  pn_record_t *record = pn_connection_attachments(connection);
+  pn_record_def(record, PN_HANDLER, PN_OBJECT);
+  pn_record_set(record, PN_HANDLER, handler);
+  pn_connection_collect(connection, pn_reactor_collector(reactor));
+  pn_list_add(pn_reactor_children(reactor), connection);
+  pn_decref(connection);
+  return connection;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/handler.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/handler.c b/proton-c/src/reactor/handler.c
new file mode 100644
index 0000000..cbbbb5d
--- /dev/null
+++ b/proton-c/src/reactor/handler.c
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/reactor.h>
+#include <proton/event.h>
+#include <assert.h>
+
+struct pn_handler_t {
+  void (*dispatch) (pn_handler_t *, pn_event_t *);
+  void (*finalize) (pn_handler_t *);
+  pn_list_t *children;
+};
+
+void pn_handler_initialize(void *object) {
+  pn_handler_t *handler = (pn_handler_t *) object;
+  handler->dispatch = NULL;
+  handler->children = NULL;
+}
+
+void pn_handler_finalize(void *object) {
+  pn_handler_t *handler = (pn_handler_t *) object;
+  if (handler->finalize) {
+    handler->finalize(handler);
+  }
+  pn_free(handler->children);
+}
+
+#define pn_handler_hashcode NULL
+#define pn_handler_compare NULL
+#define pn_handler_inspect NULL
+
+pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *)) {
+  return pn_handler_new(dispatch, 0, NULL);
+}
+
+pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *), size_t size, void (*finalize)(pn_handler_t *)) {
+  static const pn_class_t clazz = PN_CLASS(pn_handler);
+  pn_handler_t *handler = (pn_handler_t *) pn_class_new(&clazz, sizeof(pn_handler_t) + size);
+  handler->dispatch = dispatch;
+  handler->finalize = finalize;
+  return handler;
+}
+
+void pn_handler_free(pn_handler_t *handler) {
+  if (handler) {
+    if (handler->children) {
+      size_t n = pn_list_size(handler->children);
+      for (size_t i = 0; i < n; i++) {
+        void *child = pn_list_get(handler->children, i);
+        pn_decref(child);
+      }
+    }
+
+    pn_decref(handler);
+  }
+}
+
+void *pn_handler_mem(pn_handler_t *handler) {
+  return (void *) (handler + 1);
+}
+
+pn_handler_t *pn_handler_cast(void *mem) {
+  return ((pn_handler_t *) mem) - 1;
+}
+
+void pn_handler_add(pn_handler_t *handler, pn_handler_t *child) {
+  if (!handler->children) {
+    handler->children = pn_list(PN_OBJECT, 0);
+  }
+  pn_list_add(handler->children, child);
+}
+
+void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  if (handler->dispatch) {
+    handler->dispatch(handler, event);
+  }
+  if (handler->children) {
+    size_t n = pn_list_size(handler->children);
+    for (size_t i = 0; i < n; i++) {
+      pn_handler_t *child = (pn_handler_t *) pn_list_get(handler->children, i);
+      pn_handler_dispatch(child, event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
new file mode 100644
index 0000000..dbf63e9
--- /dev/null
+++ b/proton-c/src/reactor/reactor.c
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/event.h>
+#include <proton/reactor.h>
+#include <proton/transport.h>
+#include <proton/connection.h>
+#include <proton/session.h>
+#include <proton/link.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+
+#include "selectable.h"
+
+/* Reactor state. Every member is created in pn_reactor_initialize and
+ * released in pn_reactor_finalize. */
+struct pn_reactor_t {
+  /* Created with pn_io(). */
+  pn_io_t *io;
+  /* Obtained from pn_io_selector(io). */
+  pn_selector_t *selector;
+  /* Event queue drained by pn_reactor_process. */
+  pn_collector_t *collector;
+  /* Fallback handler used when an event carries no handler of its own. */
+  pn_handler_t *handler;
+  /* PN_OBJECT list of the selectables created by pn_reactor_selectable. */
+  pn_list_t *children;
+};
+
+/* NOTE(review): nothing in this file reads pni_handler directly; presumably
+ * its address backs the PN_HANDLER record key used by pn_event_handler --
+ * confirm in proton/reactor.h. */
+void *pni_handler = NULL;
+
+/* Dispatch callback installed on the reactor's root handler. It deliberately
+ * does nothing; the disabled snippet below prints each event for debugging. */
+static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  (void) handler;
+  (void) event;
+  /*
+  pn_string_t *str = pn_string(NULL);
+  pn_inspect(event, str);
+  printf("%s\n", pn_string_get(str));
+  pn_free(str);
+  */
+}
+
+/* Class initializer: populates every member of a freshly allocated reactor. */
+static void pn_reactor_initialize(void *object) {
+  pn_reactor_t *self = (pn_reactor_t *) object;
+
+  /* The selector is derived from the io object, so io must come first. */
+  pn_io_t *io = pn_io();
+  self->io = io;
+  self->selector = pn_io_selector(io);
+
+  self->collector = pn_collector();
+  self->handler = pn_handler(pn_dummy_dispatch);
+  self->children = pn_list(PN_OBJECT, 0);
+}
+
+/* Class finalizer: releases everything pn_reactor_initialize created. The
+ * release order is kept exactly as it was. */
+static void pn_reactor_finalize(void *object) {
+  pn_reactor_t *self = (pn_reactor_t *) object;
+
+  pn_selector_free(self->selector);
+  pn_io_free(self->io);
+  pn_collector_free(self->collector);
+
+  /* handler is NULL here when pn_reactor_free has already released it. */
+  pn_free(self->handler);
+  pn_free(self->children);
+}
+
+/* Reactors supply no hashcode, compare or inspect behaviour. These names are
+ * presumably consumed by PN_CLASS(pn_reactor) in pn_reactor -- confirm
+ * against the PN_CLASS macro in proton/object.h. */
+#define pn_reactor_hashcode NULL
+#define pn_reactor_compare NULL
+#define pn_reactor_inspect NULL
+
+/* Allocates a new reactor through the pn_reactor class.
+ *
+ * Declared with (void): in C an empty parameter list is not a prototype, so
+ * calls with stray arguments would otherwise go undiagnosed.
+ *
+ * Returns whatever pn_class_new returns. Release the reactor with
+ * pn_reactor_free or pn_free. */
+pn_reactor_t *pn_reactor(void) {
+  static const pn_class_t clazz = PN_CLASS(pn_reactor);
+  return (pn_reactor_t *) pn_class_new(&clazz, sizeof(pn_reactor_t));
+}
+
+/* Releases the reactor. The root handler goes through pn_handler_free, which
+ * also drops a reference on each of its children, before the reactor itself
+ * is dereferenced. A NULL reactor is ignored. */
+void pn_reactor_free(pn_reactor_t *reactor) {
+  if (!reactor) {
+    return;
+  }
+
+  pn_handler_free(reactor->handler);
+  /* Cleared so pn_reactor_finalize does not release the handler again. */
+  reactor->handler = NULL;
+
+  pn_decref(reactor);
+}
+
+/* Returns the reactor's root handler. */
+pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+  pn_handler_t *root = reactor->handler;
+  return root;
+}
+
+/* Returns the selector the reactor polls in pn_reactor_run. */
+pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+  pn_selector_t *selector = reactor->selector;
+  return selector;
+}
+
+/* Returns the reactor's pn_io_t object. */
+pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+  pn_io_t *io = reactor->io;
+  return io;
+}
+
+/* Returns the collector whose events pn_reactor_process drains. */
+pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+  pn_collector_t *collector = reactor->collector;
+  return collector;
+}
+
+/* Returns the list of selectables registered with the reactor. */
+pn_list_t *pn_reactor_children(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+  pn_list_t *children = reactor->children;
+  return children;
+}
+
+/* Creates a selectable, registers it with the reactor's selector and stores
+ * it in the reactor's children list. The local reference is dropped before
+ * returning, so the returned pointer is kept alive by the children list. */
+pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) {
+  assert(reactor != NULL);
+
+  pn_selectable_t *created = pn_selectable();
+  pn_selector_add(reactor->selector, created);
+  pn_list_add(reactor->children, created);
+
+  /* children is a PN_OBJECT list; give up our own reference. */
+  pn_decref(created);
+  return created;
+}
+
+/* Forwards to pn_selector_update for `selectable` on the reactor's selector. */
+void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) {
+  assert(reactor != NULL);
+  pn_selector_t *selector = reactor->selector;
+  pn_selector_update(selector, selectable);
+}
+
+/* Reactor-internal event hooks called from pni_reactor_dispatch. They are only
+ * declared here; their definitions live outside this file. */
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
+
+/* Routes the event types the reactor itself cares about to the matching
+ * internal hook; every other event type is ignored. */
+static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+
+  const pn_event_type_t type = pn_event_type(event);
+  if (type == PN_TRANSPORT) {
+    pni_handle_transport(reactor, event);
+  } else if (type == PN_CONNECTION_LOCAL_OPEN) {
+    pni_handle_open(reactor, event);
+  } else if (type == PN_CONNECTION_FINAL) {
+    pni_handle_final(reactor, event);
+  }
+}
+
+/* Returns the attachments record of `instance` when its class is a
+ * connection, session or link; returns NULL for any other class. */
+pn_record_t *pni_attachments(const pn_class_t *clazz, void *instance) {
+  pn_record_t *record = NULL;
+
+  switch (pn_class_id(clazz)) {
+  case CID_pn_connection:
+    record = pn_connection_attachments((pn_connection_t *) instance);
+    break;
+  case CID_pn_session:
+    record = pn_session_attachments((pn_session_t *) instance);
+    break;
+  case CID_pn_link:
+    record = pn_link_attachments((pn_link_t *) instance);
+    break;
+  default:
+    break;
+  }
+
+  return record;
+}
+
+/* Looks up the handler stored under PN_HANDLER in the attachments of the
+ * event's context object. Returns NULL when the context has no attachments
+ * record (see pni_attachments). */
+pn_handler_t *pn_event_handler(pn_event_t *event) {
+  pn_record_t *attachments = pni_attachments(pn_event_class(event), pn_event_context(event));
+  if (!attachments) {
+    return NULL;
+  }
+  return (pn_handler_t *) pn_record_get(attachments, PN_HANDLER);
+}
+
+/* Drains the reactor's collector. Each event goes first to the handler
+ * attached to its context (or to the reactor's root handler if there is
+ * none), then to the reactor's internal dispatch, and is popped only after
+ * both have seen it. */
+void pn_reactor_process(pn_reactor_t *reactor) {
+  assert(reactor);
+
+  for (pn_event_t *ev = pn_collector_peek(reactor->collector);
+       ev != NULL;
+       ev = pn_collector_peek(reactor->collector)) {
+    pn_handler_t *target = pn_event_handler(ev);
+    pn_handler_dispatch(target ? target : reactor->handler, ev);
+    pni_reactor_dispatch(reactor, ev);
+    pn_collector_pop(reactor->collector);
+  }
+}
+
+/* Main loop. Queues PN_REACTOR_INIT, then alternates between processing
+ * queued events and servicing ready selectables until the selector is empty,
+ * and finally queues and processes PN_REACTOR_FINAL. */
+void pn_reactor_run(pn_reactor_t *reactor) {
+  assert(reactor);
+  pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT);
+
+  for (;;) {
+    pn_reactor_process(reactor);
+
+    /* Nothing left to wait on: leave the loop. */
+    if (pn_selector_size(reactor->selector) == 0) {
+      break;
+    }
+
+    pn_selector_select(reactor->selector, 1000);
+
+    int ready;
+    pn_selectable_t *current = pn_selector_next(reactor->selector, &ready);
+    while (current) {
+      if (ready & PN_READABLE) {
+        pn_selectable_readable(current);
+      }
+      if (ready & PN_WRITABLE) {
+        pn_selectable_writable(current);
+      }
+      if (ready & PN_EXPIRED) {
+        pn_selectable_expired(current);
+      }
+      /* Terminal selectables are dropped from the selector and from the
+       * reactor's children list. */
+      if (pn_selectable_is_terminal(current)) {
+        pn_selector_remove(reactor->selector, current);
+        pn_list_remove(reactor->children, current);
+      }
+      current = pn_selector_next(reactor->selector, &ready);
+    }
+  }
+
+  pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL);
+  pn_reactor_process(reactor);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index fb7df1b..312c94f 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -45,3 +45,4 @@ pn_add_c_test (c-message-tests message.c)
 pn_add_c_test (c-engine-tests engine.c)
 pn_add_c_test (c-parse-url-tests parse-url.c)
 pn_add_c_test (c-refcount-tests refcount.c)
+pn_add_c_test (c-reactor-tests reactor.c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
new file mode 100644
index 0000000..bb93d42
--- /dev/null
+++ b/proton-c/src/tests/reactor.c
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/reactor.h>
+#include <proton/handlers.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/link.h>
+#include <proton/delivery.h>
+#include <stdlib.h>
+
+/* Local assert: calls abort() when E is false. Unlike the <assert.h> macro it
+ * does not consult NDEBUG, so the checks stay active in every build. */
+#define assert(E) ((E) ? 0 : (abort(), 0))
+
+
+/* A reactor can be created and released straight away with pn_free. */
+static void test_reactor(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_free(r);
+}
+
+/* Running a reactor that has nothing registered must return rather than
+ * block. */
+static void test_reactor_run(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  // run should exit if there is nothing left to do
+  pn_reactor_run(r);
+  pn_free(r);
+}
+
+/* Per-handler state for the recording test handler: the list that
+ * test_dispatch appends each received event type to. */
+typedef struct {
+  pn_list_t *events;
+} pni_test_handler_t;
+
+/* Views the handler's trailing memory as a pni_test_handler_t. */
+pni_test_handler_t *thmem(pn_handler_t *handler) {
+  void *mem = pn_handler_mem(handler);
+  return (pni_test_handler_t *) mem;
+}
+
+/* Records the type of every dispatched event, stored in the list as a
+ * pointer-sized value. */
+void test_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  pn_list_t *log = thmem(handler)->events;
+  pn_list_add(log, (void *) pn_event_type(event));
+}
+
+/* Creates a recording handler that appends event types to `events`. The list
+ * pointer is stored as-is; no reference is taken on it here. */
+pn_handler_t *test_handler(pn_list_t *events) {
+  pn_handler_t *recorder = pn_handler_new(test_dispatch, sizeof(pni_test_handler_t), NULL);
+  pni_test_handler_t *state = thmem(recorder);
+  state->events = events;
+  return recorder;
+}
+
+/* Sentinel that terminates the variadic argument list passed to expect(). */
+#define END PN_EVENT_NONE
+
+/* Asserts that `events` holds exactly the event types given as variadic
+ * arguments, in order. The argument list must end with END. */
+void expect(pn_list_t *events, ...) {
+  va_list args;
+  size_t pos = 0;
+
+  va_start(args, events);
+  for (;;) {
+    /* Enum arguments arrive promoted to int through the varargs. */
+    pn_event_type_t want = (pn_event_type_t) va_arg(args, int);
+    if (want == END) {
+      break;
+    }
+    assert(pos < pn_list_size(events));
+    pn_event_type_t got = (pn_event_type_t) pn_list_get(events, pos);
+    pos++;
+    assert(want == got);
+  }
+  /* No recorded events may remain beyond the expected ones. */
+  assert(pos == pn_list_size(events));
+  va_end(args);
+}
+
+/* Adds a child handler, releases the test's own reference to it, and frees
+ * the reactor with pn_free without running it: no events may be recorded. */
+static void test_reactor_handler(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_handler_t *root = pn_reactor_handler(r);
+  assert(root != NULL);
+
+  pn_list_t *seen = pn_list(PN_VOID, 0);
+  pn_handler_t *recorder = test_handler(seen);
+  pn_handler_add(root, recorder);
+  pn_decref(recorder);
+
+  pn_free(r);
+  expect(seen, END);
+  pn_free(seen);
+}
+
+/* Same as test_reactor_handler, but the child's reference is handed over and
+ * released through pn_reactor_free rather than by the test itself. */
+static void test_reactor_handler_free(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_handler_t *root = pn_reactor_handler(r);
+  assert(root != NULL);
+
+  pn_list_t *seen = pn_list(PN_VOID, 0);
+  pn_handler_add(root, test_handler(seen));
+
+  pn_reactor_free(r);
+  expect(seen, END);
+  pn_free(seen);
+}
+
+/* A child of the root handler must see exactly PN_REACTOR_INIT followed by
+ * PN_REACTOR_FINAL when an otherwise empty reactor is run. The test keeps its
+ * own reference to the child and releases it with pn_free at the end. */
+static void test_reactor_handler_run(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_handler_t *root = pn_reactor_handler(r);
+  assert(root != NULL);
+
+  pn_list_t *seen = pn_list(PN_VOID, 0);
+  pn_handler_t *recorder = test_handler(seen);
+  pn_handler_add(root, recorder);
+
+  pn_reactor_run(r);
+  expect(seen, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+
+  pn_free(r);
+  pn_free(recorder);
+  pn_free(seen);
+}
+
+/* Variant of test_reactor_handler_run in which the child handler is released
+ * through pn_reactor_free instead of by the test. */
+static void test_reactor_handler_run_free(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_handler_t *root = pn_reactor_handler(r);
+  assert(root != NULL);
+
+  pn_list_t *seen = pn_list(PN_VOID, 0);
+  pn_handler_add(root, test_handler(seen));
+
+  pn_reactor_run(r);
+  expect(seen, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+
+  pn_reactor_free(r);
+  pn_free(seen);
+}
+
+/* A connection created through the reactor delivers PN_CONNECTION_INIT to its
+ * own handler, while the root handler's child sees only the reactor's INIT
+ * and FINAL events. */
+static void test_reactor_connection(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+
+  /* Handler attached to the connection. */
+  pn_list_t *conn_seen = pn_list(PN_VOID, 0);
+  pn_handler_t *conn_recorder = test_handler(conn_seen);
+  pn_connection_t *conn = pn_reactor_connection(r, conn_recorder);
+  assert(conn != NULL);
+
+  /* Handler attached beneath the reactor's root handler. */
+  pn_handler_t *root = pn_reactor_handler(r);
+  pn_list_t *root_seen = pn_list(PN_VOID, 0);
+  pn_handler_add(root, test_handler(root_seen));
+
+  pn_reactor_run(r);
+  expect(root_seen, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+  expect(conn_seen, PN_CONNECTION_INIT, END);
+
+  pn_reactor_free(r);
+  pn_handler_free(conn_recorder);
+  pn_free(conn_seen);
+  pn_free(root_seen);
+}
+
+/* An acceptor can be created on 0.0.0.0:5672 and the reactor freed without
+ * ever being run. */
+static void test_reactor_acceptor(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_acceptor_t *listener = pn_reactor_acceptor(r, "0.0.0.0", "5672", NULL);
+  assert(listener != NULL);
+  pn_reactor_free(r);
+}
+
+/* Views the handler's trailing memory as a slot holding a pn_acceptor_t
+ * pointer. */
+pn_acceptor_t **tram(pn_handler_t *h) {
+  void *mem = pn_handler_mem(h);
+  return (pn_acceptor_t **) mem;
+}
+
+/* On PN_REACTOR_INIT, closes the acceptor stored in the handler's memory,
+ * taking the reactor from the event context. Other events are ignored. */
+static void tra_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  if (pn_event_type(event) != PN_REACTOR_INIT) {
+    return;
+  }
+
+  pn_acceptor_t *listener = *tram(handler);
+  pn_reactor_t *owner = (pn_reactor_t *) pn_event_context(event);
+  pn_acceptor_close(owner, listener);
+}
+
+/* Builds a handler that carries `acceptor` in its trailing memory and runs
+ * tra_dispatch. */
+static pn_handler_t *tra_handler(pn_acceptor_t *acceptor) {
+  pn_handler_t *closer = pn_handler_new(tra_dispatch, sizeof(pn_acceptor_t *), NULL);
+  pn_acceptor_t **slot = tram(closer);
+  *slot = acceptor;
+  return closer;
+}
+
+/* Runs a reactor whose only selectable is an acceptor that a root-level
+ * handler closes on PN_REACTOR_INIT; pn_reactor_run is then expected to
+ * return. */
+static void test_reactor_acceptor_run(void) {
+  pn_reactor_t *r = pn_reactor();
+  assert(r != NULL);
+  pn_handler_t *root = pn_reactor_handler(r);
+  assert(root != NULL);
+
+  pn_acceptor_t *listener = pn_reactor_acceptor(r, "0.0.0.0", "5672", NULL);
+  assert(listener != NULL);
+  pn_handler_add(root, tra_handler(listener));
+
+  pn_reactor_run(r);
+  pn_reactor_free(r);
+}
+
+/* State kept in the server handler's trailing memory: the reactor and
+ * acceptor that server_dispatch closes, and the list of event types it
+ * records. */
+typedef struct {
+  pn_reactor_t *reactor;
+  pn_acceptor_t *acceptor;
+  pn_list_t *events;
+} server_t;
+
+/* Views the handler's trailing memory as a server_t. */
+static server_t *smem(pn_handler_t *handler) {
+  void *mem = pn_handler_mem(handler);
+  return (server_t *) mem;
+}
+
+/* Server side of test_reactor_connect. Records every event type; once the
+ * peer's open arrives it closes the acceptor, then closes and releases the
+ * connection. */
+static void server_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  server_t *state = smem(handler);
+  pn_list_add(state->events, (void *) pn_event_type(event));
+
+  if (pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
+    pn_acceptor_close(state->reactor, state->acceptor);
+    pn_connection_close(pn_event_connection(event));
+    pn_connection_release(pn_event_connection(event));
+  }
+}
+
+/* State kept in the client handler's trailing memory: the list of event types
+ * recorded by client_dispatch. */
+typedef struct {
+  pn_list_t *events;
+} client_t;
+
+/* Views the handler's trailing memory as a client_t. */
+static client_t *cmem(pn_handler_t *handler) {
+  void *mem = pn_handler_mem(handler);
+  return (client_t *) mem;
+}
+
+/* Client side of test_reactor_connect. Records every event type, opens the
+ * connection towards localhost:5672 on PN_CONNECTION_INIT, and closes and
+ * releases it once the peer has closed. */
+static void client_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  client_t *state = cmem(handler);
+  pn_list_add(state->events, (void *) pn_event_type(event));
+
+  pn_connection_t *conn = pn_event_connection(event);
+  const pn_event_type_t type = pn_event_type(event);
+  if (type == PN_CONNECTION_INIT) {
+    pn_connection_set_hostname(conn, "localhost:5672");
+    pn_connection_open(conn);
+  } else if (type == PN_CONNECTION_REMOTE_CLOSE) {
+    pn_connection_close(conn);
+    pn_connection_release(conn);
+  }
+}
+
+/* End-to-end test: a client connection and an acceptor share one reactor. The
+ * client opens towards localhost:5672, the server closes on the remote open,
+ * and both sides must record the exact event sequences asserted below.
+ *
+ * NOTE: this needs port 5672 to be free on the local machine. The asserts on
+ * the reactor and acceptor match the sibling acceptor tests; they presume
+ * pn_reactor_acceptor returns NULL when it cannot listen -- confirm.
+ */
+static void test_reactor_connect(void) {
+  pn_reactor_t *reactor = pn_reactor();
+  assert(reactor);
+  pn_handler_t *sh = pn_handler_new(server_dispatch, sizeof(server_t), NULL);
+  server_t *srv = smem(sh);
+  pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", sh);
+  assert(acceptor);
+  srv->reactor = reactor;
+  srv->acceptor = acceptor;
+  srv->events = pn_list(PN_VOID, 0);
+  pn_handler_t *ch = pn_handler_new(client_dispatch, sizeof(client_t), NULL);
+  client_t *cli = cmem(ch);
+  cli->events = pn_list(PN_VOID, 0);
+  pn_reactor_connection(reactor, ch);
+  pn_reactor_run(reactor);
+  expect(srv->events, PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
+         PN_CONNECTION_REMOTE_OPEN, PN_CONNECTION_LOCAL_CLOSE,
+         PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_UNBOUND,
+         PN_CONNECTION_FINAL, END);
+  pn_free(srv->events);
+  /* srv lives inside sh's trailing memory, so the test keeps its reference
+   * to sh until it has finished reading srv->events. Releasing it before the
+   * run, as was done previously, left srv dependent on references held
+   * elsewhere. */
+  pn_decref(sh);
+  expect(cli->events, PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN,
+         PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
+         PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_LOCAL_CLOSE,
+         PN_CONNECTION_UNBOUND, PN_CONNECTION_FINAL, END);
+  pn_free(cli->events);
+  pn_decref(ch);
+  pn_reactor_free(reactor);
+}
+
+/*
+void dispatch(pn_handler_t *handler, pn_event_t *event) {
+  pn_delivery_t *dlv = pn_event_delivery(event);
+  switch (pn_event_type(event)) {
+  case PN_DELIVERY:
+    if (!pn_delivery_partial(dlv)) {
+      pn_delivery_settle(dlv);
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+static void test_reactor_flow(void) {
+  pn_reactor_t *reactor = pn_reactor();
+  assert(reactor);
+  pn_handler_t *root = pn_reactor_handler(reactor);
+  assert(root);
+  pn_handler_add(root, pn_handler(dispatch));
+  pn_handler_add(root, pn_handler_cast(pn_handshaker()));
+  pn_handler_add(root, pn_handler_cast(pn_flowcontroller(4*1024)));
+  pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL);
+  pn_reactor_run(reactor);
+  pn_reactor_free(reactor);
+  }*/
+
+/* Runs every reactor test in order; a failed assertion aborts the process. */
+int main(int argc, char **argv)
+{
+  static void (*const tests[])(void) = {
+    test_reactor,
+    test_reactor_run,
+    test_reactor_handler,
+    test_reactor_handler_free,
+    test_reactor_handler_run,
+    test_reactor_handler_run_free,
+    test_reactor_connection,
+    test_reactor_acceptor,
+    test_reactor_acceptor_run,
+    test_reactor_connect,
+  };
+
+  for (size_t i = 0; i < sizeof(tests) / sizeof(tests[0]); i++) {
+    tests[i]();
+  }
+  return 0;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Re: new c-reactor-tests hang (was Re: [3/3] qpid-proton git commit: initial commit of C reactor)

Posted by Gordon Sim <gs...@redhat.com>.
On 01/07/2015 03:44 AM, Rafael Schloming wrote:
> On Tue, Jan 6, 2015 at 3:36 PM, Gordon Sim <gs...@redhat.com> wrote:
>
>> On 01/06/2015 07:39 PM, Rafael Schloming wrote:
>>
>>> Hey, I made a bunch of updates to them, can you do a pull and see if this
>>> is still an issue?
>>>
>>
>> That seems to fix it.

Actually, it turns out the hang is caused by another process listening 
on port 5672. It was simply coincidence that I did not have a broker 
running when I retested.

>> (A separate issue I just discovered by accident is
>> that python-test seems to hang if PN_TRACE_FRM is set to 1).
>
>
> I think that's because there are several tests that transfer enough
> messages to exceed the default session window. The tracing slows them down
> enough that the tests take a really really long time. I don't know if they
> actually hang or not, but they probably appear to.

I'm pretty sure in my case it's a hang, as there is no CPU activity at all.


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Re: new c-reactor-tests hang (was Re: [3/3] qpid-proton git commit: initial commit of C reactor)

Posted by Rafael Schloming <rh...@alum.mit.edu>.
On Tue, Jan 6, 2015 at 3:36 PM, Gordon Sim <gs...@redhat.com> wrote:

> On 01/06/2015 07:39 PM, Rafael Schloming wrote:
>
>> Hey, I made a bunch of updates to them, can you do a pull and see if this
>> is still an issue?
>>
>
> That seems to fix it. (A separate issue I just discovered by accident is
> that python-test seems to hang if PN_TRACE_FRM is set to 1).


I think that's because there are several tests that transfer enough
messages to exceed the default session window. The tracing slows them down
enough that the tests take a really really long time. I don't know if they
actually hang or not, but they probably appear to.

--Rafael

Re: new c-reactor-tests hang (was Re: [3/3] qpid-proton git commit: initial commit of C reactor)

Posted by Gordon Sim <gs...@redhat.com>.
On 01/06/2015 07:39 PM, Rafael Schloming wrote:
> Hey, I made a bunch of updates to them, can you do a pull and see if this
> is still an issue?

That seems to fix it. (A separate issue I just discovered by accident is 
that python-test seems to hang if PN_TRACE_FRM is set to 1).

>
> --Rafael
>
> On Tue, Jan 6, 2015 at 1:07 PM, Gordon Sim <gs...@redhat.com> wrote:
>
>> On 01/05/2015 03:40 AM, rhs@apache.org wrote:
>>
>>> initial commit of C reactor
>>>
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/
>>> ebe4e3d3
>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ebe4e3d3
>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ebe4e3d3
>>>
>>> Branch: refs/heads/master
>>> Commit: ebe4e3d3676d8fcceb3016d7e0b2abb2deff7927
>>> Parents: 86e08ba
>>> Author: Rafael Schloming <rh...@alum.mit.edu>
>>> Authored: Sun Jan 4 22:38:28 2015 -0500
>>> Committer: Rafael Schloming <rh...@alum.mit.edu>
>>> Committed: Sun Jan 4 22:38:28 2015 -0500
>>>
>>> ----------------------------------------------------------------------
>>>    proton-c/CMakeLists.txt                |   8 +
>>>    proton-c/include/proton/cid.h          |   3 +
>>>    proton-c/include/proton/event.h        |  11 +
>>>    proton-c/include/proton/handlers.h     |  59 +++++
>>>    proton-c/include/proton/reactor.h      |  83 ++++++++
>>>    proton-c/src/events/event.c            |   4 +
>>>    proton-c/src/handlers/flowcontroller.c |  65 ++++++
>>>    proton-c/src/handlers/handshaker.c     | 103 +++++++++
>>>    proton-c/src/messenger/messenger.c     |   2 +
>>>    proton-c/src/reactor/acceptor.c        |  80 +++++++
>>>    proton-c/src/reactor/connection.c      | 207 ++++++++++++++++++
>>>    proton-c/src/reactor/handler.c         | 103 +++++++++
>>>    proton-c/src/reactor/reactor.c         | 216 +++++++++++++++++++
>>>    proton-c/src/tests/CMakeLists.txt      |   1 +
>>>    proton-c/src/tests/reactor.c           | 320
>>> ++++++++++++++++++++++++++++
>>>    15 files changed, 1265 insertions(+)
>>> ----------------------------------------------------------------------
>>>
>>
>> The c-reactor-tests hang for me on a clean build of the latest on master.
>> There is no CPU activity. The pstack output (once valgrind is disabled) is:
>>
>>   #0  0x0000003d546e8b94 in poll () from /lib64/libc.so.6
>>> #1  0x00007f772a099028 in pn_selector_select (selector=0x1e5ff20,
>>> timeout=timeout@entry=1000) at /home/gordon/projects/proton-
>>> git/proton-c/src/posix/selector.c:165
>>> #2  0x00007f772a090d88 in pn_reactor_run (reactor=reactor@entry=0x1e60960)
>>> at /home/gordon/projects/proton-git/proton-c/src/reactor/reactor.c:195
>>> #3  0x00000000004013c7 in test_reactor_connect () at
>>> /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:265
>>> #4  main (argc=<optimized out>, argv=<optimized out>) at
>>> /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:318
>>>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: dev-help@qpid.apache.org
>>
>>
>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Re: new c-reactor-tests hang (was Re: [3/3] qpid-proton git commit: initial commit of C reactor)

Posted by Rafael Schloming <rh...@alum.mit.edu>.
Hey, I made a bunch of updates to them, can you do a pull and see if this
is still an issue?

--Rafael

On Tue, Jan 6, 2015 at 1:07 PM, Gordon Sim <gs...@redhat.com> wrote:

> On 01/05/2015 03:40 AM, rhs@apache.org wrote:
>
>> initial commit of C reactor
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/
>> ebe4e3d3
>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ebe4e3d3
>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ebe4e3d3
>>
>> Branch: refs/heads/master
>> Commit: ebe4e3d3676d8fcceb3016d7e0b2abb2deff7927
>> Parents: 86e08ba
>> Author: Rafael Schloming <rh...@alum.mit.edu>
>> Authored: Sun Jan 4 22:38:28 2015 -0500
>> Committer: Rafael Schloming <rh...@alum.mit.edu>
>> Committed: Sun Jan 4 22:38:28 2015 -0500
>>
>> ----------------------------------------------------------------------
>>   proton-c/CMakeLists.txt                |   8 +
>>   proton-c/include/proton/cid.h          |   3 +
>>   proton-c/include/proton/event.h        |  11 +
>>   proton-c/include/proton/handlers.h     |  59 +++++
>>   proton-c/include/proton/reactor.h      |  83 ++++++++
>>   proton-c/src/events/event.c            |   4 +
>>   proton-c/src/handlers/flowcontroller.c |  65 ++++++
>>   proton-c/src/handlers/handshaker.c     | 103 +++++++++
>>   proton-c/src/messenger/messenger.c     |   2 +
>>   proton-c/src/reactor/acceptor.c        |  80 +++++++
>>   proton-c/src/reactor/connection.c      | 207 ++++++++++++++++++
>>   proton-c/src/reactor/handler.c         | 103 +++++++++
>>   proton-c/src/reactor/reactor.c         | 216 +++++++++++++++++++
>>   proton-c/src/tests/CMakeLists.txt      |   1 +
>>   proton-c/src/tests/reactor.c           | 320
>> ++++++++++++++++++++++++++++
>>   15 files changed, 1265 insertions(+)
>> ----------------------------------------------------------------------
>>
>
> The c-reactor-tests hang for me on a clean build of the latest on master.
> There is no CPU activity. The pstack output (once valgrind is disabled) is:
>
>  #0  0x0000003d546e8b94 in poll () from /lib64/libc.so.6
>> #1  0x00007f772a099028 in pn_selector_select (selector=0x1e5ff20,
>> timeout=timeout@entry=1000) at /home/gordon/projects/proton-
>> git/proton-c/src/posix/selector.c:165
>> #2  0x00007f772a090d88 in pn_reactor_run (reactor=reactor@entry=0x1e60960)
>> at /home/gordon/projects/proton-git/proton-c/src/reactor/reactor.c:195
>> #3  0x00000000004013c7 in test_reactor_connect () at
>> /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:265
>> #4  main (argc=<optimized out>, argv=<optimized out>) at
>> /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:318
>>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
> For additional commands, e-mail: dev-help@qpid.apache.org
>
>

new c-reactor-tests hang (was Re: [3/3] qpid-proton git commit: initial commit of C reactor)

Posted by Gordon Sim <gs...@redhat.com>.
On 01/05/2015 03:40 AM, rhs@apache.org wrote:
> initial commit of C reactor
>
>
> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ebe4e3d3
> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ebe4e3d3
> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ebe4e3d3
>
> Branch: refs/heads/master
> Commit: ebe4e3d3676d8fcceb3016d7e0b2abb2deff7927
> Parents: 86e08ba
> Author: Rafael Schloming <rh...@alum.mit.edu>
> Authored: Sun Jan 4 22:38:28 2015 -0500
> Committer: Rafael Schloming <rh...@alum.mit.edu>
> Committed: Sun Jan 4 22:38:28 2015 -0500
>
> ----------------------------------------------------------------------
>   proton-c/CMakeLists.txt                |   8 +
>   proton-c/include/proton/cid.h          |   3 +
>   proton-c/include/proton/event.h        |  11 +
>   proton-c/include/proton/handlers.h     |  59 +++++
>   proton-c/include/proton/reactor.h      |  83 ++++++++
>   proton-c/src/events/event.c            |   4 +
>   proton-c/src/handlers/flowcontroller.c |  65 ++++++
>   proton-c/src/handlers/handshaker.c     | 103 +++++++++
>   proton-c/src/messenger/messenger.c     |   2 +
>   proton-c/src/reactor/acceptor.c        |  80 +++++++
>   proton-c/src/reactor/connection.c      | 207 ++++++++++++++++++
>   proton-c/src/reactor/handler.c         | 103 +++++++++
>   proton-c/src/reactor/reactor.c         | 216 +++++++++++++++++++
>   proton-c/src/tests/CMakeLists.txt      |   1 +
>   proton-c/src/tests/reactor.c           | 320 ++++++++++++++++++++++++++++
>   15 files changed, 1265 insertions(+)
> ----------------------------------------------------------------------

The c-reactor-tests hang for me on a clean build of the latest on 
master. There is no CPU activity. The pstack output (once valgrind is 
disabled) is:

> #0  0x0000003d546e8b94 in poll () from /lib64/libc.so.6
> #1  0x00007f772a099028 in pn_selector_select (selector=0x1e5ff20, timeout=timeout@entry=1000) at /home/gordon/projects/proton-git/proton-c/src/posix/selector.c:165
> #2  0x00007f772a090d88 in pn_reactor_run (reactor=reactor@entry=0x1e60960) at /home/gordon/projects/proton-git/proton-c/src/reactor/reactor.c:195
> #3  0x00000000004013c7 in test_reactor_connect () at /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:265
> #4  main (argc=<optimized out>, argv=<optimized out>) at /home/gordon/projects/proton-git/proton-c/src/tests/reactor.c:318



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org