You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2019/11/01 14:03:31 UTC

[qpid-dispatch] branch master updated: DISPATCH-1427: Add optional per-vhost-group connection limits

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efbdf9  DISPATCH-1427: Add optional per-vhost-group connection limits
2efbdf9 is described below

commit 2efbdf9baf213e9f499e18c281cf38843d3382b7
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Fri Nov 1 09:57:38 2019 -0400

    DISPATCH-1427: Add optional per-vhost-group connection limits
    
    With these settings a vhost may be configured so that users in
    a vhost group may receive different limits than the vhost has
    overall. The original motivation was to allow administrators
    more connections per user and per host than everyone else but
    the limits may be applied to raise or lower the bounds for any
    group.
---
 python/qpid_dispatch/management/qdrouter.json      | 12 ++++
 .../qpid_dispatch_internal/policy/policy_local.py  | 24 +++++--
 .../qpid_dispatch_internal/policy/policy_util.py   |  9 ++-
 tests/router_policy_test.py                        | 41 +++++++-----
 tests/system_tests_policy.py                       | 74 ++++++++++++++++++++++
 5 files changed, 137 insertions(+), 23 deletions(-)

diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index ab241b7..c862d8b 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1953,6 +1953,18 @@
                     "description": "A list of remote hosts from which the users may connect. A host can be a hostname, IP address, or IP address range. Use commas to separate multiple hosts. To allow access from all remote hosts, specify a wildcard '*'. To deny access from all remote hosts, leave this attribute blank.",
                     "required": true
                 },
+                "maxConnectionsPerUser": {
+                    "type": "integer",
+                    "description": "Optional maximum number of connections that may be created by users in this group. This value, if specified, overrides the vhost maxConnectionsPerUser value",
+                    "required": false,
+                    "create": false
+                },
+                "maxConnectionsPerHost": {
+                    "type": "integer",
+                    "description": "Optional maximum number of connections that may be created by users in this group. This value, if specified, overrides the vhost maxConnectionsPerUser value",
+                    "required": false,
+                    "create": false
+                },
                 "maxFrameSize": {
                     "type": "integer",
                     "description": "The largest frame, in bytes, that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object (AMQP Open, max-frame-size).",
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
index cff8df3..37733f1 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -140,6 +140,8 @@ class PolicyCompiler(object):
     allowed_settings_options = [
         PolicyKeys.KW_USERS,
         PolicyKeys.KW_REMOTE_HOSTS,
+        PolicyKeys.KW_MAXCONNPERHOST,
+        PolicyKeys.KW_MAXCONNPERUSER,
         PolicyKeys.KW_MAX_FRAME_SIZE,
         PolicyKeys.KW_MAX_MESSAGE_SIZE,
         PolicyKeys.KW_MAX_SESSION_WINDOW,
@@ -262,6 +264,8 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_TARGETS] = ''
         policy_out[PolicyKeys.KW_SOURCE_PATTERN] = ''
         policy_out[PolicyKeys.KW_TARGET_PATTERN] = ''
+        policy_out[PolicyKeys.KW_MAXCONNPERHOST] = None # optional group limit
+        policy_out[PolicyKeys.KW_MAXCONNPERUSER] = None
 
         cerror = []
         user_sources = False
@@ -272,7 +276,16 @@ class PolicyCompiler(object):
             if key not in self.allowed_settings_options:
                 warnings.append("Policy vhost '%s' user group '%s' option '%s' is ignored." %
                                 (vhostname, usergroup, key))
-            if key in [PolicyKeys.KW_MAX_FRAME_SIZE,
+            if key in [PolicyKeys.KW_MAXCONNPERHOST,
+                       PolicyKeys.KW_MAXCONNPERUSER
+                       ]:
+                if not self.validateNumber(val, 0, 65535, cerror):
+                    msg = ("Policy vhost '%s' user group '%s' option '%s' has error '%s'." % 
+                           (vhostname, usergroup, key, cerror[0]))
+                    errors.append(msg)
+                    return False
+                policy_out[key] = int(val)
+            elif key in [PolicyKeys.KW_MAX_FRAME_SIZE,
                        PolicyKeys.KW_MAX_MESSAGE_SIZE,
                        PolicyKeys.KW_MAX_RECEIVERS,
                        PolicyKeys.KW_MAX_SENDERS,
@@ -518,8 +531,8 @@ class AppStats(object):
         self._manager.get_agent().qd.qd_dispatch_policy_c_counts_refresh(self._cstats, entitymap)
         attributes.update(entitymap)
 
-    def can_connect(self, conn_id, user, host, diags):
-        return self.conn_mgr.can_connect(conn_id, user, host, diags)
+    def can_connect(self, conn_id, user, host, diags, group_max_conn_user, group_max_conn_host):
+        return self.conn_mgr.can_connect(conn_id, user, host, diags, group_max_conn_user, group_max_conn_host)
 
     def disconnect(self, conn_id, user, host):
         self.conn_mgr.disconnect(conn_id, user, host)
@@ -753,8 +766,11 @@ class PolicyLocal(object):
 
             # This user passes administrative approval.
             # Now check live connection counts
+            # Extract optional usergroup connection counts
+            group_max_conn_user = groupsettings.get(PolicyKeys.KW_MAXCONNPERUSER)
+            group_max_conn_host = groupsettings.get(PolicyKeys.KW_MAXCONNPERHOST)
             diags = []
-            if not stats.can_connect(conn_name, user, rhost, diags):
+            if not stats.can_connect(conn_name, user, rhost, diags, group_max_conn_user, group_max_conn_host):
                 for diag in diags:
                     self._manager.log_info(
                         "DENY AMQP Open for user '%s', rhost '%s', vhost '%s': "
diff --git a/python/qpid_dispatch_internal/policy/policy_util.py b/python/qpid_dispatch_internal/policy/policy_util.py
index ae7914d..00a0426 100644
--- a/python/qpid_dispatch_internal/policy/policy_util.py
+++ b/python/qpid_dispatch_internal/policy/policy_util.py
@@ -287,7 +287,7 @@ class PolicyAppConnectionMgr(object):
         self.max_per_user = maxconnperuser
         self.max_per_host = maxconnperhost
 
-    def can_connect(self, conn_id, user, host, diags):
+    def can_connect(self, conn_id, user, host, diags, grp_max_user, grp_max_host):
         """
         Register a connection attempt.
         If all the connection limit rules pass then add the
@@ -305,9 +305,12 @@ class PolicyAppConnectionMgr(object):
         if host in self.per_host_state:
             n_host = len(self.per_host_state[host])
 
+        max_per_user = grp_max_user if grp_max_user is not None else self.max_per_user
+        max_per_host = grp_max_host if grp_max_host is not None else self.max_per_host
+
         allowbytotal = self.connections_active < self.max_total
-        allowbyuser  = n_user < self.max_per_user
-        allowbyhost  = n_host < self.max_per_host
+        allowbyuser  = n_user < max_per_user
+        allowbyhost  = n_host < max_per_host
 
         if allowbytotal and allowbyuser and allowbyhost:
             if not user in self.per_user_state:
diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py
index e6f749c..76108c5 100644
--- a/tests/router_policy_test.py
+++ b/tests/router_policy_test.py
@@ -238,57 +238,66 @@ class PolicyAppConnectionMgrTests(TestCase):
     def test_policy_app_conn_mgr_fail_by_total(self):
         stats = PolicyAppConnectionMgr(1, 2, 2)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 1)
         self.assertTrue('application connection limit' in diags[0])
 
     def test_policy_app_conn_mgr_fail_by_user(self):
         stats = PolicyAppConnectionMgr(3, 1, 2)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 1)
         self.assertTrue('per user' in diags[0])
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, 2, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, 2, None))
 
     def test_policy_app_conn_mgr_fail_by_hosts(self):
         stats = PolicyAppConnectionMgr(3, 2, 1)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 1)
         self.assertTrue('per host' in diags[0])
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, None, 2))
+        self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, None, 2))
 
     def test_policy_app_conn_mgr_fail_by_user_hosts(self):
         stats = PolicyAppConnectionMgr(3, 1, 1)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 2)
         self.assertTrue('per user' in diags[0] or 'per user' in diags[1])
         self.assertTrue('per host' in diags[0] or 'per host' in diags[1])
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10002', 'chuck', '10.10.10.10', diags, 2, 2))
+        self.assertFalse(stats.can_connect('10.10.10.10:10003', 'chuck', '10.10.10.10', diags, 2, 2))
 
     def test_policy_app_conn_mgr_update(self):
         stats = PolicyAppConnectionMgr(3, 1, 2)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 1)
         self.assertTrue('per user' in diags[0])
         diags = []
         stats.update(3, 2, 2)
-        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
 
     def test_policy_app_conn_mgr_disconnect(self):
         stats = PolicyAppConnectionMgr(3, 1, 2)
         diags = []
-        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
-        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags, None, None))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
         self.assertTrue(len(diags) == 1)
         self.assertTrue('per user' in diags[0])
         diags = []
         stats.disconnect("10.10.10.10:10000", 'chuck', '10.10.10.10')
-        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags, None, None))
 
     def test_policy_app_conn_mgr_create_bad_settings(self):
         denied = False
@@ -315,9 +324,9 @@ class PolicyAppConnectionMgrTests(TestCase):
         stats = PolicyAppConnectionMgr(10000, 10000, 10000)
         diags = []
         for i in range(0, 10000):
-            self.assertTrue(stats.can_connect('1.1.1.1:' + str(i), 'chuck', '1.1.1.1', diags))
+            self.assertTrue(stats.can_connect('1.1.1.1:' + str(i), 'chuck', '1.1.1.1', diags, None, None))
             self.assertTrue(len(diags) == 0)
-        self.assertFalse(stats.can_connect('1.1.1.1:10000', 'chuck', '1.1.1.1', diags))
+        self.assertFalse(stats.can_connect('1.1.1.1:10000', 'chuck', '1.1.1.1', diags, None, None))
         self.assertTrue(len(diags) == 3)
         self.assertTrue(stats.connections_active == 10000)
         self.assertTrue(stats.connections_approved == 10000)
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index 9aa5262..e19d402 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -1171,6 +1171,80 @@ class VhostPolicyFromRouterConfig(TestCase):
                             msg="source address must not be allowed, but it was [%s]" % source_addr)
 
 
+class VhostPolicyConnLimit(TestCase):
+    """
+    Verify that connections beyond the vhost limit are allowed
+    if override specified in vhost.group.
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(VhostPolicyConnLimit, cls).setUpClass()
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
+            ('vhost', {
+                'hostname': '0.0.0.0', 'maxConnections': 100,
+                'maxConnectionsPerUser': 2,
+                'allowUnknownUser': 'true',
+                'groups': [(
+                    '$default', {
+                        'users': '*', 'remoteHosts': '*',
+                        'sources': '*', 'targets': '*',
+                        'allowDynamicSource': 'true',
+                        'maxConnectionsPerUser': 3
+                    }
+                ), (
+                    'anonymous', {
+                        'users': 'anonymous', 'remoteHosts': '*',
+                        'sourcePattern': 'addr/*/queue/*, simpleaddress, queue.${user}',
+                        'targets': 'addr/*, simpleaddress, queue.${user}',
+                        'allowDynamicSource': 'true',
+                        'allowAnonymousSender': 'true',
+                        'maxConnectionsPerUser': 3
+                    }
+                )]
+            })
+        ])
+
+        cls.router = cls.tester.qdrouterd('vhost-policy-conn-limit', config, wait=True)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_verify_vhost_maximum_connections_override(self):
+        addr = "%s/$management" % self.address()
+        timeout = 5
+
+        # three connections should be ok
+        denied = False
+        try:
+            bc1 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
+            bc2 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
+            bc3 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
+        except ConnectionException:
+            denied = True
+        except Timeout:
+            denied = True
+
+        self.assertFalse(denied)  # assert connections were opened
+
+        # fourth connection should be denied
+        denied = False
+        try:
+            bc4 = SyncRequestResponse(BlockingConnection(addr, timeout=timeout))
+        except ConnectionException:
+            denied = True
+        except Timeout:
+            denied = True
+
+        self.assertTrue(denied)  # assert if connection that should not open did open
+
+        bc1.connection.close()
+        bc2.connection.close()
+        bc3.connection.close()
+
 class ClientAddressValidator(MessagingHandler):
     """
     Base client class used to validate vhost policies through


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org