You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/22 22:24:42 UTC

qpid-dispatch git commit: DISPATCH-179 - Added an option to designate internally originated messages as "control". This will be used to separate control traffic on dedicated sessions to prevent data congestion from adversely affecting control traffic (DISPATCH-43).

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 45b627bde -> 40b444015


DISPATCH-179 - Added an option to designate internally originated messages as "control".
               This will be used to separate control traffic on dedicated sessions to prevent
               data congestion from adversely affecting control traffic (DISPATCH-43).


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/40b44401
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/40b44401
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/40b44401

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 40b4440159a622feff22579f78f8183f249fa36f
Parents: 45b627b
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 22 16:19:25 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 22 16:19:25 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h            | 3 ++-
 python/qpid_dispatch_internal/router/engine.py | 2 +-
 src/python_embedded.c                          | 5 +++--
 src/router_core/router_core_private.h          | 1 +
 src/router_core/transfer.c                     | 8 +++++---
 5 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/40b44401/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 39d8933..bc4d03a 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -410,8 +410,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, q
  * @param msg Pointer to the message to be sent.  The message will be copied during the call
  *            can must be freed by the caller if the caller doesn't need to hold it for later use.
  * @param exclude_inprocess If true, the message will not be sent to in-process subscribers.
+ * @param control If true, this message is to be treated as control traffic and flow on a control link.
  */
-void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess);
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control);
 
 typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link, 
                                           qdr_terminus_t *source, qdr_terminus_t *target);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/40b44401/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py
index 768b68e..140e5c0 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -232,7 +232,7 @@ class RouterEngine:
         Send a control message to another router.
         """
         app_props = {'opcode' : msg.get_opcode() }
-        self.io_adapter[0].send(Message(address=dest, properties=app_props, body=msg.to_dict()))
+        self.io_adapter[0].send(Message(address=dest, properties=app_props, body=msg.to_dict()), True, True)
 
 
     def node_updated(self, addr, reachable, neighbor):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/40b44401/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index d8bab3f..1eb4779 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -596,8 +596,9 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
     qd_composed_field_t *field = 0;
     PyObject *message = 0;
     int       no_echo = 1;
+    int       control = 0;
 
-    if (!PyArg_ParseTuple(args, "O|i", &message, &no_echo))
+    if (!PyArg_ParseTuple(args, "O|ii", &message, &no_echo, &control))
         return 0;
 
     if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
@@ -605,7 +606,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
         qd_message_compose_2(msg, field);
         PyObject *address = PyObject_GetAttrString(message, "address");
         if (address) {
-            qdr_send_to(ioa->core, msg, PyString_AsString(address), (bool) no_echo);
+            qdr_send_to(ioa->core, msg, PyString_AsString(address), (bool) no_echo, (bool) control);
             Py_DECREF(address);
         }
         qd_compose_free(field);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/40b44401/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 25c7d89..c135f11 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -86,6 +86,7 @@ struct qdr_action_t {
             qdr_subscription_t     *subscription;
             qd_message_t           *message;
             bool                    exclude_inprocess;
+            bool                    control;
         } io;
 
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/40b44401/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 8911339..6f22603 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -57,12 +57,13 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, q
 }
 
 
-void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess)
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
     action->args.io.address           = qdr_field(addr);
     action->args.io.message           = qd_message_copy(msg);
     action->args.io.exclude_inprocess = exclude_inprocess;
+    action->args.io.control           = control;
 
     qdr_action_enqueue(core, action);
 }
@@ -76,7 +77,8 @@ static void qdr_route_message_CT(qdr_core_t     *core,
                                  qdr_address_t  *addr,
                                  qd_message_t   *msg,
                                  qdr_delivery_t *dlv,
-                                 bool            exclude_inprocess)
+                                 bool            exclude_inprocess,
+                                 bool            control)
 {
     const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
     printf("qdr_route_message_CT - %s, %s\n", key, exclude_inprocess ? "yes" : "no");
@@ -107,7 +109,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
         qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
         qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
         if (addr)
-            qdr_route_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess);
+            qdr_route_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
         else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org