You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/02/01 19:18:07 UTC
qpid-dispatch git commit: In management, tie policy ruleset to policy stats. Implement per-application connection count limit check.
Repository: qpid-dispatch
Updated Branches:
refs/heads/crolke-DISPATCH-188-1 1a8628a05 -> d1f764e3f
In management, tie policy ruleset to policy stats.
Implement per-application connection count limit check.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d1f764e3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d1f764e3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d1f764e3
Branch: refs/heads/crolke-DISPATCH-188-1
Commit: d1f764e3f27db22d4473897fc8b20664378d6720
Parents: 1a8628a
Author: Chuck Rolke <cr...@redhat.com>
Authored: Mon Feb 1 13:13:34 2016 -0500
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Mon Feb 1 13:13:34 2016 -0500
----------------------------------------------------------------------
.../qpid_dispatch_internal/management/agent.py | 4 +
.../policy/policy_local.py | 84 +++++++++++++++++---
.../policy/policy_manager.py | 3 +
.../policy/policy_util.py | 9 +--
tests/router_policy_test.py | 10 +++
5 files changed, 94 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index d22179d..b342098 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -292,6 +292,10 @@ class PolicyRulesetEntity(EntityAdapter):
def _identifier(self):
return self.attributes.get('applicationName')
+class PolicyStatsEntity(EntityAdapter):
+ def _identifier(self):
+ return self.attributes.get('applicationName')
+
def _addr_port_identifier(entity):
for attr in ['addr', 'port']: # Set default values if need be
entity.attributes.setdefault(
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_local.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
index 8518381..9a3ce4d 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -22,7 +22,7 @@
"""
import json
-from policy_util import PolicyError, HostStruct, HostAddr
+from policy_util import PolicyError, HostStruct, HostAddr, PolicyAppConnectionMgr
"""
Entity implementing the business logic of user connection/access policy.
@@ -31,6 +31,9 @@ Entity implementing the business logic of user connection/access policy.
#
#
class PolicyKeys(object):
+ """
+ String constants
+ """
# Common key words
KW_IGNORED_NAME = "name"
KW_IGNORED_IDENTITY = "identity"
@@ -60,6 +63,13 @@ class PolicyKeys(object):
KW_SOURCES = "sources"
KW_TARGETS = "targets"
+ # Policy stats key words
+ KW_CONNECTIONS_APPROVED = "connectionsApproved"
+ KW_CONNECTIONS_DENIED = "connectionsDenied"
+ KW_CONNECTIONS_CURRENT = "connectionsCurrent"
+ KW_PER_USER_STATE = "perUserState"
+ KW_PER_HOST_STATE = "perHostState"
+
# What settings does a user get when allowed to connect but
# not restricted by a user group?
KW_DEFAULT_SETTINGS = "default"
@@ -72,6 +82,7 @@ class PolicyKeys(object):
# user-to-group computed map in compiled ruleset
RULESET_U2G_MAP = "U2G"
+
#
#
class PolicyCompiler(object):
@@ -79,7 +90,8 @@ class PolicyCompiler(object):
Validate incoming configuration for legal schema.
- Warn about section options that go unused.
- Disallow negative max connection numbers.
- - Check that connectionOrigins resolve to IP hosts
+ - Check that connectionOrigins resolve to IP hosts.
+ - Enforce internal consistency,
"""
allowed_ruleset_options = [
@@ -352,9 +364,41 @@ class PolicyCompiler(object):
return True
+
+#
+#
+class AppStats(object):
+ """
+ Maintain live state and statistics for an application.
+ """
+ def __init__(self, id, manager, ruleset):
+ self.my_id = id
+ self._manager = manager
+ self._ruleset = ruleset
+ self.conn_mgr = PolicyAppConnectionMgr(
+ ruleset[PolicyKeys.KW_MAXCONN],
+ ruleset[PolicyKeys.KW_MAXCONNPERHOST],
+ ruleset[PolicyKeys.KW_MAXCONNPERUSER])
+ self._manager.get_agent().add_implementation(self, "policyStats")
+
+ def refresh_entity(self, attributes):
+ """Refresh management attributes"""
+ attributes.update({
+ PolicyKeys.KW_APPLICATION_NAME: self.my_id,
+ PolicyKeys.KW_CONNECTIONS_APPROVED: self.conn_mgr.connections_approved,
+ PolicyKeys.KW_CONNECTIONS_DENIED: self.conn_mgr.connections_denied,
+ PolicyKeys.KW_CONNECTIONS_CURRENT: self.conn_mgr.connections_active,
+ PolicyKeys.KW_PER_USER_STATE: self.conn_mgr.per_user_state,
+ PolicyKeys.KW_PER_HOST_STATE: self.conn_mgr.per_host_state})
+
+ def can_connect(self, conn_id, user, host, diags):
+ return self.conn_mgr.can_connect(conn_id, user, host, diags)
+
+#
+#
class PolicyLocal(object):
"""
- The policy database.
+ The local policy database.
"""
def __init__(self, manager):
@@ -381,6 +425,13 @@ class PolicyLocal(object):
# created by configuration
self.settingsdb = {}
+ # statsdb is a map
+ # key : <application name>
+ # val : a map
+ # key : stat name
+ # val : stat value
+ self.statsdb = {}
+
# _policy_compiler is a function
# validates incoming policy and readies it for internal use
self._policy_compiler = PolicyCompiler()
@@ -406,7 +457,7 @@ class PolicyLocal(object):
self._manager.log_debug(warning)
self.rulesetdb[name] = {}
self.rulesetdb[name].update(candidate)
- # TODO: Create stats
+ self.statsdb[name] = AppStats(name, self._manager, candidate)
self._manager.log_info("Created policy rules for application %s" % name)
def policy_read(self, name):
@@ -454,22 +505,28 @@ class PolicyLocal(object):
Lookup function called from C.
Determine if a user on host accessing app through AMQP Open is allowed
according to the policy access rules.
- If allowed then return the policy settings name
+ If allowed then return the policy settings name. If stats.can_connect
+ returns true then it has registered and counted the connection.
@param[in] user connection authId
@param[in] host connection remote host numeric IP address as string
@param[in] app application user is accessing
+ @param[in] conn_name connection name used for tracking reports
@return settings user-group name if allowed; "" if not allowed
- # Note: the upolicy[0] output is list of group names joined with '|'.
- TODO: handle the AccessStats
"""
try:
if not app in self.rulesetdb:
self._manager.log_trace(
- "lookup_user failed for user '%s', host '%s', application '%s': "
- "No policy defined for application" % (user, host, app))
+ "lookup_user failed for user '%s', host '%s', application '%s': "
+ "No policy defined for application" % (user, host, app))
return ""
ruleset = self.rulesetdb[app]
+ if not app in self.statsdb:
+ msg = (
+ "lookup_user failed for user '%s', host '%s', application '%s': "
+ "Policy is defined but stats are missing" % (user, host, app))
+ raise PolicyError(msg)
+ stats = self.statsdb[app]
# User in a group or default?
if user in ruleset[PolicyKeys.RULESET_U2G_MAP]:
usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP][user]
@@ -505,7 +562,14 @@ class PolicyLocal(object):
return ""
# This user passes administrative approval.
- # TODO: Count connection limits and possibly deny
+ # Now check live connection counts
+ diags = []
+ if not stats.can_connect(conn_name, user, host, diags):
+ for diag in diags:
+ self._manager.log_trace(
+ "lookup_user failed for user '%s', host '%s', application '%s': "
+ "%s" % (user, host, app, diag))
+ return ""
# Return success
return usergroup
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_manager.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py
index ce6a683..afb3c50 100644
--- a/python/qpid_dispatch_internal/policy/policy_manager.py
+++ b/python/qpid_dispatch_internal/policy/policy_manager.py
@@ -64,6 +64,9 @@ class PolicyManager(object):
def log_error(self, text):
self._log(LOG_ERROR, text)
+ def get_agent(self):
+ return self._agent
+
#
# Management interface to create a ruleset
#
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_util.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_util.py b/python/qpid_dispatch_internal/policy/policy_util.py
index 0b1a719..e9b94d9 100644
--- a/python/qpid_dispatch_internal/policy/policy_util.py
+++ b/python/qpid_dispatch_internal/policy/policy_util.py
@@ -309,14 +309,11 @@ class PolicyAppConnectionMgr(object):
return True
else:
if not allowbytotal:
- diags.append("LogMe: INFO user '%s' from host '%s' denied connection by total connection limit" %
- (user, host))
+ diags.append("Connection denied by total connection limit")
if not allowbyuser:
- diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per user limit" %
- (user, host))
+ diags.append("Connection denied by per user limit")
if not allowbyhost:
- diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per host limit" %
- (user, host))
+ diags.append("Connection denied by per host limit")
self.connections_denied += 1
return False
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/tests/router_policy_test.py
----------------------------------------------------------------------
diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py
index e2b0f87..5c767c9 100644
--- a/tests/router_policy_test.py
+++ b/tests/router_policy_test.py
@@ -107,7 +107,14 @@ class PolicyHostAddrTest(TestCase):
self.expect_deny( "::1,::2,::3", "arg count")
self.expect_deny( "0:ff:0,0:fe:ffff:ffff::0", "a > b")
+class MockAgent(object):
+ def add_implementation(self, entity, cfg_obj_name):
+ pass
+
class MockPolicyManager(object):
+ def __init__(self):
+ self.agent = MockAgent()
+
def log_debug(self, text):
print("DEBUG: %s" % text)
def log_info(self, text):
@@ -117,6 +124,9 @@ class MockPolicyManager(object):
def log_error(self, text):
print("ERROR: %s" % text)
+ def get_agent(self):
+ return self.agent
+
class PolicyFile(TestCase):
manager = MockPolicyManager()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org