You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/04/13 21:10:10 UTC

qpid-dispatch git commit: DISPATCH-744 - Change default behavior for unsettled deliveries to multicast addresses.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 9b80efdce -> b17156967


DISPATCH-744 - Change default behavior for unsettled deliveries to multicast addresses.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b1715696
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b1715696
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b1715696

Branch: refs/heads/master
Commit: b171569675f401eef8105186403c1b60904fad36
Parents: 9b80efd
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Apr 13 16:26:08 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Apr 13 16:26:08 2017 -0400

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json   |   7 +
 src/dispatch.c                                  |   1 +
 src/dispatch_private.h                          |   1 +
 src/router_core/forwarder.c                     |  13 +-
 src/router_core/router_core.c                   |   5 +
 src/router_node.c                               |   3 +-
 tests/CMakeLists.txt                            |   1 +
 .../system_tests_denied_unsettled_multicast.py  | 131 +++++++++++++++++++
 tests/system_tests_one_router.py                |   2 +-
 tests/system_tests_two_routers.py               |   2 +-
 10 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 65b5522..92fbfe2 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -484,6 +484,13 @@
                     "required": false,
                     "create": true
                 },
+                "allowUnsettledMulticast": {
+                    "type": "boolean",
+                    "description": "If true, allow senders to send unsettled deliveries to multicast addresses.  These deliveries shall be settled by the ingress router.  If false, unsettled deliveries to multicast addresses shall be rejected.",
+                    "create": true,
+                    "required": false,
+                    "default": false
+                },
                 "routerId": {
                     "description":"(DEPRECATED) Router's unique identity. This attribute has been deprecated. Use id instead",
                     "type": "string",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 25b1dcf..eb0c6eb 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -178,6 +178,7 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
 
     qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
     qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
+    qd->allow_unsettled_multicast = qd_entity_opt_bool(entity, "allowUnsettledMulticast", false); QD_ERROR_RET();
 
     if (! qd->sasl_config_path) {
         qd->sasl_config_path = qd_entity_opt_string(entity, "saslConfigPath", 0); QD_ERROR_RET();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 9e62085..8b46d71 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -59,6 +59,7 @@ struct qd_dispatch_t {
     char  *router_area;
     char  *router_id;
     qd_router_mode_t  router_mode;
+    bool   allow_unsettled_multicast;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 53e2c59..8cfcb03 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -250,9 +250,20 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     // NOTE:  This is the only multicast mode currently supported.  Others will likely be
     //        implemented in the future.
     //
-    if (!presettled)
+    if (!presettled) {
         in_delivery->settled = true;
 
+        //
+        // If the router is configured to reject unsettled multicasts, settle and reject this delivery.
+        //
+        if (!core->qd->allow_unsettled_multicast) {
+            in_delivery->disposition = PN_REJECTED;
+            in_delivery->error = qdr_error("qd:forbidden", "Deliveries to a multicast address must be pre-settled");
+            qdr_delivery_push_CT(core, in_delivery);
+            return 0;
+        }
+    }
+
     //
     // Forward to local subscribers
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 3257980..1733b90 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -53,6 +53,11 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
     core->agent_log = qd_log_source("AGENT");
 
     //
+    // Report on the configuration for unsettled multicasts
+    //
+    qd_log(core->log, QD_LOG_INFO, "Allow Unsettled Multicast: %s", qd->allow_unsettled_multicast ? "yes" : "no");
+
+    //
     // Set up the threading support
     //
     core->action_cond = sys_cond();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index e16ae33..914c736 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1081,7 +1081,8 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
         char *description = qdr_error_description(error);
         pn_condition_set_name(condition, (const char*)name);
         pn_condition_set_description(condition, (const char*)description);
-        pn_data_copy(pn_condition_info(condition), qdr_error_info(error));
+        if (qdr_error_info(error))
+            pn_data_copy(pn_condition_info(condition), qdr_error_info(error));
         //proton makes copies of name and description, so it is ok to free them here.
         free(name);
         free(description);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2201e22..d961e3a 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -95,6 +95,7 @@ foreach(py_test_module
     system_tests_dynamic_terminus
     system_tests_log_message_components
     system_tests_failover_list
+    system_tests_denied_unsettled_multicast
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/tests/system_tests_denied_unsettled_multicast.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_denied_unsettled_multicast.py b/tests/system_tests_denied_unsettled_multicast.py
new file mode 100644
index 0000000..3008fcc
--- /dev/null
+++ b/tests/system_tests_denied_unsettled_multicast.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+    
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name):
+            
+            config = [
+                ('router', {'mode': 'standalone', 'id': name}),
+                ('listener', {'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'multicast', 'distribution' : 'multicast'}),
+            ]
+            
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+        
+        inter_router_port = cls.tester.get_port()
+        
+        router('A')
+        cls.routers[0].wait_ready()
+
+
+    def test_01_default_multicast_test(self):
+        test = DeniedUnsettledMulticastTest(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class DeniedUnsettledMulticastTest(MessagingHandler):
+    def __init__(self, host):
+        super(DeniedUnsettledMulticastTest, self).__init__()
+        self.host       = host
+        self.count      = 10
+        self.error      = None
+        self.addr       = "multicast/test"
+        self.sent_uns   = 0
+        self.sent_pres  = 0
+        self.n_received = 0
+        self.n_rejected = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - n_received=%d n_rejected=%d" % (self.n_received, self.n_rejected)
+        self.conn.close()
+
+    def check_done(self):
+        if self.n_received == self.count and self.n_rejected == self.count:
+            self.conn.close()
+            self.timer.cancel()
+
+    def send(self):
+        while self.sent_uns < self.count:
+            m = Message(body="Unsettled %d" % self.sent_uns)
+            self.sender.send(m)
+            self.sent_uns += 1
+        while self.sent_pres < self.count:
+            m = Message(body="Presettled %d" % self.sent_pres)
+            dlv = self.sender.send(m)
+            dlv.settle()
+            self.sent_pres += 1
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(5, Timeout(self))
+        self.conn     = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.addr)
+        self.sender   = event.container.create_sender(self.conn, self.addr)
+
+    def on_sendable(self, event):
+        self.send()
+
+    def on_message(self, event):
+        self.n_received += 1
+        self.check_done()
+
+    def on_rejected(self, event):
+        self.n_rejected += 1
+        self.check_done()
+
+    def run(self):
+        Container(self).run()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index af16065..0f0cc81 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -35,7 +35,7 @@ class RouterTest(TestCase):
         super(RouterTest, cls).setUpClass()
         name = "test-router"
         config = Qdrouterd.Config([
-            ('router', {'mode': 'standalone', 'id': 'QDR'}),
+            ('router', {'mode': 'standalone', 'id': 'QDR', 'allowUnsettledMulticast': 'yes'}),
 
             # Setting the stripAnnotations to 'no' so that the existing tests will work.
             # Setting stripAnnotations to no will not strip the annotations and any tests that were already in this file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1715696/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 75bc545..7eec4b5 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -43,7 +43,7 @@ class RouterTest(TestCase):
         def router(name, client_server, connection):
 
             config = [
-                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
+                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
 
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org