You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/02/15 14:33:01 UTC

svn commit: r1446576 - in /qpid/trunk/qpid/extras/nexus: CMakeLists.txt include/qpid/nexus/router.h router/ router/CMakeLists.txt router/src/ router/src/main.c src/router_node.c

Author: tross
Date: Fri Feb 15 13:33:01 2013
New Revision: 1446576

URL: http://svn.apache.org/r1446576
Log:
QPID-4538 - Added router executable

Added:
    qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h   (with props)
    qpid/trunk/qpid/extras/nexus/router/
    qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt   (with props)
    qpid/trunk/qpid/extras/nexus/router/src/
    qpid/trunk/qpid/extras/nexus/router/src/main.c   (with props)
    qpid/trunk/qpid/extras/nexus/src/router_node.c   (with props)
Modified:
    qpid/trunk/qpid/extras/nexus/CMakeLists.txt

Modified: qpid/trunk/qpid/extras/nexus/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/CMakeLists.txt?rev=1446576&r1=1446575&r2=1446576&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/CMakeLists.txt (original)
+++ qpid/trunk/qpid/extras/nexus/CMakeLists.txt Fri Feb 15 13:33:01 2013
@@ -63,6 +63,7 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined"
 ## Build the Multi-Threaded Server Library
 ##
 set(server_SOURCES
+#    src/agent.c
     src/alloc.c
     src/auth.c
     src/buffer.c
@@ -73,6 +74,7 @@ set(server_SOURCES
     src/log.c
     src/message.c
     src/posix/threading.c
+    src/router_node.c
     src/server.c
     src/timer.c
     src/work_queue.c
@@ -93,4 +95,5 @@ install(FILES ${headers} DESTINATION ${I
 ##
 ## Build Tests
 ##
+add_subdirectory(router)
 add_subdirectory(tests)

Added: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h?rev=1446576&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h (added)
+++ qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h Fri Feb 15 13:33:01 2013
@@ -0,0 +1,35 @@
+#ifndef __nexus_router_h__
+#define __nexus_router_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/nexus/container.h>
+
+typedef struct nx_router_t nx_router_t;
+
+typedef struct {
+    size_t  message_limit;
+    size_t  memory_limit;
+} nx_router_configuration_t;
+
+nx_router_t *nx_router(nx_router_configuration_t *config);
+void         nx_router_free(nx_router_t *router);
+
+#endif

Propchange: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/router.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt?rev=1446576&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt (added)
+++ qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt Fri Feb 15 13:33:01 2013
@@ -0,0 +1,31 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+## 
+##   http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+##
+## Build the router application
+##
+set(router_SOURCES
+    src/main.c
+    )
+
+add_executable(nexus-router ${router_SOURCES})
+target_link_libraries(nexus-router qpid-nexus ${proton_lib})
+
+install(TARGETS nexus-router)
+

Propchange: qpid/trunk/qpid/extras/nexus/router/CMakeLists.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/extras/nexus/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/router/src/main.c?rev=1446576&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/nexus/router/src/main.c (added)
+++ qpid/trunk/qpid/extras/nexus/router/src/main.c Fri Feb 15 13:33:01 2013
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <proton/driver.h>
+#include <qpid/nexus/server.h>
+#include <qpid/nexus/container.h>
+#include <qpid/nexus/timer.h>
+#include <qpid/nexus/log.h>
+#include <qpid/nexus/router.h>
+#include <signal.h>
+
+static int exit_with_sigint = 0;
+
+static void thread_start_handler(void* context, int thread_id)
+{
+}
+
+
+static void signal_handler(void* context, int signum)
+{
+    nx_server_pause();
+
+    switch (signum) {
+    case SIGINT:
+        exit_with_sigint = 1;
+
+    case SIGQUIT:
+    case SIGTERM:
+        fflush(stdout);
+        nx_server_stop();
+        break;
+
+    case SIGHUP:
+        break;
+
+    default:
+        break;
+    }
+
+    nx_server_resume();
+}
+
+
+static void startup(void *context)
+{
+    // TODO - Move this into a configuration framework
+
+    nx_server_pause();
+
+    static nx_server_config_t server_config;
+    server_config.host            = "0.0.0.0";
+    server_config.port            = "5672";
+    server_config.sasl_mechanisms = "ANONYMOUS";
+    server_config.ssl_enabled     = 0;
+
+    nx_server_listen(&server_config, 0);
+
+    /*
+    static nx_server_config_t client_config;
+    client_config.host            = "0.0.0.0";
+    client_config.port            = "10000";
+    client_config.sasl_mechanisms = "ANONYMOUS";
+    client_config.ssl_enabled     = 0;
+
+    nx_server_connect(&client_config, 0);
+    */
+
+    nx_server_resume();
+}
+
+
+int main(int argc, char **argv)
+{
+    nx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR);
+
+    nx_server_initialize(4);
+    nx_container_initialize();
+
+    nx_server_set_signal_handler(signal_handler, 0);
+    nx_server_set_start_handler(thread_start_handler, 0);
+
+    nx_router_t *router = nx_router(0);
+
+    nx_timer_t *startup_timer = nx_timer(startup, 0);
+    nx_timer_schedule(startup_timer, 0);
+
+    nx_server_signal(SIGHUP);
+    nx_server_signal(SIGQUIT);
+    nx_server_signal(SIGTERM);
+    nx_server_signal(SIGINT);
+
+    nx_server_run();
+    nx_router_free(router);
+    nx_server_finalize();
+
+    if (exit_with_sigint) {
+	signal(SIGINT, SIG_DFL);
+	kill(getpid(), SIGINT);
+    }
+
+    return 0;
+}
+

Propchange: qpid/trunk/qpid/extras/nexus/router/src/main.c
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/extras/nexus/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/router_node.c?rev=1446576&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/router_node.c (added)
+++ qpid/trunk/qpid/extras/nexus/src/router_node.c Fri Feb 15 13:33:01 2013
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/nexus/server.h>
+#include <qpid/nexus/message.h>
+#include <qpid/nexus/threading.h>
+#include <qpid/nexus/timer.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/hash.h>
+#include <qpid/nexus/iterator.h>
+#include <qpid/nexus/log.h>
+#include <qpid/nexus/router.h>
+
+static char *module="ROUTER_NODE";
+
+struct nx_router_t {
+    nx_node_t          *node;
+    nx_link_list_t      in_links;
+    nx_link_list_t      out_links;
+    nx_message_list_t   in_fifo;
+    sys_mutex_t        *lock;
+    nx_timer_t         *timer;
+    hash_t             *out_hash;
+    uint64_t            dtag;
+};
+
+
+typedef struct {
+    nx_link_t         *link;
+    nx_message_list_t  out_fifo;
+} nx_router_link_t;
+
+
+ALLOC_DECLARE(nx_router_link_t);
+ALLOC_DEFINE(nx_router_link_t);
+
+
+/**
+ * Outbound Delivery Handler
+ */
+static void router_tx_handler(void* context, nx_link_t *link, pn_delivery_t *delivery)
+{
+    nx_router_t      *router  = (nx_router_t*) context;
+    pn_link_t        *pn_link = pn_delivery_link(delivery);
+    nx_router_link_t *rlink   = (nx_router_link_t*) nx_link_get_context(link);
+    nx_message_t     *msg;
+    size_t            size;
+
+    sys_mutex_lock(router->lock);
+    msg = DEQ_HEAD(rlink->out_fifo);
+    if (!msg) {
+        // TODO - Recind the delivery
+        sys_mutex_unlock(router->lock);
+        return;
+    }
+
+    DEQ_REMOVE_HEAD(rlink->out_fifo);
+    size = (DEQ_SIZE(rlink->out_fifo));
+    sys_mutex_unlock(router->lock);
+
+    nx_message_send(msg, pn_link);
+
+    //
+    // If there is no incoming delivery, it was pre-settled.  In this case,
+    // we must pre-settle the outgoing delivery as well.
+    //
+    if (nx_message_in_delivery(msg)) {
+        pn_delivery_set_context(delivery, (void*) msg);
+        nx_message_set_out_delivery(msg, delivery);
+    } else {
+        pn_delivery_settle(delivery);
+        nx_free_message(msg);
+    }
+
+    pn_link_advance(pn_link);
+    pn_link_offered(pn_link, size);
+}
+
+
+/**
+ * Inbound Delivery Handler
+ */
+static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *delivery)
+{
+    nx_router_t  *router  = (nx_router_t*) context;
+    pn_link_t    *pn_link = pn_delivery_link(delivery);
+    nx_message_t *msg;
+    int           valid_message = 0;
+
+    //
+    // Receive the message into a local representation.  If the returned message
+    // pointer is NULL, we have not yet received a complete message.
+    //
+    sys_mutex_lock(router->lock);
+    msg = nx_message_receive(delivery);
+    sys_mutex_unlock(router->lock);
+
+    if (!msg)
+        return;
+
+    //
+    // Validate the message through the Properties section
+    //
+    valid_message = nx_message_check(msg, NX_DEPTH_PROPERTIES);
+
+    pn_link_advance(pn_link);
+    pn_link_flow(pn_link, 1);
+
+    if (valid_message) {
+        nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO);
+        nx_router_link_t    *rlink;
+        if (iter) {
+            nx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+            sys_mutex_lock(router->lock);
+            int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+            nx_field_iterator_free(iter);
+
+            if (result == 0) {
+                //
+                // To field is valid and contains a known destination.  Enqueue on
+                // the output fifo for the next-hop-to-destination.
+                //
+                pn_link_t* pn_outlink = nx_link_pn(rlink->link);
+                DEQ_INSERT_TAIL(rlink->out_fifo, msg);
+                pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo));
+                nx_link_activate(rlink->link);
+            } else {
+                //
+                // To field contains an unknown address.  Release the message.
+                //
+                pn_delivery_update(delivery, PN_RELEASED);
+                pn_delivery_settle(delivery);
+            }
+
+            sys_mutex_unlock(router->lock);
+        }
+    } else {
+        //
+        // Message is invalid.  Reject the message.
+        //
+        pn_delivery_update(delivery, PN_REJECTED);
+        pn_delivery_settle(delivery);
+        pn_delivery_set_context(delivery, 0);
+        nx_free_message(msg);
+    }
+}
+
+
+/**
+ * Delivery Disposition Handler
+ */
+static void router_disp_handler(void* context, nx_link_t *link, pn_delivery_t *delivery)
+{
+    pn_link_t *pn_link = pn_delivery_link(delivery);
+
+    if (pn_link_is_sender(pn_link)) {
+        pn_disposition_t  disp     = pn_delivery_remote_state(delivery);
+        nx_message_t     *msg      = pn_delivery_get_context(delivery);
+        pn_delivery_t    *activate = 0;
+
+        if (msg) {
+            assert(delivery == nx_message_out_delivery(msg));
+            if (disp != 0) {
+                activate = nx_message_in_delivery(msg);
+                pn_delivery_update(activate, disp);
+                // TODO - handling of the data accompanying RECEIVED/MODIFIED
+            }
+
+            if (pn_delivery_settled(delivery)) {
+                //
+                // Downstream delivery has been settled.  Propagate the settlement
+                // upstream.
+                //
+                activate = nx_message_in_delivery(msg);
+                pn_delivery_settle(activate);
+                pn_delivery_settle(delivery);
+                nx_free_message(msg);
+            }
+
+            if (activate) {
+                //
+                // Activate the upstream/incoming link so that the settlement will
+                // get pushed out.
+                //
+                nx_link_t *act_link = (nx_link_t*) pn_link_get_context(pn_delivery_link(activate));
+                nx_link_activate(act_link);
+            }
+
+            return;
+        }
+    }
+
+    pn_delivery_settle(delivery);
+}
+
+
+/**
+ * New Incoming Link Handler
+ */
+static int router_incoming_link_handler(void* context, nx_link_t *link)
+{
+    nx_router_t    *router  = (nx_router_t*) context;
+    nx_link_item_t *item    = new_nx_link_item_t();
+    pn_link_t      *pn_link = nx_link_pn(link);
+
+    if (item) {
+        DEQ_ITEM_INIT(item);
+        item->link = link;
+
+        sys_mutex_lock(router->lock);
+        DEQ_INSERT_TAIL(router->in_links, item);
+        sys_mutex_unlock(router->lock);
+
+        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+        pn_link_flow(pn_link, 8);
+        pn_link_open(pn_link);
+    } else {
+        pn_link_close(pn_link);
+    }
+    return 0;
+}
+
+
+/**
+ * New Outgoing Link Handler
+ */
+static int router_outgoing_link_handler(void* context, nx_link_t *link)
+{
+    nx_router_t *router  = (nx_router_t*) context;
+    pn_link_t   *pn_link = nx_link_pn(link);
+    const char  *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
+
+    sys_mutex_lock(router->lock);
+    nx_router_link_t *rlink = new_nx_router_link_t();
+    rlink->link = link;
+    DEQ_INIT(rlink->out_fifo);
+    nx_link_set_context(link, rlink);
+
+    nx_field_iterator_t *iter = nx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+    int result = hash_insert(router->out_hash, iter, rlink);
+    nx_field_iterator_free(iter);
+
+    if (result == 0) {
+        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+        pn_link_open(pn_link);
+        sys_mutex_unlock(router->lock);
+        nx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
+        return 0;
+    }
+
+    nx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
+    pn_link_close(pn_link);
+    sys_mutex_unlock(router->lock);
+    return 0;
+}
+
+
+/**
+ * Outgoing Link Writable Handler
+ */
+static int router_writable_link_handler(void* context, nx_link_t *link)
+{
+    nx_router_t      *router = (nx_router_t*) context;
+    int               grant_delivery = 0;
+    pn_delivery_t    *delivery;
+    nx_router_link_t *rlink = (nx_router_link_t*) nx_link_get_context(link);
+    pn_link_t        *pn_link = nx_link_pn(link);
+    uint64_t          tag;
+
+    sys_mutex_lock(router->lock);
+    if (DEQ_SIZE(rlink->out_fifo) > 0) {
+        grant_delivery = 1;
+        tag = router->dtag++;
+    }
+    sys_mutex_unlock(router->lock);
+
+    if (grant_delivery) {
+        pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
+        delivery = pn_link_current(pn_link);
+        if (delivery) {
+            router_tx_handler(context, link, delivery);
+            return 1;
+        }
+    }
+
+    return 0;
+}
+
+
+/**
+ * Link Detached Handler
+ */
+static int router_link_detach_handler(void* context, nx_link_t *link, int closed)
+{
+    nx_router_t    *router  = (nx_router_t*) context;
+    pn_link_t      *pn_link = nx_link_pn(link);
+    const char     *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
+    nx_link_item_t *item;
+
+    sys_mutex_lock(router->lock);
+    if (pn_link_is_sender(pn_link)) {
+        item = DEQ_HEAD(router->out_links);
+
+        nx_field_iterator_t *iter = nx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+        nx_router_link_t    *rlink;
+        if (iter) {
+            int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+            if (result == 0) {
+                nx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+                hash_remove(router->out_hash, iter);
+                free_nx_router_link_t(rlink);
+                nx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+            }
+            nx_field_iterator_free(iter);
+        }
+    }
+    else
+        item = DEQ_HEAD(router->in_links);
+
+    while (item) {
+        if (item->link == link) {
+            if (pn_link_is_sender(pn_link))
+                DEQ_REMOVE(router->out_links, item);
+            else
+                DEQ_REMOVE(router->in_links, item);
+            free_nx_link_item_t(item);
+            break;
+        }
+        item = item->next;
+    }
+
+    sys_mutex_unlock(router->lock);
+    return 0;
+}
+
+
+static void router_inbound_open_handler(void *type_context, nx_connection_t *conn)
+{
+}
+
+
+static void router_outbound_open_handler(void *type_context, nx_connection_t *conn)
+{
+}
+
+
+static void nx_router_timer_handler(void *context)
+{
+    nx_router_t *router = (nx_router_t*) context;
+
+    //
+    // Periodic processing.
+    //
+    nx_timer_schedule(router->timer, 1000);
+}
+
+
+static nx_node_type_t router_node = {"router", 0, 0,
+                                     router_rx_handler,
+                                     router_tx_handler,
+                                     router_disp_handler,
+                                     router_incoming_link_handler,
+                                     router_outgoing_link_handler,
+                                     router_writable_link_handler,
+                                     router_link_detach_handler,
+                                     0,   // node_created_handler
+                                     0,   // node_destroyed_handler
+                                     router_inbound_open_handler,
+                                     router_outbound_open_handler };
+static int type_registered = 0;
+
+
+nx_router_t *nx_router(nx_router_configuration_t *config)
+{
+    if (!type_registered) {
+        type_registered = 1;
+        nx_container_register_node_type(&router_node);
+    }
+
+    nx_router_t *router = NEW(nx_router_t);
+    nx_container_set_default_node_type(&router_node, (void*) router, NX_DIST_BOTH);
+
+    DEQ_INIT(router->in_links);
+    DEQ_INIT(router->out_links);
+    DEQ_INIT(router->in_fifo);
+
+    router->lock = sys_mutex();
+
+    router->timer = nx_timer(nx_router_timer_handler, (void*) router);
+    nx_timer_schedule(router->timer, 0); // Immediate
+
+    router->out_hash = hash(10, 32, 0);
+    router->dtag = 1;
+
+    return router;
+}
+
+
+void nx_router_free(nx_router_t *router)
+{
+    nx_container_set_default_node_type(0, 0, NX_DIST_BOTH);
+    sys_mutex_free(router->lock);
+    free(router);
+}
+

Propchange: qpid/trunk/qpid/extras/nexus/src/router_node.c
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org