You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/11 23:03:48 UTC
svn commit: r1586773 - in /qpid/dispatch/trunk:
include/qpid/dispatch/server.h src/container.c src/server.c
src/server_private.h
Author: tross
Date: Fri Apr 11 21:03:48 2014
New Revision: 1586773
URL: http://svn.apache.org/r1586773
Log:
DISPATCH-41 - Container now uses the Proton Engine Event Collector API.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/server.h
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Fri Apr 11 21:03:48 2014
@@ -408,6 +408,15 @@ pn_connection_t *qd_connection_pn(qd_con
/**
+ * \brief Get the event collector for a connection.
+ *
+ * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNECTOR}_OPEN
+ * @return The pn_collector associated with the connection.
+ */
+pn_collector_t *qd_connection_collector(qd_connection_t *conn);
+
+
+/**
* \brief Get the configuration that was used in the setup of this connection.
*
* @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Apr 11 21:03:48 2014
@@ -25,6 +25,8 @@
#include <qpid/dispatch/message.h>
#include <proton/engine.h>
#include <proton/message.h>
+#include <proton/connection.h>
+#include <proton/event.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/threading.h>
@@ -279,100 +281,89 @@ static int close_handler(void* unused, p
}
-static int process_handler(qd_container_t *container, void* unused, pn_connection_t *conn)
+static int process_handler(qd_container_t *container, void* unused, qd_connection_t *qd_conn)
{
pn_session_t *ssn;
pn_link_t *pn_link;
pn_delivery_t *delivery;
+ pn_collector_t *collector = qd_connection_collector(qd_conn);
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+ pn_event_t *event;
int event_count = 0;
- // Step 1: setup the engine's connection, and any sessions and links
- // that may be pending.
-
- // initialize the connection if it's new
- if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
- pn_connection_open(conn);
- event_count++;
- }
-
- // open all pending sessions
- ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
- while (ssn) {
- pn_session_set_incoming_capacity(ssn, 1000000);
- pn_session_open(ssn);
- ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
- event_count++;
- }
-
- // configure and open any pending links
- pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
- while (pn_link) {
- if (pn_link_is_sender(pn_link))
- setup_outgoing_link(container, pn_link);
- else
- setup_incoming_link(container, pn_link);
- pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+ //
+ // Spin through the collected events for this connection and process them
+ // individually.
+ //
+ event = pn_collector_peek(collector);
+ while (event) {
event_count++;
- }
+ switch (pn_event_type(event)) {
+ case PN_CONNECTION_REMOTE_STATE :
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+ pn_connection_open(conn);
+ else if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+ pn_connection_close(conn);
+ break;
+
+ case PN_SESSION_REMOTE_STATE :
+ ssn = pn_event_session(event);
+ if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
+ pn_session_set_incoming_capacity(ssn, 1000000);
+ pn_session_open(ssn);
+ } else if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+ pn_session_close(ssn);
+ break;
+
+ case PN_LINK_REMOTE_STATE :
+ pn_link = pn_event_link(event);
+ if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(container, pn_link);
+ else
+ setup_incoming_link(container, pn_link);
+ } else if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+ qd_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ }
+ break;
- // Step 2: Now drain all the pending deliveries from the connection's
- // work queue and process them
+ case PN_DELIVERY :
+ delivery = pn_event_delivery(event);
+ if (pn_delivery_readable(delivery))
+ do_receive(delivery);
+
+ if (pn_delivery_updated(delivery)) {
+ do_updated(delivery);
+ pn_delivery_clear(delivery);
+ }
+ break;
- delivery = pn_work_head(conn);
- while (delivery) {
- if (pn_delivery_readable(delivery))
- do_receive(delivery);
-
- if (pn_delivery_updated(delivery)) {
- do_updated(delivery);
- pn_delivery_clear(delivery);
+ default :
+ break;
}
- delivery = pn_work_next(delivery);
- event_count++;
+
+ pn_collector_pop(collector);
+ event = pn_collector_peek(collector);
}
//
- // Step 2.5: Call the attached node's writable handler for all active links
+ // Call the attached node's writable handler for all active links
// on the connection. Note that in Dispatch, links are considered
// bidirectional. Incoming and outgoing only pertains to deliveries and
// deliveries are a subset of the traffic that flows both directions on links.
//
- pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
- while (pn_link) {
- assert(pn_session_connection(pn_link_session(pn_link)) == conn);
- event_count += do_writable(pn_link);
- pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
- }
-
- // Step 3: Clean up any links or sessions that have been closed by the
- // remote. If the connection has been closed remotely, clean that up
- // also.
-
- // teardown any terminating links
- pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
- while (pn_link) {
- qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
- qd_node_t *node = link->node;
- if (node)
- node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
- pn_link_close(pn_link);
- pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
- event_count++;
- }
-
- // teardown any terminating sessions
- ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
- while (ssn) {
- pn_session_close(ssn);
- ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
- event_count++;
- }
-
- // teardown the connection if it's terminating
- if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
- pn_connection_close(conn);
- event_count++;
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+ event_count += do_writable(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
}
return event_count;
@@ -420,7 +411,7 @@ static int handler(void *handler_context
case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING, conn_context); break;
case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context); break;
case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn);
- case QD_CONN_EVENT_PROCESS: return process_handler(container, conn_context, conn);
+ case QD_CONN_EVENT_PROCESS: return process_handler(container, conn_context, qd_conn);
}
return 0;
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Fri Apr 11 21:03:48 2014
@@ -69,6 +69,7 @@ static void thread_process_listeners(qd_
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
+ ctx->collector = 0;
ctx->listener = (qd_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
@@ -77,6 +78,8 @@ static void thread_process_listeners(qd_
ctx->ufd = 0;
pn_connection_t *conn = pn_connection();
+ ctx->collector = pn_collector();
+ pn_connection_collect(conn, ctx->collector);
pn_connection_set_container(conn, qd_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
@@ -195,6 +198,8 @@ static int process_connector(qd_server_t
}
pn_connection_t *conn = pn_connection();
+ ctx->collector = pn_collector();
+ pn_connection_collect(conn, ctx->collector);
pn_connection_set_container(conn, qd_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
@@ -486,6 +491,8 @@ static void *thread_run(void *arg)
sys_mutex_lock(qd_server->lock);
DEQ_REMOVE(qd_server->connections, ctx);
+ if (ctx->collector)
+ pn_collector_free(ctx->collector);
free_qd_connection_t(ctx);
pn_connector_free(cxtr);
if (conn)
@@ -567,6 +574,7 @@ static void cxtr_try_open(void *context)
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_conn = 0;
+ ctx->collector = 0;
ctx->listener = 0;
ctx->connector = ct;
ctx->context = ct->context;
@@ -878,6 +886,12 @@ pn_connection_t *qd_connection_pn(qd_con
}
+pn_collector_t *qd_connection_collector(qd_connection_t *conn)
+{
+ return conn->collector;
+}
+
+
const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
{
if (conn->listener)
@@ -975,6 +989,7 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_conn = 0;
+ ctx->collector = 0;
ctx->listener = 0;
ctx->connector = 0;
ctx->context = 0;
Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Fri Apr 11 21:03:48 2014
@@ -26,6 +26,7 @@
#include <qpid/dispatch/log.h>
#include <proton/driver.h>
#include <proton/engine.h>
+#include <proton/event.h>
#include <proton/driver_extras.h>
#include "dispatch_private.h"
@@ -79,6 +80,7 @@ struct qd_connection_t {
int enqueued;
pn_connector_t *pn_cxtr;
pn_connection_t *pn_conn;
+ pn_collector_t *collector;
qd_listener_t *listener;
qd_connector_t *connector;
void *context; // Copy of context from listener or connector
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org