You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/02/01 19:18:07 UTC

qpid-dispatch git commit: In management, tie policy ruleset to policy stats. Implement per-application connection count limit check.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 1a8628a05 -> d1f764e3f


In management, tie policy ruleset to policy stats.
Implement per-application connection count limit check.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d1f764e3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d1f764e3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d1f764e3

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: d1f764e3f27db22d4473897fc8b20664378d6720
Parents: 1a8628a
Author: Chuck Rolke <cr...@redhat.com>
Authored: Mon Feb 1 13:13:34 2016 -0500
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Mon Feb 1 13:13:34 2016 -0500

----------------------------------------------------------------------
 .../qpid_dispatch_internal/management/agent.py  |  4 +
 .../policy/policy_local.py                      | 84 +++++++++++++++++---
 .../policy/policy_manager.py                    |  3 +
 .../policy/policy_util.py                       |  9 +--
 tests/router_policy_test.py                     | 10 +++
 5 files changed, 94 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index d22179d..b342098 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -292,6 +292,10 @@ class PolicyRulesetEntity(EntityAdapter):
     def _identifier(self):
         return self.attributes.get('applicationName')
 
+class PolicyStatsEntity(EntityAdapter):
+    def _identifier(self):
+        return self.attributes.get('applicationName')
+
 def _addr_port_identifier(entity):
     for attr in ['addr', 'port']: # Set default values if need be
         entity.attributes.setdefault(

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_local.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
index 8518381..9a3ce4d 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -22,7 +22,7 @@
 """
 
 import json
-from policy_util import PolicyError, HostStruct, HostAddr
+from policy_util import PolicyError, HostStruct, HostAddr, PolicyAppConnectionMgr
 
 """
 Entity implementing the business logic of user connection/access policy.
@@ -31,6 +31,9 @@ Entity implementing the business logic of user connection/access policy.
 #
 #
 class PolicyKeys(object):
+    """
+    String constants
+    """
     # Common key words
     KW_IGNORED_NAME             = "name"
     KW_IGNORED_IDENTITY         = "identity"
@@ -60,6 +63,13 @@ class PolicyKeys(object):
     KW_SOURCES                  = "sources"
     KW_TARGETS                  = "targets"
 
+    # Policy stats key words
+    KW_CONNECTIONS_APPROVED     = "connectionsApproved"
+    KW_CONNECTIONS_DENIED       = "connectionsDenied"
+    KW_CONNECTIONS_CURRENT      = "connectionsCurrent"
+    KW_PER_USER_STATE           = "perUserState"
+    KW_PER_HOST_STATE           = "perHostState"
+
     # What settings does a user get when allowed to connect but
     # not restricted by a user group?
     KW_DEFAULT_SETTINGS         = "default"
@@ -72,6 +82,7 @@ class PolicyKeys(object):
 
     # user-to-group computed map in compiled ruleset
     RULESET_U2G_MAP             = "U2G"
+
 #
 #
 class PolicyCompiler(object):
@@ -79,7 +90,8 @@ class PolicyCompiler(object):
     Validate incoming configuration for legal schema.
     - Warn about section options that go unused.
     - Disallow negative max connection numbers.
-    - Check that connectionOrigins resolve to IP hosts
+    - Check that connectionOrigins resolve to IP hosts.
+    - Enforce internal consistency.
     """
 
     allowed_ruleset_options = [
@@ -352,9 +364,41 @@ class PolicyCompiler(object):
 
         return True
 
+
+#
+#
+class AppStats(object):
+    """
+    Maintain live state and statistics for an application.
+    """
+    def __init__(self, id, manager, ruleset):
+        self.my_id = id
+        self._manager = manager
+        self._ruleset = ruleset
+        self.conn_mgr = PolicyAppConnectionMgr(
+                ruleset[PolicyKeys.KW_MAXCONN],
+                ruleset[PolicyKeys.KW_MAXCONNPERHOST],
+                ruleset[PolicyKeys.KW_MAXCONNPERUSER])
+        self._manager.get_agent().add_implementation(self, "policyStats")
+
+    def refresh_entity(self, attributes):
+        """Refresh management attributes"""
+        attributes.update({
+            PolicyKeys.KW_APPLICATION_NAME:     self.my_id,
+            PolicyKeys.KW_CONNECTIONS_APPROVED: self.conn_mgr.connections_approved,
+            PolicyKeys.KW_CONNECTIONS_DENIED:   self.conn_mgr.connections_denied,
+            PolicyKeys.KW_CONNECTIONS_CURRENT:  self.conn_mgr.connections_active,
+            PolicyKeys.KW_PER_USER_STATE:       self.conn_mgr.per_user_state,
+            PolicyKeys.KW_PER_HOST_STATE:       self.conn_mgr.per_host_state})
+
+    def can_connect(self, conn_id, user, host, diags):
+        return self.conn_mgr.can_connect(conn_id, user, host, diags)
+
+#
+#
 class PolicyLocal(object):
     """
-    The policy database.
+    The local policy database.
     """
 
     def __init__(self, manager):
@@ -381,6 +425,13 @@ class PolicyLocal(object):
         # created by configuration
         self.settingsdb = {}
 
+        # statsdb is a map
+        #  key : <application name>
+        #  val : AppStats object
+        #   holds the application's live connection state
+        #   and statistics
+        self.statsdb = {}
+
         # _policy_compiler is a function
         #  validates incoming policy and readies it for internal use
         self._policy_compiler = PolicyCompiler()
@@ -406,7 +457,7 @@ class PolicyLocal(object):
                 self._manager.log_debug(warning)
         self.rulesetdb[name] = {}
         self.rulesetdb[name].update(candidate)
-        # TODO: Create stats
+        self.statsdb[name] = AppStats(name, self._manager, candidate)
         self._manager.log_info("Created policy rules for application %s" % name)
 
     def policy_read(self, name):
@@ -454,22 +505,28 @@ class PolicyLocal(object):
         Lookup function called from C.
         Determine if a user on host accessing app through AMQP Open is allowed
         according to the policy access rules.
-        If allowed then return the policy settings name
+        If allowed then return the policy settings name. If stats.can_connect
+        returns true then it has registered and counted the connection.
         @param[in] user connection authId
         @param[in] host connection remote host numeric IP address as string
         @param[in] app application user is accessing
+        @param[in] conn_name connection name used for tracking reports
         @return settings user-group name if allowed; "" if not allowed
-        # Note: the upolicy[0] output is list of group names joined with '|'.
-        TODO: handle the AccessStats
         """
         try:
             if not app in self.rulesetdb:
                 self._manager.log_trace(
-                        "lookup_user failed for user '%s', host '%s', application '%s': "
-                        "No policy defined for application" % (user, host, app))
+                    "lookup_user failed for user '%s', host '%s', application '%s': "
+                    "No policy defined for application" % (user, host, app))
                 return ""
 
             ruleset = self.rulesetdb[app]
+            if not app in self.statsdb:
+                msg = (
+                    "lookup_user failed for user '%s', host '%s', application '%s': "
+                    "Policy is defined but stats are missing" % (user, host, app))
+                raise PolicyError(msg)
+            stats = self.statsdb[app]
             # User in a group or default?
             if user in ruleset[PolicyKeys.RULESET_U2G_MAP]:
                 usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP][user]
@@ -505,7 +562,14 @@ class PolicyLocal(object):
                 return ""
 
             # This user passes administrative approval.
-            # TODO: Count connection limits and possibly deny
+            # Now check live connection counts
+            diags = []
+            if not stats.can_connect(conn_name, user, host, diags):
+                for diag in diags:
+                    self._manager.log_trace(
+                        "lookup_user failed for user '%s', host '%s', application '%s': "
+                        "%s" % (user, host, app, diag))
+                return ""
 
             # Return success
             return usergroup

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_manager.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py
index ce6a683..afb3c50 100644
--- a/python/qpid_dispatch_internal/policy/policy_manager.py
+++ b/python/qpid_dispatch_internal/policy/policy_manager.py
@@ -64,6 +64,9 @@ class PolicyManager(object):
     def log_error(self, text):
         self._log(LOG_ERROR, text)
 
+    def get_agent(self):
+        return self._agent
+
     #
     # Management interface to create a ruleset
     #

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_util.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_util.py b/python/qpid_dispatch_internal/policy/policy_util.py
index 0b1a719..e9b94d9 100644
--- a/python/qpid_dispatch_internal/policy/policy_util.py
+++ b/python/qpid_dispatch_internal/policy/policy_util.py
@@ -309,14 +309,11 @@ class PolicyAppConnectionMgr(object):
             return True
         else:
             if not allowbytotal:
-                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by total connection limit" %
-                             (user, host))
+                diags.append("Connection denied by total connection limit")
             if not allowbyuser:
-                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per user limit" %
-                             (user, host))
+                diags.append("Connection denied by per user limit")
             if not allowbyhost:
-                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per host limit" %
-                             (user, host))
+                diags.append("Connection denied by per host limit")
             self.connections_denied += 1
             return False
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/tests/router_policy_test.py
----------------------------------------------------------------------
diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py
index e2b0f87..5c767c9 100644
--- a/tests/router_policy_test.py
+++ b/tests/router_policy_test.py
@@ -107,7 +107,14 @@ class PolicyHostAddrTest(TestCase):
         self.expect_deny( "::1,::2,::3", "arg count")
         self.expect_deny( "0:ff:0,0:fe:ffff:ffff::0", "a > b")
 
+class MockAgent(object):
+    def add_implementation(self, entity, cfg_obj_name):
+        pass
+
 class MockPolicyManager(object):
+    def __init__(self):
+        self.agent = MockAgent()
+
     def log_debug(self, text):
         print("DEBUG: %s" % text)
     def log_info(self, text):
@@ -117,6 +124,9 @@ class MockPolicyManager(object):
     def log_error(self, text):
         print("ERROR: %s" % text)
 
+    def get_agent(self):
+        return self.agent
+
 class PolicyFile(TestCase):
 
     manager = MockPolicyManager()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org