You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/03/30 19:13:39 UTC

[1/3] qpid-dispatch git commit: DISPATCH-188: Implement policy. merge from branch crolke-DISPATCH-188-1.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master f93d5ce9c -> 48e816208


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-2/test-router-with-policy.json.in
----------------------------------------------------------------------
diff --git a/tests/policy-2/test-router-with-policy.json.in b/tests/policy-2/test-router-with-policy.json.in
new file mode 100644
index 0000000..5f96fdf
--- /dev/null
+++ b/tests/policy-2/test-router-with-policy.json.in
@@ -0,0 +1,170 @@
+[
+    ["container", {
+	"containerName": "dispatch",
+	"saslConfigName": "policy-photoserver-sasl",
+	"saslConfigPath": "${CMAKE_CURRENT_BINARY_DIR}/policy-2",
+	"debugDump": "qddebug.txt"
+    }],
+    ["listener", {
+	"addr": "0.0.0.0",
+	"saslMechanisms": "ANONYMOUS PLAIN",
+	"authenticatePeer": "no",
+	"idleTimeoutSeconds": "120",
+	"port": 21000
+    }],
+    ["listener", {
+	"addr": "0.0.0.0",
+	"saslMechanisms": "PLAIN",
+	"authenticatePeer": "no",
+	"idleTimeoutSeconds": "120",
+	"port": 21001
+    }],
+    ["log", {
+	"source": "true",
+	"enable": "trace+",
+	"module": "DEFAULT"
+    }],
+    ["policy", {
+	"maximumConnections": 20,
+	"enableAccessRules": "true"
+    }],
+# Some ruleset
+    ["policyRuleset", {
+      "applicationName": "photoserver",
+      "maxConnections": 50,
+      "maxConnPerUser": 5,
+      "maxConnPerHost": 20,
+      "userGroups": {
+        "anonymous":       "anonymous",
+        "users":           "u1, u2",
+        "paidsubscribers": "p1, p2",
+        "test":            "zeke, ynot",
+        "admin":           "alice, bob",
+        "superuser":       "ellen"
+      },
+      "ingressHostGroups": {
+        "Ten18":     "10.18.0.0-10.18.255.255",
+        "EllensWS":  "72.135.2.9",
+        "TheLabs":   "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255",
+        "localhost": "127.0.0.1, ::1",
+        "TheWorld":  "*"
+      },
+      "ingressPolicies": {
+        "anonymous":       "TheWorld",
+        "users":           "TheWorld",
+        "paidsubscribers": "TheWorld",
+        "test":            "TheLabs",
+        "admin":           "Ten18, TheLabs, localhost",
+        "superuser":       "EllensWS, localhost"
+      },
+      "connectionAllowDefault": true,
+      "settings": {
+        "anonymous" : {
+          "maxFrameSize":     111111,
+          "maxMessageSize":   111111,
+          "maxSessionWindow": 111111,
+          "maxSessions":           1,
+          "maxSenders":           11,
+          "maxReceivers":         11,
+          "allowDynamicSrc":      false,
+          "allowAnonymousSender": false,
+          "sources": "public",
+          "targets": ""
+        },
+        "users" : {
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":           22,
+          "maxReceivers":         22,
+          "allowDynamicSrc":      false,
+          "allowAnonymousSender": false,
+          "sources": "public, private",
+          "targets": "public"
+        },
+        "paidsubscribers" : {
+          "maxFrameSize":     333333,
+          "maxMessageSize":   333333,
+          "maxSessionWindow": 333333,
+          "maxSessions":           3,
+          "maxSenders":           33,
+          "maxReceivers":         33,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": false,
+          "sources": "public, private",
+          "targets": "public, private"
+        },
+        "test" : {
+          "maxFrameSize":     444444,
+          "maxMessageSize":   444444,
+          "maxSessionWindow": 444444,
+          "maxSessions":           4,
+          "maxSenders":           44,
+          "maxReceivers":         44,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": true,
+          "sources": "private",
+          "targets": "private"
+        },
+        "admin" : {
+          "maxFrameSize":     555555,
+          "maxMessageSize":   555555,
+          "maxSessionWindow": 555555,
+          "maxSessions":           5,
+          "maxSenders":           55,
+          "maxReceivers":         55,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": true,
+          "sources": "public, private, management",
+          "targets": "public, private, management"
+        },
+        "superuser" : {
+          "maxFrameSize":     666666,
+          "maxMessageSize":   666666,
+          "maxSessionWindow": 666666,
+          "maxSessions":           6,
+          "maxSenders":           66,
+          "maxReceivers":         66,
+          "allowDynamicSrc":      false,
+          "allowAnonymousSender": false,
+          "sources": "public, private, management, root",
+          "targets": "public, private, management, root"
+        },
+        "default" : {
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":           22,
+          "maxReceivers":         22,
+          "allowDynamicSrc":      false,
+          "allowAnonymousSender": false,
+          "sources": "public, private",
+          "targets": "public"
+        }
+      }
+  }],
+  ["policyRuleset", {
+      "applicationName": "0.0.0.0",
+      "maxConnections": 50,
+      "maxConnPerUser": 5,
+      "maxConnPerHost": 20,
+      "connectionAllowDefault": true,
+      "settings": {
+        "default" : {
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":           22,
+          "maxReceivers":         22,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": true,
+          "sources": "public, private, $management",
+          "targets": "public, private, $management"
+        }
+      }
+    }
+  ]
+]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy_test.c
----------------------------------------------------------------------
diff --git a/tests/policy_test.c b/tests/policy_test.c
new file mode 100644
index 0000000..0061169
--- /dev/null
+++ b/tests/policy_test.c
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include "policy.h"
+#include "policy_internal.h"
+
+// Checks _qd_policy_approve_link_name(user, allowed list, proposed name); returns 0 on success, else a failure message.
+static char *test_link_name_lookup(void *context)
+{
+    // Degenerate blank names
+    if (_qd_policy_approve_link_name("a", "a", ""))
+        return "blank proposed name not rejected";
+    if (_qd_policy_approve_link_name("a", "", "a"))
+        return "blank allowed list not rejected";
+
+    // Easy matches
+    if (!_qd_policy_approve_link_name("", "joe", "joe"))
+        return "proposed link 'joe' should match allowed links 'joe' but does not";
+    if (_qd_policy_approve_link_name("", "joe", "joey"))
+        return "proposed link 'joey' should not match allowed links 'joe' but does";
+
+    // Wildcard matches
+    if (!_qd_policy_approve_link_name("", "joe*", "joey"))
+        return "proposed link 'joey' should match allowed links 'joe*' but does not";
+    if (!_qd_policy_approve_link_name("", "joe*", "joezzzZZZ"))
+        return "proposed link 'joezzzZZZ' should match allowed links 'joe*' but does not";
+    if (!_qd_policy_approve_link_name("", "joe,*", "joey"))
+        return "proposed link 'joey' should match allowed links 'joe,*' but does not";
+
+    // Deeper match
+    if (!_qd_policy_approve_link_name("", "no1,no2,no3,yes,no4", "yes"))
+        return "proposed link 'yes' should match allowed links 'no1,no2,no3,yes,no4' but does not";
+
+    // Deeeper match - triggers malloc/free internal handler
+    char * bufp = (char *)malloc(512 * 5 + 6);  // 512 entries of "nNNN," (5 chars each) + "yes" + NUL fits
+    if (!bufp)
+        return "malloc failed allocating large allowed list";
+    char * wp = bufp;
+    for (int i = 0; i < 512; i++) {
+        wp += sprintf(wp, "n%03d,", i);
+    }
+    sprintf(wp, "yes");
+    int approved = _qd_policy_approve_link_name("", bufp, "yes");
+    free(bufp);  // release once, before inspecting the result, so no path leaks the buffer
+    if (!approved)
+        return "proposed link 'yes' should match allowed large list but does not";
+
+    // Substitute a user name
+    if (!_qd_policy_approve_link_name("chuck", "ab${user}xyz", "abchuckxyz"))
+        return "proposed link 'abchuckxyz' should match allowed links with ${user} but does not";
+    if (!_qd_policy_approve_link_name("chuck", "${user}xyz", "chuckxyz"))
+        return "proposed link 'chuckxyz' should match allowed links with ${user} but does not";
+    if (!_qd_policy_approve_link_name("chuck", "ab${user}", "abchuck"))
+        return "proposed link 'abchuck' should match allowed links with ${user} but does not";
+
+    // Combine user name and wildcard
+    if (!_qd_policy_approve_link_name("chuck", "ab${user}*", "abchuckzyxw"))
+        return "proposed link 'abchuckzyxw' should match allowed links with ${user}* but does not";
+    return 0;
+}
+
+int policy_tests(void)
+{
+    int result = 0;
+
+    TEST_CASE(test_link_name_lookup, 0);
+
+    return result;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/router_policy_test.py
----------------------------------------------------------------------
diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py
new file mode 100644
index 0000000..99165c9
--- /dev/null
+++ b/tests/router_policy_test.py
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from qpid_dispatch_internal.policy.policy_util import HostAddr
+from qpid_dispatch_internal.policy.policy_util import HostStruct
+from qpid_dispatch_internal.policy.policy_util import PolicyError
+from qpid_dispatch_internal.policy.policy_util import PolicyAppConnectionMgr
+from qpid_dispatch_internal.policy.policy_local import PolicyLocal
+from system_test import TestCase, main_module
+
+class PolicyHostAddrTest(TestCase):
+
+    def expect_deny(self, badhostname, msg):
+        denied = False
+        try:
+            xxx = HostStruct(badhostname)
+        except PolicyError:
+            denied = True
+        self.assertTrue(denied, ("%s" % msg))
+
+    def check_hostaddr_match(self, tHostAddr, tString, expectOk=True):
+        # check that the string is a match for the addr
+        # check that the internal struct version matches, too
+        ha = HostStruct(tString)
+        if expectOk:
+            self.assertTrue( tHostAddr.match_str(tString) )
+            self.assertTrue( tHostAddr.match_bin(ha) )
+        else:
+            self.assertFalse( tHostAddr.match_str(tString) )
+            self.assertFalse( tHostAddr.match_bin(ha) )
+
+    def test_policy_hostaddr_ipv4(self):
+        # Create simple host and range
+        aaa = HostAddr("192.168.1.1")
+        bbb = HostAddr("1.1.1.1,1.1.1.255")
+        # Verify host and range
+        self.check_hostaddr_match(aaa, "192.168.1.1")
+        self.check_hostaddr_match(aaa, "1.1.1.1", False)
+        self.check_hostaddr_match(aaa, "192.168.1.2", False)
+        self.check_hostaddr_match(bbb, "1.1.1.1")
+        self.check_hostaddr_match(bbb, "1.1.1.254")
+        self.check_hostaddr_match(bbb, "1.1.1.0", False)
+        self.check_hostaddr_match(bbb, "1.1.2.0", False)
+
+    def test_policy_hostaddr_ipv6(self):
+        if not HostAddr.has_ipv6:
+            self.skipTest("System IPv6 support is not available")
+        # Create simple host and range
+        aaa = HostAddr("::1")
+        bbb = HostAddr("::1,::ffff")
+        ccc = HostAddr("ffff::0,ffff:ffff::0")
+        # Verify host and range
+        self.check_hostaddr_match(aaa, "::1")
+        self.check_hostaddr_match(aaa, "::2", False)
+        self.check_hostaddr_match(aaa, "ffff:ffff::0", False)
+        self.check_hostaddr_match(bbb, "::1")
+        self.check_hostaddr_match(bbb, "::fffe")
+        self.check_hostaddr_match(bbb, "::1:0", False)
+        self.check_hostaddr_match(bbb, "ffff::0", False)
+        self.check_hostaddr_match(ccc, "ffff::1")
+        self.check_hostaddr_match(ccc, "ffff:fffe:ffff:ffff::ffff")
+        self.check_hostaddr_match(ccc, "ffff:ffff::1", False)
+        self.check_hostaddr_match(ccc, "ffff:ffff:ffff:ffff::ffff", False)
+
+    def test_policy_hostaddr_ipv4_wildcard(self):
+        aaa = HostAddr("*")
+        self.check_hostaddr_match(aaa,"0.0.0.0")
+        self.check_hostaddr_match(aaa,"127.0.0.1")
+        self.check_hostaddr_match(aaa,"255.254.253.252")
+
+
+    def test_policy_hostaddr_ipv6_wildcard(self):
+        if not HostAddr.has_ipv6:
+            self.skipTest("System IPv6 support is not available")
+        aaa = HostAddr("*")
+        self.check_hostaddr_match(aaa,"::0")
+        self.check_hostaddr_match(aaa,"::1")
+        self.check_hostaddr_match(aaa,"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+
+    def test_policy_malformed_hostaddr_ipv4(self):
+        self.expect_deny( "0.0.0.0.0", "Name or service not known")
+        self.expect_deny( "1.1.1.1,2.2.2.2,3.3.3.3", "arg count")
+        self.expect_deny( "9.9.9.9,8.8.8.8", "a > b")
+
+    def test_policy_malformed_hostaddr_ipv6(self):
+        if not HostAddr.has_ipv6:
+            self.skipTest("System IPv6 support is not available")
+        self.expect_deny( "1::2::3", "Name or service not known")
+        self.expect_deny( "::1,::2,::3", "arg count")
+        self.expect_deny( "0:ff:0,0:fe:ffff:ffff::0", "a > b")
+
+class QpidDispatch(object):
+    def qd_dispatch_policy_c_counts_alloc(self):
+        return 100
+
+    def qd_dispatch_policy_c_counts_refresh(self, cstats, entitymap):
+        pass
+
+class MockAgent(object):
+    def __init__(self):
+        self.qd = QpidDispatch()
+
+    def add_implementation(self, entity, cfg_obj_name):
+        pass
+
+class MockPolicyManager(object):
+    def __init__(self):
+        self.agent = MockAgent()
+
+    def log_debug(self, text):
+        print("DEBUG: %s" % text)
+    def log_info(self, text):
+        print("INFO: %s" % text)
+    def log_trace(self, text):
+        print("TRACE: %s" % text)
+    def log_error(self, text):
+        print("ERROR: %s" % text)
+
+    def get_agent(self):
+        return self.agent
+
+class PolicyFile(TestCase):
+
+    manager = MockPolicyManager()
+    policy = PolicyLocal(manager)
+    policy.test_load_config()
+
+    def dict_compare(self, d1, d2):
+        d1_keys = set(d1.keys())
+        d2_keys = set(d2.keys())
+        intersect_keys = d1_keys.intersection(d2_keys)
+        added = d1_keys - d2_keys
+        removed = d2_keys - d1_keys
+        modified = {o : (d1[o], d2[o]) for o in intersect_keys if d1[o] != d2[o]}
+        same = set(o for o in intersect_keys if d1[o] == d2[o])
+        return len(added) == 0 and len(removed) == 0 and len(modified) == 0
+
+    def test_policy1_test_zeke_ok(self):
+        p1 = PolicyFile.policy.lookup_user('zeke', '192.168.100.5', 'photoserver', '192.168.100.5:33333', 1)
+        self.assertTrue(p1 == 'test')
+        upolicy = {}
+        self.assertTrue(
+            PolicyFile.policy.lookup_settings('photoserver', p1, upolicy)
+        )
+        self.assertTrue(upolicy['maxFrameSize']            == 444444)
+        self.assertTrue(upolicy['maxMessageSize']          == 444444)
+        self.assertTrue(upolicy['maxSessionWindow']        == 444444)
+        self.assertTrue(upolicy['maxSessions']              == 4)
+        self.assertTrue(upolicy['maxSenders']               == 44)
+        self.assertTrue(upolicy['maxReceivers']             == 44)
+        self.assertTrue(upolicy['allowAnonymousSender'])
+        self.assertTrue(upolicy['allowDynamicSrc'])
+        self.assertTrue(upolicy['targets'] == 'private')
+        self.assertTrue(upolicy['sources'] == 'private')
+
+    def test_policy1_test_zeke_bad_IP(self):
+        self.assertTrue(
+            PolicyFile.policy.lookup_user('zeke', '10.18.0.1',    'photoserver', "connid", 2) == '')
+        self.assertTrue(
+            PolicyFile.policy.lookup_user('zeke', '72.135.2.9',   'photoserver', "connid", 3) == '')
+        self.assertTrue(
+            PolicyFile.policy.lookup_user('zeke', '127.0.0.1',    'photoserver', "connid", 4) == '')
+
+    def test_policy1_test_zeke_bad_app(self):
+        self.assertTrue(
+            PolicyFile.policy.lookup_user('zeke', '192.168.100.5','galleria', "connid", 5) == '')
+
+    def test_policy1_test_users_same_permissions(self):
+        zname = PolicyFile.policy.lookup_user('zeke', '192.168.100.5', 'photoserver', '192.168.100.5:33333', 6)
+        yname = PolicyFile.policy.lookup_user('ynot', '10.48.255.254', 'photoserver', '192.168.100.5:33334', 7)
+        self.assertTrue( zname == yname )
+
+    def test_policy1_lookup_unknown_application(self):
+        upolicy = {}
+        self.assertFalse(
+            PolicyFile.policy.lookup_settings('unknown', 'doesntmatter', upolicy)
+        )
+
+    def test_policy1_lookup_unknown_usergroup(self):
+        upolicy = {}
+        self.assertFalse(
+            PolicyFile.policy.lookup_settings('photoserver', 'unknown', upolicy)
+        )
+
+class PolicyAppConnectionMgrTests(TestCase):
+
+    def test_policy_app_conn_mgr_fail_by_total(self):
+        stats = PolicyAppConnectionMgr(1, 2, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('by total' in diags[0])
+
+    def test_policy_app_conn_mgr_fail_by_user(self):
+        stats = PolicyAppConnectionMgr(3, 1, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per user' in diags[0])
+
+    def test_policy_app_conn_mgr_fail_by_hosts(self):
+        stats = PolicyAppConnectionMgr(3, 2, 1)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per host' in diags[0])
+
+    def test_policy_app_conn_mgr_fail_by_user_hosts(self):
+        stats = PolicyAppConnectionMgr(3, 1, 1)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 2)
+        self.assertTrue('per user' in diags[0] or 'per user' in diags[1])
+        self.assertTrue('per host' in diags[0] or 'per host' in diags[1])
+
+    def test_policy_app_conn_mgr_update(self):
+        stats = PolicyAppConnectionMgr(3, 1, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per user' in diags[0])
+        diags = []
+        stats.update(3, 2, 2)
+        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+
+    def test_policy_app_conn_mgr_disconnect(self):
+        stats = PolicyAppConnectionMgr(3, 1, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per user' in diags[0])
+        diags = []
+        stats.disconnect("10.10.10.10:10000", 'chuck', '10.10.10.10')
+        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+
+    def test_policy_app_conn_mgr_create_bad_settings(self):
+        denied = False
+        try:
+            stats = PolicyAppConnectionMgr(-3, 1, 2)
+        except PolicyError:
+            denied = True
+        self.assertTrue(denied, "Failed to detect negative setting value.")
+
+    def test_policy_app_conn_mgr_update_bad_settings(self):
+        denied = False
+        try:
+            stats = PolicyAppConnectionMgr(0, 0, 0)
+        except PolicyError:
+            denied = True
+        self.assertFalse(denied, "Should allow all zeros.")
+        try:
+            stats.update(0, -1, 0)
+        except PolicyError:
+            denied = True
+        self.assertTrue(denied, "Failed to detect negative setting value.")
+
+    def test_policy_app_conn_mgr_larger_counts(self):
+        stats = PolicyAppConnectionMgr(10000, 10000, 10000)
+        diags = []
+        for i in range(0, 10000):
+            self.assertTrue(stats.can_connect('1.1.1.1:' + str(i), 'chuck', '1.1.1.1', diags))
+            self.assertTrue(len(diags) == 0)
+        self.assertFalse(stats.can_connect('1.1.1.1:10000', 'chuck', '1.1.1.1', diags))
+        self.assertTrue(len(diags) == 3)
+        self.assertTrue(stats.connections_active == 10000)
+        self.assertTrue(stats.connections_approved == 10000)
+        self.assertTrue(stats.connections_denied == 1)
+
+if __name__ == '__main__':
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index d2e5571..df4fe56 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -29,6 +29,7 @@ int alloc_tests(void);
 int server_tests(qd_dispatch_t *qd);
 int parse_tests(void);
 int compose_tests(void);
+int policy_tests(void);
 
 int main(int argc, char** argv)
 {
@@ -53,6 +54,7 @@ int main(int argc, char** argv)
 #if USE_MEMORY_POOL
     result += alloc_tests();
 #endif
+    result += policy_tests();
     qd_dispatch_free(qd);       // dispatch_free last.
 
     return result;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/system_tests_policy.py.in
----------------------------------------------------------------------
diff --git a/tests/system_tests_policy.py.in b/tests/system_tests_policy.py.in
new file mode 100644
index 0000000..3304f60
--- /dev/null
+++ b/tests/system_tests_policy.py.in
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, json
+from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT
+from subprocess import PIPE, STDOUT
+from proton import ConnectionException
+from proton.utils import BlockingConnection, LinkDetached
+
+class AbsoluteConnectionCountLimit(TestCase):
+    """
+    Verify that connections beyond the absolute limit are denied
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(AbsoluteConnectionCountLimit, cls).setUpClass()
+        config = Qdrouterd.Config([
+            ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.Policy'}),
+            ('router', {'mode': 'standalone', 'routerId': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maximumConnections': 2})
+        ])
+
+        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_verify_maximum_connections(self):
+        addr = self.address()
+
+        # two connections should be ok
+        denied = False
+        try:
+            bc1 = BlockingConnection(addr)
+            bc2 = BlockingConnection(addr)
+        except ConnectionException:
+            denied = True
+
+        self.assertFalse(denied) # assert if connections that should open did not open
+
+        # third connection should be denied
+        denied = False
+        try:
+            bc3 = BlockingConnection(addr)
+        except ConnectionException:
+            denied = True
+
+        self.assertTrue(denied) # assert if connection that should not open did open
+
+        bc1.close()
+        bc2.close()
+
+class LoadPolicyFromFolder(TestCase):
+    """
+    Verify that specifying a policy folder from the router conf file
+    effects loading the policies in that folder.
+    This test relies on qdmanage utility.
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(LoadPolicyFromFolder, cls).setUpClass()
+        config = Qdrouterd.Config([
+            ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.Policy2'}),
+            ('router', {'mode': 'standalone', 'routerId': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maximumConnections': 2, 'policyFolder': '${CMAKE_CURRENT_BINARY_DIR}/policy-1/', 'enableAccessRules': 'true'})
+        ])
+
+        cls.router = cls.tester.qdrouterd('conn-limit-router', config, wait=True)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', 'u1:password@' + self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception, e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def test_verify_policies_are_loaded(self):
+        addr = self.address()
+
+        rulesets = json.loads(self.run_qdmanage('query --type=policyRuleset'))
+        self.assertEqual(len(rulesets), 3)
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-dispatch git commit: DISPATCH-188: Implement policy. merge from branch crolke-DISPATCH-188-1.

Posted by ch...@apache.org.
DISPATCH-188: Implement policy. merge from branch crolke-DISPATCH-188-1.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/48e81620
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/48e81620
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/48e81620

Branch: refs/heads/master
Commit: 48e8162081806d8b4d55efca7cf5ea49d6083d37
Parents: f93d5ce
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Mar 30 13:05:47 2016 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Mar 30 13:05:47 2016 -0400

----------------------------------------------------------------------
 doc/notes/qdr-policy-01.odt                     | Bin 0 -> 38076 bytes
 doc/notes/qdr-policy-01.pdf                     | Bin 0 -> 168277 bytes
 doc/notes/qdr-policy-diagrams-01.odg            | Bin 0 -> 30918 bytes
 include/qpid/dispatch/container.h               |   1 +
 include/qpid/dispatch/driver.h                  |  13 +-
 include/qpid/dispatch/server.h                  |  12 +
 python/qpid_dispatch/management/qdrouter.json   | 128 ++++
 .../qdrouter.policyRuleset.settings.txt         | 105 +++
 python/qpid_dispatch_internal/dispatch.py       |   6 +
 .../qpid_dispatch_internal/management/agent.py  |  33 +
 .../qpid_dispatch_internal/management/config.py |  38 +-
 .../qpid_dispatch_internal/management/schema.py |   2 +-
 .../qpid_dispatch_internal/policy/__init__.py   |  20 +
 .../policy/policy_local.py                      | 709 +++++++++++++++++
 .../policy/policy_manager.py                    | 157 ++++
 .../policy/policy_util.py                       | 338 +++++++++
 src/CMakeLists.txt                              |   1 +
 src/container.c                                 |  73 +-
 src/dispatch.c                                  |  44 +-
 src/dispatch_private.h                          |  13 +
 src/policy.c                                    | 758 +++++++++++++++++++
 src/policy.h                                    | 165 ++++
 src/policy_internal.h                           | 103 +++
 src/posix/driver.c                              |  19 +-
 src/server.c                                    | 116 ++-
 src/server_private.h                            |   7 +
 tests/CMakeLists.txt                            |  12 +-
 tests/config-1/A.json                           |  81 ++
 tests/policy-1/management-access.json           |  45 ++
 tests/policy-1/policy-boardwalk.json            |  96 +++
 tests/policy-1/policy-safari.json               |  96 +++
 .../test-policy-conf-includes-folder.conf.in    |  64 ++
 tests/policy-2/make-sasl.sh                     |  63 ++
 tests/policy-2/policy-photoserver-sasl.conf.in  |   4 +
 tests/policy-2/policy-photoserver-sasl.sasldb   | Bin 0 -> 12288 bytes
 tests/policy-2/ssl_certs/gencerts.sh            |  39 +
 tests/policy-2/test-router-with-policy.json.in  | 170 +++++
 tests/policy_test.c                             |  89 +++
 tests/router_policy_test.py                     | 294 +++++++
 tests/run_unit_tests.c                          |   2 +
 tests/system_tests_policy.py.in                 | 111 +++
 41 files changed, 3967 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/doc/notes/qdr-policy-01.odt
----------------------------------------------------------------------
diff --git a/doc/notes/qdr-policy-01.odt b/doc/notes/qdr-policy-01.odt
new file mode 100644
index 0000000..5e9f185
Binary files /dev/null and b/doc/notes/qdr-policy-01.odt differ

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/doc/notes/qdr-policy-01.pdf
----------------------------------------------------------------------
diff --git a/doc/notes/qdr-policy-01.pdf b/doc/notes/qdr-policy-01.pdf
new file mode 100644
index 0000000..024adca
Binary files /dev/null and b/doc/notes/qdr-policy-01.pdf differ

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/doc/notes/qdr-policy-diagrams-01.odg
----------------------------------------------------------------------
diff --git a/doc/notes/qdr-policy-diagrams-01.odg b/doc/notes/qdr-policy-diagrams-01.odg
new file mode 100644
index 0000000..af344ce
Binary files /dev/null and b/doc/notes/qdr-policy-diagrams-01.odg differ

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 39fe968..22c6870 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -169,6 +169,7 @@ void *qd_link_get_context(qd_link_t *link);
 void qd_link_set_conn_context(qd_link_t *link, void *link_context);
 void *qd_link_get_conn_context(qd_link_t *link);
 
+void policy_notify_opened(void *container, qd_connection_t *conn, void *context);
 qd_direction_t qd_link_direction(const qd_link_t *link);
 qd_connection_t *qd_link_connection(qd_link_t *link);
 pn_link_t *qd_link_pn(qd_link_t *link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/include/qpid/dispatch/driver.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h
index 05e8b18..c3c1b79 100644
--- a/include/qpid/dispatch/driver.h
+++ b/include/qpid/dispatch/driver.h
@@ -178,9 +178,11 @@ void qdpn_listener_trace(qdpn_listener_t *listener, pn_trace_t trace);
 /** Accept a connection that is pending on the listener.
  *
  * @param[in] listener the listener to accept the connection on
+ * @param[in] policy function that accepts remote host name and returns 
+ *            decision to allow or deny this connection
  * @return a new connector for the remote, or NULL on error
  */
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener);
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener, void *policy, bool (*policy_fn)(void *, const char *));
 
 /** Access the application context that is associated with the listener.
  *
@@ -314,11 +316,18 @@ void qdpn_connector_set_context(qdpn_connector_t *connector, void *context);
 
 /** Access the name of the connector
  *
- * @param[in] connector the connector which will hole the name
+ * @param[in] connector the connector of interest
  * @return the name of the connector in the form of a null-terminated character string.
  */
 const char *qdpn_connector_name(const qdpn_connector_t *connector);
 
+/** Access the numeric host ip of the connector
+ *
+ * @param[in] connector the connector of interest
+ * @return the numeric host ip address of the connector in the form of a null-terminated character string.
+ */
+const char *qdpn_connector_hostip(const qdpn_connector_t *connector);
+
 /** Access the transport used by this connector.
  *
  * @param[in] connector connector whose transport will be returned

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 46f50b5..71ac479 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -572,6 +572,18 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
 
 
 /**
+ * Write accessor to the connection's proton-event stall flag.
+ * When set no further events are processed on this connection.
+ * Used during processing of policy decisions to hold off incoming
+ * pipeline of amqp events.
+ *
+ * @param conn Connection object
+ * @param stall Value of stall flag
+ */
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall);
+
+
+/**
  * Create a listener for incoming connections.
  *
  * @param qd The dispatch handle returned by qd_dispatch.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 8ff352a..64cf6a4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -735,6 +735,7 @@
                         "CONFIG",
                         "ERROR",
                         "DISPATCH",
+                        "POLICY",
                         "DEFAULT"
                     ],
                     "required": true,
@@ -1235,6 +1236,133 @@
             }
         },
 
+        "policy": {
+            "description": "Defines global connection limit",
+            "extends": "configurationEntity",
+            "singleton": true,
+            "attributes": {
+                "maximumConnections": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "Global maximum number of concurrent client connections allowed. Zero implies no limit. This limit is always enforced even if no other policy settings have been defined.",
+                    "required": false,
+                    "create": true
+                },
+                "enableAccessRules": {
+                    "type": "boolean",
+                    "default": false,
+                    "description": "Enable user rule set processing and connection denial.",
+                    "required": false,
+                    "create": true
+                },
+                "policyFolder": {
+                    "type": "path",
+                    "default": "",
+                    "description": "The path to a folder that holds policyRuleset definition .json files. For a small system the rulesets may all be defined in this file. At a larger scale it is better to have the policy files in their own folder and to have none of the rulesets defined here. All rulesets in all .json files in this folder are processed.",
+                    "required": false,
+                    "create": true
+                },
+                "connectionsProcessed": {"type": "integer", "graph": true},
+                "connectionsDenied": {"type": "integer", "graph": true},
+                "connectionsCurrent": {"type": "integer", "graph": true}
+            }
+        },
+
+        "policyRuleset": {
+            "description": "Per application definition of the locations from which users may connect and the groups to which users belong.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE"],
+            "attributes": {
+                "applicationName": {
+                    "type": "string",
+                    "description": "The application name.",
+                    "required": true
+                },
+                "maxConnections": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "Maximum number of concurrent client connections allowed. Zero implies no limit.",
+                    "required": false,
+                    "create": true
+                },
+                "maxConnPerUser": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "Maximum number of concurrent client connections allowed for any single user. Zero implies no limit.",
+                    "required": false,
+                    "create": true
+                },
+                "maxConnPerHost": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "Maximum number of concurrent client connections allowed for any remote host. Zero implies no limit.",
+                    "required": false,
+                    "create": true
+                },
+                "userGroups": {
+                    "type": "map",
+                    "description": "A map where each key is a user group name and the corresponding value is a CSV string naming the users in that group. Users who are assigned to one or more groups are deemed 'restricted'. Restricted users are subject to connection ingress policy and are assigned policy settings based on the assigned user groups. Unrestricted users may be allowed or denied. If unrestricted users are allowed to connect then they are assigned to user group default.",
+                    "required": false,
+                    "create": true
+                },
+                "ingressHostGroups": {
+                    "type": "map",
+                    "description": "A map where each key is an ingress host group name and the corresponding value is a CSV string naming the IP addresses or address ranges in that group. IP addresses may be FQDN strings or numeric IPv4 or IPv6 host addresses. A host range is two host addresses of the same address family separated with a hyphen.  The wildcard host address '*' represents any host address.",
+                    "required": false,
+                    "create": true
+                },
+                "ingressPolicies": {
+                    "type": "map",
+                    "description": "A map where each key is a user group name and the corresponding value is a CSV string naming the ingress host group names that restrict the ingress host for the user group. Users who are members of the user group are allowed to connect only from a host in one of the named ingress host groups.",
+                    "required": false,
+                    "create": true
+                },
+                "connectionAllowDefault": {
+                    "type": "boolean",
+                    "description": "Unrestricted users, those who are not members of a defined user group, are allowed to connect to this application. Unrestricted users are assigned to the 'default' user group and receive 'default' settings.",
+                    "default": false,
+                    "required": false,
+                    "create": true
+                },
+                "settings": {
+                    "type": "map",
+                    "description": "A map where each key is a user group name and the value is a map of the corresponding settings for that group.",
+                    "required": false,
+                    "create": true
+                }
+            }
+        },
+
+        "policyStats": {
+            "description": "Per application connection and access statistics.",
+            "extends": "operationalEntity",
+            "attributes": {
+                "applicationName": {
+                    "type": "string",
+                    "description": "The application name."
+                },
+                "connectionsApproved": {"type": "integer", "graph": true},
+                "connectionsDenied": {"type": "integer", "graph": true},
+                "connectionsCurrent": {"type": "integer", "graph": true},
+                "perUserState": {
+                    "type": "map",
+                    "description": "A map where the key is the authenticated user name and the value is a list of the user's connections."
+                },
+                "perHostState": {
+                    "type": "map",
+                    "description": "A map where the key is the host name and the value is a list of the host's connections."
+                },
+
+                "sessionDenied": {"type": "integer", "graph": true},
+                "senderDenied": {"type": "integer", "graph": true},
+                "receiverDenied": {"type": "integer", "graph": true},
+                "dynamicSrcDenied": {"type": "integer", "graph": true},
+                "anonymousSenderDenied": {"type": "integer", "graph": true},
+                "linkSourceDenied": {"type": "integer", "graph": true},
+                "linkTargetDenied": {"type": "integer", "graph": true}
+            }
+        },
+
         "dummy": {
             "description": "Dummy entity for test purposes.",
             "extends": "entity",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt
new file mode 100644
index 0000000..773df7a
--- /dev/null
+++ b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt
@@ -0,0 +1,105 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+
+The schema `qdrouter.json` is a JSON format file that defines annotations and
+entity types of the Qpid Dispatch Router management model.  The model is based
+on the AMQP management specification.
+
+Schema entity `policyRuleset` includes several attributes of type map. In the current form the management schema provides no way to define the keys and values in these attributes. These maps cannot be specified in the schema and they cannot be checked by the schema processing. 
+
+Until the schema is extended to specify embedded maps, this document describes the policyRuleset settings.
+
+  "policyAppSettings": {
+      "description": "For a given user group define the policy settings applied to the user's AMQP connection.",
+      "extends": "configurationEntity",
+      "attributes": {
+          "userGroupName": {
+              "type": "string",
+              "description": "The user group to which these settings apply.",
+              "required": true
+          },
+          "maxFrameSize": {
+              "type": "integer",
+              "description": "Largest frame that may be sent on this connection. Zero implies system default. (AMQP Open, max-frame-size)",
+              "default": 65536,
+              "required": false,
+              "create": true
+          },
+          "maxMessageSize": {
+              "type": "integer",
+              "description": "Largest message size supported by links created on this connection. Zero implies system default. (AMQP Attach, max-message-size)",
+              "default": 0,
+              "required": false,
+              "create": true
+          },
+          "maxSessionWindow": {
+              "type": "integer",
+              "description": "Largest incoming and outgoing window for sessions created on this connection. Zero implies system default. (AMQP Begin, incoming-window, outgoing-window)",
+              "default": 2147483647,
+              "required": false,
+              "create": true
+          },
+          "maxSessions": {
+              "type": "integer",
+              "description": "Maximum number of sessions that may be created on this connection. Zero implies system default. (AMQP Open, channel-max)",
+              "default": 10,
+              "required": false,
+              "create": true
+          },
+          "maxSenders": {
+              "type": "integer",
+              "description": "Maximum number of sending links that may be created on this connection. Zero implies system default.",
+              "default": 10,
+              "required": false,
+              "create": true
+          },
+          "maxReceivers": {
+              "type": "integer",
+              "description": "Maximum number of receiving links that may be created on this connection. Zero implies system default.",
+              "required": false,
+              "create": true
+          },
+          "allowDynamicSrc": {
+              "type": "boolean",
+              "description": "This connection is allowed to use the dynamic link source feature.",
+              "default": false,
+              "required": false,
+              "create": true
+          },
+          "allowAnonymousSender": {
+              "type": "boolean",
+              "description": "This connection is allowed to use the Anonymous Sender feature.",
+              "default": false,
+              "required": false,
+              "create": true
+          },
+          "sources": {
+              "type": "string",
+              "description": "List of Source addresses allowed when creating receiving links.",
+              "required": false,
+              "create": true
+          },
+          "targets": {
+              "type": "string",
+              "description": "List of Target addresses allowed when creating sending links.",
+              "required": false,
+              "create": true
+          }
+      }
+  }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index 3b16193..3ad4436 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -72,6 +72,12 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_link_route, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_auto_link, None, [self.qd_dispatch_p, py_object])
 
+        self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_register_policy_manager, None, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_policy_c_counts_alloc, c_long, [], check=False)
+        self._prototype(self.qd_dispatch_policy_c_counts_free, None, [c_long], check=False)
+        self._prototype(self.qd_dispatch_policy_c_counts_refresh, None, [c_long, py_object])
+
         self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])
 
         self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index 5b9a7f5..336ef86 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -81,6 +81,7 @@ from .schema import ValidationError, SchemaEntity, EntityType
 from .qdrouter import QdSchema
 from ..router.message import Message
 from ..router.address import Address
+from ..policy.policy_manager import PolicyManager
 
 
 def dictstr(d):
@@ -154,6 +155,7 @@ class EntityAdapter(SchemaEntity):
         self.__dict__['_log'] = agent.log
         self.__dict__['_qd'] = agent.qd
         self.__dict__['_dispatch'] = agent.dispatch
+        self.__dict__['_policy'] = agent.policy
         self.__dict__['_implementations'] = []
 
     def validate(self, **kwargs):
@@ -278,6 +280,36 @@ class LogEntity(EntityAdapter):
     def __str__(self):
         return super(LogEntity, self).__str__().replace("Entity(", "LogEntity(")
 
+
+class PolicyEntity(EntityAdapter):
+    def __init__(self, agent, entity_type, attributes=None):
+        super(PolicyEntity, self).__init__(agent, entity_type, attributes, validate=False)
+        # Policy is a mix of configuration and operational entity.
+        # The statistics attributes are operational not configured.
+        self._add_implementation(
+            CImplementation(agent.qd, entity_type, self._dispatch))
+
+    def create(self):
+        self._qd.qd_dispatch_configure_policy(self._dispatch, self)
+        self._qd.qd_dispatch_register_policy_manager(self._dispatch, self._policy)
+
+    def _identifier(self):
+        return self.attributes.get('module')
+
+
+class PolicyRulesetEntity(EntityAdapter):
+    def create(self):
+        self._policy.create_ruleset(self.attributes)
+
+    def _identifier(self):
+        return self.attributes.get('applicationName')
+
+
+class PolicyStatsEntity(EntityAdapter):
+    def _identifier(self):
+        return self.attributes.get('applicationName')
+
+
 def _addr_port_identifier(entity):
     for attr in ['addr', 'port']: # Set default values if need be
         entity.attributes.setdefault(
@@ -671,6 +703,7 @@ class Agent(object):
         self.entities = EntityCache(self)
         self.request_lock = Lock()
         self.log_adapter = LogAdapter("AGENT")
+        self.policy = PolicyManager(self)
         self.management = self.create_entity({"type": "management"})
         self.add_entity(self.management)
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 1ae8f6c..d742960 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -22,6 +22,7 @@ Configuration file parsing
 """
 
 import json, re, sys
+import os
 from copy import copy
 from qpid_dispatch.management.entity import camelcase
 
@@ -31,13 +32,13 @@ from .qdrouter import QdSchema
 class Config(object):
     """Load config entities from qdrouterd.conf and validated against L{QdSchema}."""
 
-    def __init__(self, filename=None, schema=QdSchema()):
+    def __init__(self, filename=None, schema=QdSchema(), raw_json=False):
         self.schema = schema
         self.config_types = [et for et in schema.entity_types.itervalues()
                              if schema.is_configuration(et)]
         if filename:
             try:
-                self.load(filename)
+                self.load(filename, raw_json)
             except Exception, e:
                 raise Exception, "Cannot load configuration file %s: %s" % (filename, e), sys.exc_info()[2]
         else:
@@ -71,6 +72,17 @@ class Config(object):
             if s[0] == "autoLink":  s[0] = "router.config.autoLink"
         return sections
 
+    @staticmethod
+    def _parserawjson(lines):
+        """Parse raw json config file format into a section list"""
+        def sub(line):
+            """Do substitutions to make line json-friendly"""
+            line = line.split('#')[0].strip() # Strip comments
+            return line
+        js_text = "%s"%("".join([sub(l) for l in lines]))
+        sections = json.loads(js_text)
+        return sections
+
 
     def _expand(self, content):
         """
@@ -97,16 +109,18 @@ class Config(object):
         return [_expand_section(s, annotations) for s in content
                 if self.schema.is_configuration(self.schema.entity_type(s[0], False))]
 
-    def load(self, source):
+    def load(self, source, raw_json=False):
         """
         Load a configuration file.
         @param source: A file name, open file object or iterable list of lines
+        @param raw_json: Source is pure json not needing conf-style substitutions
         """
         if isinstance(source, basestring):
+            raw_json |= source.endswith(".json")
             with open(source) as f:
-                self.load(f)
+                self.load(f, raw_json)
         else:
-            sections = self._parse(source)
+            sections = self._parserawjson(source) if raw_json else self._parse(source)
             # Add missing singleton sections
             for et in self.config_types:
                 if et.singleton and not [s for s in sections if s[0] == et.short_name]:
@@ -159,9 +173,11 @@ def configure_dispatch(dispatch, lib_handle, filename):
 
     from qpid_dispatch_internal.display_name.display_name import DisplayNameService
     displayname_service = DisplayNameService("$displayname")
+    policyFolder = config.by_type('policy')[0]['policyFolder']
     # Remaining configuration
     for t in "fixedAddress", "listener", "connector", "waypoint", "linkRoutePattern", \
-             "router.config.address", "router.config.linkRoute", "router.config.autoLink":
+             "router.config.address", "router.config.linkRoute", "router.config.autoLink", \
+             "policy", "policyRuleset":
         for a in config.by_type(t):
             configure(a)
             if t == "listener":
@@ -173,4 +189,12 @@ def configure_dispatch(dispatch, lib_handle, filename):
     for e in config.entities:
         configure(e)
 
-
+    # Load the policyRulesets from the .json files in policyFolder
+    # Only policyRulesets are loaded. Other entities are silently discarded.
+    if not policyFolder == '':
+        apath = os.path.abspath(policyFolder)
+        for i in os.listdir(policyFolder):
+            if i.endswith(".json"):
+                pconfig = Config(os.path.join(apath, i))
+                for a in pconfig.by_type("policyRuleset"):
+                    agent.configure(a)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/management/schema.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/schema.py b/python/qpid_dispatch_internal/management/schema.py
index 70e9b83..794e0ad 100644
--- a/python/qpid_dispatch_internal/management/schema.py
+++ b/python/qpid_dispatch_internal/management/schema.py
@@ -155,7 +155,7 @@ BUILTIN_TYPES = OrderedDict(
                           Type("entityId", str),
                           Type("integer", int),
                           Type("list", list),
-                          Type("map", map),
+                          Type("map", dict),
                           BooleanType()])
 
 def get_type(rep):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/policy/__init__.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/__init__.py b/python/qpid_dispatch_internal/policy/__init__.py
new file mode 100644
index 0000000..6417447
--- /dev/null
+++ b/python/qpid_dispatch_internal/policy/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""Qpid Dispatch internal policy package."""
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/policy/policy_local.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
new file mode 100644
index 0000000..89e5016
--- /dev/null
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -0,0 +1,709 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+
+"""
+
+import json
+from policy_util import PolicyError, HostStruct, HostAddr, PolicyAppConnectionMgr
+
+"""
+Entity implementing the business logic of user connection/access policy.
+"""
+
+#
+#
+class PolicyKeys(object):
+    """
+    String constants shared by the policy compiler, the local policy
+    database and the per-application statistics objects: configuration
+    key words, management attribute names, config-file separator
+    characters, and keys added to a ruleset when it is compiled.
+    """
+    # Common key words
+    KW_IGNORED_NAME             = "name"
+    KW_IGNORED_IDENTITY         = "identity"
+    KW_IGNORED_TYPE             = "type"
+    KW_APPLICATION_NAME         = "applicationName"
+
+    # Policy ruleset key words
+    KW_MAXCONN                     = "maxConnections"
+    KW_MAXCONNPERHOST              = "maxConnPerHost"
+    KW_MAXCONNPERUSER              = "maxConnPerUser"
+    KW_USER_GROUPS                 = "userGroups"
+    KW_INGRESS_HOST_GROUPS         = "ingressHostGroups"
+    KW_INGRESS_POLICIES            = "ingressPolicies"
+    KW_CONNECTION_ALLOW_DEFAULT    = "connectionAllowDefault"
+    KW_SETTINGS                    = "settings"
+
+    # Policy settings key words (one settings map per user group)
+    KW_USER_GROUP_NAME          = "userGroupName"
+    KW_MAX_FRAME_SIZE           = "maxFrameSize"
+    KW_MAX_MESSAGE_SIZE         = "maxMessageSize"
+    KW_MAX_SESSION_WINDOW       = "maxSessionWindow"
+    KW_MAX_SESSIONS             = "maxSessions"
+    KW_MAX_SENDERS              = "maxSenders"
+    KW_MAX_RECEIVERS            = "maxReceivers"
+    KW_ALLOW_DYNAMIC_SRC        = "allowDynamicSrc"
+    KW_ALLOW_ANONYMOUS_SENDER   = "allowAnonymousSender"
+    KW_SOURCES                  = "sources"
+    KW_TARGETS                  = "targets"
+
+    # Policy stats key words (attribute names reported by AppStats.refresh_entity)
+    KW_CONNECTIONS_APPROVED     = "connectionsApproved"
+    KW_CONNECTIONS_DENIED       = "connectionsDenied"
+    KW_CONNECTIONS_CURRENT      = "connectionsCurrent"
+    KW_PER_USER_STATE           = "perUserState"
+    KW_PER_HOST_STATE           = "perHostState"
+
+    # What settings does a user get when allowed to connect but
+    # not restricted by a user group?
+    KW_DEFAULT_SETTINGS         = "default"
+
+    # Config file separator character for two IP addresses in a range
+    KC_CONFIG_IP_SEP            = "-"
+
+    # Config file separator character for names in a list
+    KC_CONFIG_LIST_SEP          = ","
+
+    # user-to-group computed map in compiled ruleset
+    RULESET_U2G_MAP             = "U2G"
+
+    # policy stats controlled by C code but referenced by settings
+    KW_CSTATS                   = "denialCounts"
+#
+#
+class PolicyCompiler(object):
+    """
+    Validate incoming configuration for legal schema.
+    - Warn about section options that go unused.
+    - Disallow negative max connection numbers.
+    - Check that ingressHostGroups entries resolve to IP hosts.
+    - Enforce internal consistency.
+    """
+
+    # Ruleset attribute names that compile_access_ruleset accepts
+    # without adding an "is ignored" warning.
+    allowed_ruleset_options = [
+        PolicyKeys.KW_IGNORED_NAME,
+        PolicyKeys.KW_IGNORED_IDENTITY,
+        PolicyKeys.KW_IGNORED_TYPE,
+        PolicyKeys.KW_APPLICATION_NAME,
+        PolicyKeys.KW_MAXCONN,
+        PolicyKeys.KW_MAXCONNPERHOST,
+        PolicyKeys.KW_MAXCONNPERUSER,
+        PolicyKeys.KW_USER_GROUPS,
+        PolicyKeys.KW_INGRESS_HOST_GROUPS,
+        PolicyKeys.KW_INGRESS_POLICIES,
+        PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT,
+        PolicyKeys.KW_SETTINGS
+        ]
+
+    # Per-user-group settings names that compile_app_settings accepts
+    # without adding an "is ignored" warning.
+    allowed_settings_options = [
+        PolicyKeys.KW_MAX_FRAME_SIZE,
+        PolicyKeys.KW_MAX_MESSAGE_SIZE,
+        PolicyKeys.KW_MAX_SESSION_WINDOW,
+        PolicyKeys.KW_MAX_SESSIONS,
+        PolicyKeys.KW_MAX_SENDERS,
+        PolicyKeys.KW_MAX_RECEIVERS,
+        PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
+        PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
+        PolicyKeys.KW_SOURCES,
+        PolicyKeys.KW_TARGETS
+        ]
+
+    def __init__(self):
+        """
+        Create a validator. The compiler keeps no per-instance state.
+        """
+
+
+    def validateNumber(self, val, v_min, v_max, errors):
+        """
+        Check that a policy value is an integer inside the allowed range.
+        @param[in] val policy value to check; anything int() accepts
+        @param[in] v_min minimum allowed value
+        @param[in] v_max maximum allowed value. Zero or less disables the check
+        @param[out] errors list that receives one failure message on rejection
+        @return True if val resolves to an integer and is within range
+        """
+        try:
+            number = int(val)
+        except Exception:
+            errors.append("Value '%s' does not resolve to an integer." % val)
+            return False
+        if number < v_min:
+            problem = "Value '%s' is below minimum '%s'." % (val, v_min)
+        elif v_max > 0 and number > v_max:
+            problem = "Value '%s' is above maximum '%s'." % (val, v_max)
+        else:
+            return True
+        errors.append(problem)
+        return False
+
+
+    def compile_connection_groups(self, name, submap_in, submap_out, warnings, errors):
+        """
+        Translate an ingressHostGroups submap into HostAddr objects.
+        Each group's comma-separated host specification is split and
+        every piece is handed to HostAddr. On a successful run
+        submap_out maps each group name to a list of HostAddr objects.
+        @param[in] name application name, used in error text
+        @param[in] submap_in map of group name to host specification text
+        @param[out] submap_out map of group name to list of HostAddr objects
+        @param[out] warnings nonfatal irregularities observed (not written by this method)
+        @param[out] errors descriptions of failure
+        @return True if every group translated. If False then a description
+                of the group that failed has been appended to errors[] and
+                submap_out may hold partial results.
+        """
+        for group in submap_in:
+            try:
+                specs = str(submap_in[group]).split(PolicyKeys.KC_CONFIG_LIST_SEP)
+                hosts = submap_out[group] = []
+                for spec in specs:
+                    hosts.append(HostAddr(spec.strip(' '), PolicyKeys.KC_CONFIG_IP_SEP))
+            except Exception as e:
+                errors.append("Application '%s' option '%s' connectionOption '%s' failed to translate: '%s'." %
+                                (name, PolicyKeys.KW_INGRESS_HOST_GROUPS, group, e))
+                return False
+        return True
+
+
+    def compile_app_settings(self, appname, usergroup, policy_in, policy_out, warnings, errors):
+        """
+        Compile one user group's settings from processed json format to
+        local internal format.
+        @param[in] appname application name
+        @param[in] usergroup name of the user group that owns these settings
+        @param[in] policy_in user config settings
+        @param[out] policy_out validated Internal format
+        @param[out] warnings nonfatal irregularities observed
+        @param[out] errors descriptions of failure
+        @return - settings are usable. If True then warnings[] may contain useful
+                  information about fields that are ignored. If False then
+                  warnings[] may contain info and a description of why the
+                  settings were rejected has been appended to errors[].
+        """
+        # rulesets may not come through standard config so make nice defaults
+        policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 65536
+        policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = 0
+        policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647
+        policy_out[PolicyKeys.KW_MAX_SESSIONS] = 10
+        policy_out[PolicyKeys.KW_MAX_SENDERS] = 10
+        policy_out[PolicyKeys.KW_MAX_RECEIVERS] = 10
+        policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False
+        policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False
+        policy_out[PolicyKeys.KW_SOURCES] = ''
+        policy_out[PolicyKeys.KW_TARGETS] = ''
+
+        cerror = []
+        for key, val in policy_in.iteritems():
+            if key not in self.allowed_settings_options:
+                warnings.append("Application '%s' user group '%s' option '%s' is ignored." %
+                                (appname, usergroup, key))
+            if key in [PolicyKeys.KW_MAX_FRAME_SIZE,
+                       PolicyKeys.KW_MAX_MESSAGE_SIZE,
+                       PolicyKeys.KW_MAX_RECEIVERS,
+                       PolicyKeys.KW_MAX_SENDERS,
+                       PolicyKeys.KW_MAX_SESSION_WINDOW,
+                       PolicyKeys.KW_MAX_SESSIONS
+                       ]:
+                if not self.validateNumber(val, 0, 0, cerror):
+                    errors.append("Application '%s' user group '%s' option '%s' has error '%s'." %
+                                  (appname, usergroup, key, cerror[0]))
+                    return False
+                # validateNumber accepts anything int() accepts (e.g. "10"),
+                # so store the integer rather than the raw input.
+                policy_out[key] = int(val)
+            elif key in [PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
+                         PolicyKeys.KW_ALLOW_DYNAMIC_SRC
+                         ]:
+                if not type(val) is bool:
+                    errors.append("Application '%s' user group '%s' option '%s' has illegal boolean value '%s'." %
+                                  (appname, usergroup, key, val))
+                    return False
+                policy_out[key] = val
+            elif key in [PolicyKeys.KW_SOURCES,
+                         PolicyKeys.KW_TARGETS
+                         ]:
+                # accept a string or list
+                if type(val) is str:
+                    # 'abc, def, mytarget'
+                    val = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                elif type(val) is list:
+                    # ['abc', 'def', 'mytarget']
+                    pass
+                elif type(val) is unicode:
+                    # u'abc, def, mytarget'
+                    val = [x.strip(' ') for x in str(val).split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                else:
+                    errors.append("Application '%s' user group '%s' option '%s' has illegal value '%s'. Type must be 'str' or 'list' but is '%s'." %
+                                  (appname, usergroup, key, val, type(val)))
+                    # Reject here: falling through would feed the illegal
+                    # value to set() below.
+                    return False
+                # deduplicate address lists
+                val = list(set(val))
+                # output result is CSV string with no white space between values: 'abc,def,mytarget'
+                policy_out[key] = ','.join(val)
+        return True
+
+
+    def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
+        """
+        Compile a schema from processed json format to local internal format.
+        The input map is not modified; user group and ingress policy name
+        lists are parsed into new maps stored in policy_out.
+        @param[in] name application name
+        @param[in] policy_in raw policy to be validated
+        @param[out] policy_out validated Internal format
+        @param[out] warnings nonfatal irregularities observed
+        @param[out] errors descriptions of failure
+        @return - policy is usable. If True then warnings[] may contain useful
+                  information about fields that are ignored. If False then
+                  warnings[] may contain info and a description of why the
+                  policy was rejected has been appended to errors[].
+        """
+        cerror = []
+        # rulesets may not come through standard config so make nice defaults
+        policy_out[PolicyKeys.KW_MAXCONN] = 0
+        policy_out[PolicyKeys.KW_MAXCONNPERHOST] = 0
+        policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 0
+        policy_out[PolicyKeys.KW_USER_GROUPS] = {}
+        policy_out[PolicyKeys.KW_INGRESS_HOST_GROUPS] = {}
+        policy_out[PolicyKeys.KW_INGRESS_POLICIES] = {}
+        policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False
+        policy_out[PolicyKeys.KW_SETTINGS] = {}
+
+        # validate the options
+        for key, val in policy_in.iteritems():
+            if key not in self.allowed_ruleset_options:
+                warnings.append("Application '%s' option '%s' is ignored." %
+                                (name, key))
+            if key in [PolicyKeys.KW_MAXCONN,
+                       PolicyKeys.KW_MAXCONNPERHOST,
+                       PolicyKeys.KW_MAXCONNPERUSER
+                       ]:
+                if not self.validateNumber(val, 0, 65535, cerror):
+                    msg = ("Application '%s' option '%s' has error '%s'." %
+                           (name, key, cerror[0]))
+                    errors.append(msg)
+                    return False
+                # validateNumber accepts anything int() accepts (e.g. "50"),
+                # so store the integer rather than the raw input.
+                policy_out[key] = int(val)
+            elif key in [PolicyKeys.KW_USER_GROUPS,
+                         PolicyKeys.KW_INGRESS_HOST_GROUPS,
+                         PolicyKeys.KW_INGRESS_POLICIES
+                         ]:
+                try:
+                    if not type(val) is dict:
+                        errors.append("Application '%s' option '%s' must be of type 'dict' but is '%s'" %
+                                      (name, key, type(val)))
+                        return False
+                    if key == PolicyKeys.KW_INGRESS_HOST_GROUPS:
+                        # Connection groups are lists of IP addresses that need to be
+                        # converted into binary structures for comparisons.
+                        val_out = {}
+                        if not self.compile_connection_groups(name, val, val_out, warnings, errors):
+                            return False
+                        policy_out[key] = {}
+                        policy_out[key].update(val_out)
+                    else:
+                        # Split and deduplicate ingressPolicy and userGroups lists.
+                        # Build a new map so the caller's input is left untouched
+                        # and can be compiled again.
+                        compiled = {}
+                        for k, v in val.iteritems():
+                            names = [x.strip(' ') for x in v.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                            compiled[k] = list(set(names))
+                        policy_out[key] = compiled
+                except Exception as e:
+                    errors.append("Application '%s' option '%s' error processing map: %s" %
+                                  (name, key, e))
+                    return False
+            elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
+                if not type(val) is bool:
+                    errors.append("Application '%s' option '%s' must be of type 'bool' but is '%s'" %
+                                  (name, key, type(val)))
+                    return False
+                policy_out[key] = val
+            elif key in [PolicyKeys.KW_SETTINGS]:
+                if not type(val) is dict:
+                    errors.append("Application '%s' option '%s' must be of type 'dict' but is '%s'" %
+                                  (name, key, type(val)))
+                    return False
+                for skey, sval in val.iteritems():
+                    newsettings = {}
+                    if not self.compile_app_settings(name, skey, sval, newsettings, warnings, errors):
+                        return False
+                    policy_out[key][skey] = {}
+                    policy_out[key][skey].update(newsettings)
+
+        # Verify that each user is in only one group.
+        # Verify that each user group has defined settings
+        # Create user-to-group map for looking up user's group
+        policy_out[PolicyKeys.RULESET_U2G_MAP] = {}
+        if PolicyKeys.KW_USER_GROUPS in policy_out:
+            for group, userlist in policy_out[PolicyKeys.KW_USER_GROUPS].iteritems():
+                for user in userlist:
+                    if user in policy_out[PolicyKeys.RULESET_U2G_MAP]:
+                        errors.append("Application '%s' user '%s' is in multiple user groups '%s' and '%s'" %
+                                      (name, user, policy_out[PolicyKeys.RULESET_U2G_MAP][user], group))
+                        return False
+                    else:
+                        policy_out[PolicyKeys.RULESET_U2G_MAP][user] = group
+                if not group in policy_out[PolicyKeys.KW_SETTINGS]:
+                    errors.append("Application '%s' user group '%s' has no defined settings" %
+                                  (name, group))
+                    return False
+
+        # Default connections require a default settings
+        if policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
+            if not PolicyKeys.KW_DEFAULT_SETTINGS in policy_out[PolicyKeys.KW_SETTINGS]:
+                errors.append("Application '%s' allows connections by default but default settings are not defined" %
+                              (name))
+                return False
+
+        # Each ingress policy name reference must exist in ingressHostGroups
+        for cipname, cip in policy_out[PolicyKeys.KW_INGRESS_POLICIES].iteritems():
+            for co in cip:
+                if not co in policy_out[PolicyKeys.KW_INGRESS_HOST_GROUPS]:
+                    errors.append("Application '%s' connection ingress policy '%s' references ingress host group '%s' but that group does not exist" %
+                                  (name, cipname, co))
+                    return False
+
+        return True
+
+
+#
+#
+class AppStats(object):
+    """
+    Live connection state and statistics for one application.
+    Connection counting is delegated to a PolicyAppConnectionMgr. The
+    object also keeps the counts handle returned by the agent's
+    qd_dispatch_policy_c_counts_alloc() and registers itself with the
+    agent as the "policyStats" implementation.
+    """
+    def __init__(self, id, manager, ruleset):
+        self.my_id = id
+        self._manager = manager
+        # Connection limits, in the order PolicyAppConnectionMgr takes them
+        limits = [ruleset[limit] for limit in (PolicyKeys.KW_MAXCONN,
+                                               PolicyKeys.KW_MAXCONNPERHOST,
+                                               PolicyKeys.KW_MAXCONNPERUSER)]
+        self.conn_mgr = PolicyAppConnectionMgr(*limits)
+        self._cstats = self._manager.get_agent().qd.qd_dispatch_policy_c_counts_alloc()
+        self._manager.get_agent().add_implementation(self, "policyStats")
+
+    def update_ruleset(self, ruleset):
+        """
+        The parent ruleset has changed.
+        Push the new connection limits into the connection manager.
+        @param ruleset: new ruleset
+        @return:
+        """
+        limits = [ruleset[limit] for limit in (PolicyKeys.KW_MAXCONN,
+                                               PolicyKeys.KW_MAXCONNPERHOST,
+                                               PolicyKeys.KW_MAXCONNPERUSER)]
+        self.conn_mgr.update(*limits)
+
+    def refresh_entity(self, attributes):
+        """Refresh management attributes from the connection manager and the C counts"""
+        mgr = self.conn_mgr
+        entitymap = {
+            PolicyKeys.KW_APPLICATION_NAME:     self.my_id,
+            PolicyKeys.KW_CONNECTIONS_APPROVED: mgr.connections_approved,
+            PolicyKeys.KW_CONNECTIONS_DENIED:   mgr.connections_denied,
+            PolicyKeys.KW_CONNECTIONS_CURRENT:  mgr.connections_active,
+            PolicyKeys.KW_PER_USER_STATE:       mgr.per_user_state,
+            PolicyKeys.KW_PER_HOST_STATE:       mgr.per_host_state,
+        }
+        self._manager.get_agent().qd.qd_dispatch_policy_c_counts_refresh(self._cstats, entitymap)
+        attributes.update(entitymap)
+
+    def can_connect(self, conn_id, user, host, diags):
+        """Ask the connection manager whether this connection is allowed"""
+        verdict = self.conn_mgr.can_connect(conn_id, user, host, diags)
+        return verdict
+
+    def disconnect(self, conn_id, user, host):
+        """Tell the connection manager that this connection has gone away"""
+        self.conn_mgr.disconnect(conn_id, user, host)
+
+    def count_other_denial(self):
+        """Forward a denial to the connection manager's count_other_denial()"""
+        self.conn_mgr.count_other_denial()
+
+    def get_cstats(self):
+        """Return the counts handle allocated in __init__"""
+        return self._cstats
+
+#
+#
+class ConnectionFacts:
+    """
+    Facts recorded about an approved connection so that they are
+    available again when the connection is closed.
+    """
+    def __init__(self, user, host, app, conn_name):
+        self.user, self.host = user, host
+        self.app, self.conn_name = app, conn_name
+
+#
+#
+class PolicyLocal(object):
+    """
+    The local policy database.
+    """
+
+    def __init__(self, manager):
+        """
+        Create an empty local policy database.
+        @param[in] manager policy manager object; used for logging and
+                   for access to the agent
+        """
+        self._manager = manager
+
+        # Compiler object that validates incoming policy and readies
+        # it for internal use.
+        self._policy_compiler = PolicyCompiler()
+
+        # Compiled rulesets.
+        #  key : application name
+        #  val : ruleset for this app, as produced by the policy compiler
+        self.rulesetdb = {}
+
+        # Settings by application and user group.
+        #  key : <application name>
+        #  val : map of <user group name> to the settings to use
+        #        for a user's connection
+        self.settingsdb = {}
+
+        # Per-application statistics.
+        #  key : <application name>
+        #  val : AppStats object
+        self.statsdb = {}
+
+        # Connections approved by lookup_user.
+        #  key : connection id
+        #  val : ConnectionFacts
+        # Entries are added by lookup_user and removed by close_connection.
+        self._connections = {}
+    #
+    # Service interfaces
+    #
+    def create_ruleset(self, attributes):
+        """
+        Compile a ruleset from config attributes and install it.
+        An application not yet in rulesetdb gets a new AppStats object;
+        otherwise the existing AppStats is given the new ruleset.
+        Raises PolicyError if the ruleset does not compile.
+        @param[in] attributes: from config; must hold the applicationName key
+        """
+        app = attributes[PolicyKeys.KW_APPLICATION_NAME]
+        compiled = {}
+        warnings = []
+        failures = []
+        if not self._policy_compiler.compile_access_ruleset(app, attributes, compiled, warnings, failures):
+            raise PolicyError( "Policy '%s' is invalid: %s" % (app, failures[0]) )
+        for warning in warnings:
+            self._manager.log_warning(warning)
+        if app in self.rulesetdb:
+            self.statsdb[app].update_ruleset(compiled)
+            self._manager.log_info("Updated policy rules for application %s" % app)
+        else:
+            self.statsdb[app] = AppStats(app, self._manager, compiled)
+            self._manager.log_info("Created policy rules for application %s" % app)
+        # Store a shallow copy of the compiled ruleset
+        self.rulesetdb[app] = dict(compiled)
+
+    def policy_read(self, name):
+        """
+        Read the stored ruleset for the named application.
+        Raises KeyError if the application has no ruleset.
+        @param[in] name application name
+        @return the ruleset held in rulesetdb (the compiled form stored
+                by create_ruleset)
+        """
+        ruleset = self.rulesetdb[name]
+        return ruleset
+
+    def policy_update(self, name, policy):
+        """
+        Update named policy
+        Raises PolicyError if no policy of that name exists or if the
+        new policy does not compile.
+        @param[in] name application name
+        @param[in] policy data in raw user input
+        """
+        if not name in self.rulesetdb:
+            raise PolicyError("Policy '%s' does not exist" % name)
+        # create_ruleset takes the application name from the attributes,
+        # so pass a copy that names the policy being updated.
+        attributes = dict(policy)
+        attributes[PolicyKeys.KW_APPLICATION_NAME] = name
+        self.create_ruleset(attributes)
+
+    def policy_delete(self, name):
+        """
+        Remove the named policy from rulesetdb.
+        Raises PolicyError if no policy of that name exists.
+        @param[in] name application name
+        """
+        if name in self.rulesetdb:
+            del self.rulesetdb[name]
+            return
+        raise PolicyError("Policy '%s' does not exist" % name)
+
+    #
+    # db enumerator
+    #
+    def policy_db_get_names(self):
+        """
+        Return a list of the application names that have a ruleset
+        """
+        return list(self.rulesetdb)
+
+
+    #
+    # Runtime query interface
+    #
+    def lookup_user(self, user, host, app, conn_name, conn_id):
+        """
+        Lookup function called from C.
+        Determine if a user on host accessing app through AMQP Open is allowed
+        according to the policy access rules.
+        If allowed then return the policy settings name. If stats.can_connect
+        returns true then it has registered and counted the connection.
+        Any exception raised during the lookup is logged and treated as
+        a denial.
+        @param[in] user connection authId
+        @param[in] host connection remote host numeric IP address as string
+        @param[in] app application user is accessing
+        @param[in] conn_name connection name used for tracking reports
+        @param[in] conn_id internal connection id
+        @return settings user-group name if allowed; "" if not allowed
+        """
+        try:
+            if not app in self.rulesetdb:
+                self._manager.log_trace(
+                    "lookup_user failed for user '%s', host '%s', application '%s': "
+                    "No policy defined for application" % (user, host, app))
+                return ""
+
+            ruleset = self.rulesetdb[app]
+            if not app in self.statsdb:
+                # Raised inside the try block, so it is reported by the
+                # handler at the bottom as an internal error.
+                msg = (
+                    "lookup_user failed for user '%s', host '%s', application '%s': "
+                    "Policy is defined but stats are missing" % (user, host, app))
+                raise PolicyError(msg)
+            stats = self.statsdb[app]
+            # User in a group or default?
+            if user in ruleset[PolicyKeys.RULESET_U2G_MAP]:
+                usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP][user]
+            else:
+                if ruleset[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
+                    usergroup = PolicyKeys.KW_DEFAULT_SETTINGS
+                else:
+                    self._manager.log_trace(
+                        "lookup_user failed for user '%s', host '%s', application '%s': "
+                        "User must be in a user group" % (user, host, app))
+                    stats.count_other_denial()
+                    return ""
+            # User in usergroup allowed to connect from host?
+            if usergroup in ruleset[PolicyKeys.KW_INGRESS_POLICIES]:
+                # User's usergroup is restricted to connecting from a host
+                # defined by the group's ingress policy
+                allowed = False
+                uhs = HostStruct(host)
+                cglist = ruleset[PolicyKeys.KW_INGRESS_POLICIES][usergroup]
+                # Stop at the first host group entry that matches
+                for cg in cglist:
+                    for cohost in ruleset[PolicyKeys.KW_INGRESS_HOST_GROUPS][cg]:
+                        if cohost.match_bin(uhs):
+                            allowed = True
+                            break
+                    if allowed:
+                        break
+            else:
+                # User's usergroup has no ingress policy so allow
+                allowed = True
+            if not allowed:
+                self._manager.log_trace(
+                    "lookup_user failed for user '%s', host '%s', application '%s': "
+                    "User is not allowed to connect from this host" % (user, host, app))
+                stats.count_other_denial()
+                return ""
+
+            # This user passes administrative approval.
+            # Now check live connection counts
+            # Note: the connection is tracked under conn_name here;
+            # close_connection passes the same conn_name to disconnect.
+            diags = []
+            if not stats.can_connect(conn_name, user, host, diags):
+                for diag in diags:
+                    self._manager.log_trace(
+                        "lookup_user failed for user '%s', host '%s', application '%s': "
+                        "%s" % (user, host, app, diag))
+                return ""
+
+            # Record facts about this connection to use during teardown
+            facts = ConnectionFacts(user, host, app, conn_name)
+            self._connections[conn_id] = facts
+
+            # Return success
+            return usergroup
+
+        except Exception, e:
+            self._manager.log_error(
+                "lookup_user failed for user '%s', host '%s', application '%s': "
+                "Internal error: %s" % (user, host, app, e))
+            # return failure
+            return ""
+
+    def lookup_settings(self, appname, name, upolicy):
+        """
+        Given a settings name, return the aggregated policy blob.
+        Any exception raised during the lookup is logged and reported
+        as a failed lookup.
+        @param[in] appname: application user is accessing
+        @param[in] name: user group name
+        @param[out] upolicy: dict holding policy values - the settings blob
+                    TODO: make this a c struct
+        @return if lookup worked
+        # Note: the upolicy output is a non-nested dict with settings of interest
+        """
+        try:
+            if not appname in self.rulesetdb:
+                self._manager.log_trace(
+                        "lookup_settings fail for application '%s', user group '%s': "
+                        "No policy defined for this application" % (appname, name))
+                return False
+
+            ruleset = self.rulesetdb[appname]
+
+            if not name in ruleset[PolicyKeys.KW_SETTINGS]:
+                self._manager.log_trace(
+                        "lookup_settings fail for application '%s', user group '%s': "
+                        "This application has no settings for the user group" % (appname, name))
+                return False
+
+            upolicy.update(ruleset[PolicyKeys.KW_SETTINGS][name])
+            # Add the application's counts handle so the settings blob
+            # carries it alongside the user group's settings.
+            upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[appname].get_cstats()
+            return True
+        except Exception as e:
+            # Report the failure instead of swallowing it, as lookup_user does.
+            self._manager.log_error(
+                "lookup_settings fail for application '%s', user group '%s': "
+                "Internal error: %s" % (appname, name, e))
+            return False
+
+    def close_connection(self, conn_id):
+        """
+        Close the connection: report the disconnect to the application's
+        AppStats using the facts recorded by lookup_user, then forget
+        the connection. Failures are logged at trace level.
+        @param conn_id: internal connection id
+        @return:
+        """
+        try:
+            facts = self._connections[conn_id]
+            self.statsdb[facts.app].disconnect(facts.conn_name, facts.user, facts.host)
+            del self._connections[conn_id]
+        except Exception as e:
+            self._manager.log_trace(
+                "Policy internal error closing connection id %s. %s" % (conn_id, str(e)))
+
+    #
+    #
+    def test_load_config(self):
+        """
+        Test function to load a policy.
+        Builds the JSON text of a sample "photoserver" ruleset, parses it
+        and installs the attribute map through create_ruleset.
+        @return:
+        """
+        ruleset_str = '["policyAccessRuleset", {"applicationName": "photoserver","maxConnections": 50,"maxConnPerUser": 5,"maxConnPerHost": 20,"userGroups": {"anonymous":       "anonymous","users":           "u1, u2","paidsubscribers": "p1, p2","test":            "zeke, ynot","admin":           "alice, bob","superuser":       "ellen"},"ingressHostGroups": {"Ten18":     "10.18.0.0-10.18.255.255","EllensWS":  "72.135.2.9","TheLabs":   "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255","localhost": "127.0.0.1, ::1","TheWorld":  "*"},"ingressPolicies": {"anonymous":       "TheWorld","users":           "TheWorld","paidsubscribers": "TheWorld","test":            "TheLabs","admin":           "Ten18, TheLabs, localhost","superuser":       "EllensWS, localhost"},"connectionAllowDefault": true,'
+        ruleset_str += '"settings": {'
+        ruleset_str += '"anonymous":      {"maxFrameSize": 111111,"maxMessageSize":   111111,"maxSessionWindow": 111111,"maxSessions":           1,"maxSenders":           11,"maxReceivers":         11,"allowDynamicSrc":      false,"allowAnonymousSender": false,"sources": "public",                           "targets": ""},'
+        ruleset_str += '"users":          {"maxFrameSize": 222222,"maxMessageSize":   222222,"maxSessionWindow": 222222,"maxSessions":           2,"maxSenders":           22,"maxReceivers":         22,"allowDynamicSrc":      false,"allowAnonymousSender": false,"sources": "public, private",                  "targets": "public"},'
+        ruleset_str += '"paidsubscribers":{"maxFrameSize": 333333,"maxMessageSize":   333333,"maxSessionWindow": 333333,"maxSessions":           3,"maxSenders":           33,"maxReceivers":         33,"allowDynamicSrc":      true, "allowAnonymousSender": false,"sources": "public, private",                  "targets": "public, private"},'
+        ruleset_str += '"test":           {"maxFrameSize": 444444,"maxMessageSize":   444444,"maxSessionWindow": 444444,"maxSessions":           4,"maxSenders":           44,"maxReceivers":         44,"allowDynamicSrc":      true, "allowAnonymousSender": true, "sources": "private",                          "targets": "private"},'
+        ruleset_str += '"admin":          {"maxFrameSize": 555555,"maxMessageSize":   555555,"maxSessionWindow": 555555,"maxSessions":           5,"maxSenders":           55,"maxReceivers":         55,"allowDynamicSrc":      true, "allowAnonymousSender": true, "sources": "public, private, management",      "targets": "public, private, management"},'
+        ruleset_str += '"superuser":      {"maxFrameSize": 666666,"maxMessageSize":   666666,"maxSessionWindow": 666666,"maxSessions":           6,"maxSenders":           66,"maxReceivers":         66,"allowDynamicSrc":      false,"allowAnonymousSender": false,"sources": "public, private, management, root","targets": "public, private, management, root"},'
+        ruleset_str += '"default":        {"maxFrameSize": 222222,"maxMessageSize":   222222,"maxSessionWindow": 222222,"maxSessions":           2,"maxSenders":           22,"maxReceivers":         22,"allowDynamicSrc":      false,"allowAnonymousSender": false,"sources": "public, private",                  "targets": "public"}'
+        ruleset_str += '}}]'
+        ruleset = json.loads(ruleset_str)
+
+        # Element 0 is the entity type name; element 1 is the attribute map
+        self.create_ruleset(ruleset[1])

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/policy/policy_manager.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py
new file mode 100644
index 0000000..a2823c8
--- /dev/null
+++ b/python/qpid_dispatch_internal/policy/policy_manager.py
@@ -0,0 +1,157 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+
+"""
+
+import json
+import traceback
+from policy_local import PolicyLocal
+from ..dispatch import LogAdapter, LOG_INFO, LOG_TRACE, LOG_DEBUG, LOG_ERROR
+
+
+
+"""
+Entity implementing the glue between the policy engine and the rest of the system.
+"""
+
+class PolicyManager(object):
+    """
+    Glue between the policy engine and the rest of the system.
+    Each management or runtime request is forwarded to the PolicyLocal
+    instance created by the constructor.
+    """
+
+    def __init__(self, agent):
+        """
+        Store the agent and create the PolicyLocal engine and the
+        "POLICY" log adapter.
+        @param[in] agent: kept as-is and handed back by get_agent()
+        """
+        self._agent = agent
+        self._policy_local = PolicyLocal(self)
+        self.log_adapter = LogAdapter("POLICY")
+
+    def log(self, level, text):
+        """
+        Log text at level, tagged with the file name and line number
+        of the code that called log().
+        """
+        info = traceback.extract_stack(limit=2)[0] # Caller frame info
+        self.log_adapter.log(level, text, info[0], info[1])
+
+    def _log(self, level, text):
+        """
+        Same as log() but looks one frame further up the stack, so the
+        entry is attributed to the caller of the log_debug/log_info/
+        log_trace/log_error wrapper that invoked _log().
+        """
+        info = traceback.extract_stack(limit=3)[0] # Caller's caller frame info
+        self.log_adapter.log(level, text, info[0], info[1])
+
+    def log_debug(self, text):
+        self._log(LOG_DEBUG, text)
+
+    def log_info(self, text):
+        self._log(LOG_INFO, text)
+
+    def log_trace(self, text):
+        self._log(LOG_TRACE, text)
+
+    def log_error(self, text):
+        self._log(LOG_ERROR, text)
+
+    def get_agent(self):
+        # Return the agent object given to the constructor
+        return self._agent
+
+    #
+    # Management interface to create a ruleset
+    #
+    def create_ruleset(self, attributes):
+        """
+        Create named policy ruleset
+        @param[in] attributes: from config
+        """
+        self._policy_local.create_ruleset(attributes)
+
+    #
+    # Runtime query interface
+    #
+    def lookup_user(self, user, host, app, conn_name, conn_id):
+        """
+        Lookup function called from C.
+        Determine if a user on host accessing app through AMQP Open is allowed
+        according to the policy access rules.
+        If allowed then return the policy settings name
+        @param[in] user connection authId
+        @param[in] host connection remote host numeric IP address as string
+        @param[in] app application user is accessing
+        @param[in] conn_name connection name for accounting purposes
+        @param[in] conn_id internal connection id
+        @return settings user-group name if allowed; "" if not allowed
+        """
+        return self._policy_local.lookup_user(user, host, app, conn_name, conn_id)
+
+    def lookup_settings(self, appname, name, upolicy):
+        """
+        Given a settings name, return the aggregated policy blob.
+        @param[in] appname: application user is accessing
+        @param[in] name: user group name
+        @param[out] upolicy: map that receives the settings
+        @return settings were retrieved or not
+        """
+        return self._policy_local.lookup_settings(appname, name, upolicy)
+
+    def close_connection(self, conn_id):
+        """
+        The connection identified is closing. Remove it from the connection
+        accounting tables.
+        @param[in] conn_id internal connection id
+        @return: none
+        """
+        self._policy_local.close_connection(conn_id)
+#
+#
+#
+def policy_lookup_user(mgr, user, host, app, conn_name, conn_id):
+    """
+    Look up a user in the policy database
+    Called by C code
+    @param mgr: object whose lookup_user() method performs the lookup
+    @param user: passed through to mgr.lookup_user()
+    @param host: passed through to mgr.lookup_user()
+    @param app: passed through to mgr.lookup_user()
+    @param conn_name: passed through to mgr.lookup_user()
+    @param conn_id: passed through to mgr.lookup_user()
+    @return: the value returned by mgr.lookup_user()
+    """
+    return mgr.lookup_user(user, host, app, conn_name, conn_id)
+
+#
+#
+#
+def policy_close_connection(mgr, conn_id):
+    """
+    Close the connection.
+    Called by C code
+    @param mgr: object whose close_connection() method is invoked
+    @param conn_id: passed through to mgr.close_connection()
+    @return: nothing; the result of mgr.close_connection() is discarded
+    """
+    mgr.close_connection(conn_id)
+
+#
+#
+#
+def policy_lookup_settings(mgr, appname, name, upolicy):
+    """
+    Return settings for <app, usergroup> in upolicy map
+    @param mgr: object whose lookup_settings() method performs the lookup
+    @param appname: passed through to mgr.lookup_settings()
+    @param name: passed through to mgr.lookup_settings()
+    @param upolicy: passed through to mgr.lookup_settings()
+    @return: the value returned by mgr.lookup_settings()
+    """
+    return mgr.lookup_settings(appname, name, upolicy)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/python/qpid_dispatch_internal/policy/policy_util.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_util.py b/python/qpid_dispatch_internal/policy/policy_util.py
new file mode 100644
index 0000000..71a42be
--- /dev/null
+++ b/python/qpid_dispatch_internal/policy/policy_util.py
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+import sys, os
+import socket
+import binascii
+
+
+#
+#
+class PolicyError(Exception):
+    def __init__(self, value):
+        self.value = value
+    def __str__(self):
+        return repr(self.value)
+
+#
+#
+class HostStruct(object):
+    """
+    HostStruct represents a single, binary socket address from getaddrinfo
+        - name     : name given to constructor; numeric IP or host name
+        - saddr    : net name resolved by getaddrinfo; numeric IP
+        - family   : saddr.family; int
+        - binary   : saddr packed binary address; binary string
+    """
+    families = [socket.AF_INET]
+    famnames = ["IPv4"]
+    if socket.has_ipv6:
+        families.append(socket.AF_INET6)
+        famnames.append("IPv6")
+
+    def __init__(self, hostname):
+        """
+        Given a host name text string, return the socket info for it.
+        @param[in] hostname host IP address to parse
+        """
+        try:
+            res = socket.getaddrinfo(hostname, 0)
+            if len(res) == 0:
+                raise PolicyError("HostStruct: '%s' did not resolve to an IP address" % hostname)
+            foundFirst = False
+            saddr = ""
+            sfamily = socket.AF_UNSPEC
+            for i0 in range(0, len(res)):
+                family, dum0, dum1, dum2, sockaddr = res[i0]
+                if not foundFirst:
+                    if family in self.families:
+                        saddr = sockaddr[0]
+                        sfamily = family
+                        foundFirst = True
+                else:
+                    if family in self.families:
+                        if not saddr == sockaddr[0] or not sfamily == family:
+                            raise PolicyError("HostStruct: '%s' resolves to multiple IP addresses" %
+                                              hostname)
+
+            if not foundFirst:
+                raise PolicyError("HostStruct: '%s' did not resolve to one of the supported address family" %
+                        hostname)
+            self.name = hostname
+            self.saddr = saddr
+            self.family = sfamily
+            self.binary = socket.inet_pton(family, saddr)
+            return
+        except Exception, e:
+            raise PolicyError("HostStruct: '%s' failed to resolve: '%s'" %
+                              (hostname, e))
+
+    def __str__(self):
+        return self.name
+
+    def __repr__(self):
+        return self.__str__()
+
+    def dump(self):
+        return ("(%s, %s, %s, %s)" %
+                (self.name,
+                 self.saddr,
+                 "AF_INET" if self.family == socket.AF_INET else "AF_INET6",
+                 binascii.hexlify(self.binary)))
+
+#
+#
+class HostAddr(object):
+    """
+    Provide HostIP address ranges and comparison functions.
+    A HostIP may be:
+    - single address:      10.10.1.1
+    - a pair of addresses: 10.10.0.0,10.10.255.255
+    - a wildcard:          *
+    Only IPv4 and IPv6 are supported.
+    - No unix sockets.
+    HostIP names must resolve to a single IP address.
+    Address pairs define a range.
+    - The second address must be numerically larger than the first address.
+    - The addresses must be of the same address 'family', IPv4 or IPv6.
+    The wildcard '*' matches all address IPv4 or IPv6.
+    IPv6 support is conditional based on underlying OS network options.
+    Raises a PolicyError on validation error in constructor.
+    """
+
+    def has_ipv6(self):
+        return socket.has_ipv6
+
+    def __init__(self, hostspec, separator=","):
+        """
+        Parse host spec into binary structures to use for comparisons.
+        Validate the hostspec to enforce usage rules.
+        """
+        self.hoststructs = []
+
+        if hostspec == "*":
+            self.wildcard = True
+        else:
+            self.wildcard = False
+
+            hosts = [x.strip() for x in hostspec.split(separator)]
+
+            # hosts must contain one or two host specs
+            if len(hosts) not in [1, 2]:
+                raise PolicyError("hostspec must contain 1 or 2 host names")
+            self.hoststructs.append(HostStruct(hosts[0]))
+            if len(hosts) > 1:
+                self.hoststructs.append(HostStruct(hosts[1]))
+                if not self.hoststructs[0].family == self.hoststructs[1].family:
+                    raise PolicyError("mixed IPv4 and IPv6 host specs in range not allowed")
+                c0 = self.memcmp(self.hoststructs[0].binary, self.hoststructs[1].binary)
+                if c0 > 0:
+                    raise PolicyError("host specs in range must have lower numeric address first")
+
+    def __str__(self):
+        if self.wildcard:
+            return "*"
+        res = self.hoststructs[0].name
+        if len(self.hoststructs) > 1:
+            res += "," + self.hoststructs[1].name
+        return res
+
+    def __repr__(self):
+        return self.__str__()
+
+    def dump(self):
+        if self.wildcard:
+            return "(*)"
+        res = "(" + self.hoststructs[0].dump()
+        if len(self.hoststructs) > 1:
+            res += "," + self.hoststructs[1].dump()
+        res += ")"
+        return res
+
+    def memcmp(self, a, b):
+        res = 0
+        for i in range(0,len(a)):
+            if a[i] > b[i]:
+                res = 1
+                break;
+            elif a[i] < b[i]:
+                res = -1
+                break
+        return res
+
+    def match_bin(self, candidate):
+        """
+        Does the candidate hoststruct match the IP or range of IP addresses represented by this?
+        @param[in] candidate the IP address to be tested
+        @return candidate matches this or not
+        """
+        if self.wildcard:
+            return True
+        try:
+            if not candidate.family == self.hoststructs[0].family:
+                # sorry, wrong AF_INET family
+                return False
+            c0 = self.memcmp(candidate.binary, self.hoststructs[0].binary)
+            if len(self.hoststructs) == 1:
+                return c0 == 0
+            c1 = self.memcmp(candidate.binary, self.hoststructs[1].binary)
+            return c0 >= 0 and c1 <= 0
+        except PolicyError:
+            return False
+        except Exception, e:
+            assert isinstance(candidate, HostStruct), \
+                ("Wrong type. Expected HostStruct but received %s" % candidate.__class__.__name__)
+            return False
+
+    def match_str(self, candidate):
+        """
+        Does the candidate string match the IP or range represented by this?
+        @param[in] candidate the IP address to be tested
+        @return candidate matches this or not
+        """
+        try:
+            hoststruct = HostStruct(candidate)
+        except PolicyError:
+            return False
+        return self.match_bin(hoststruct)
+
+#
+#
+class PolicyAppConnectionMgr(object):
+    """
+    Track policy user/host connection limits and statistics for one app.
+    # limits - set at creation and by update()
+    max_total            : 20
+    max_per_user         : 5
+    max_per_host         : 10
+    # statistics - maintained for the lifetime of corresponding application
+    connections_approved : N
+    connections_denied   : N
+    # live state - maintained for the lifetime of corresponding application
+    connections_active   : 5
+    per_host_state : { 'host1' : [conn1, conn2, conn3],
+                       'host2' : [conn4, conn5] }
+    per_user_state : { 'user1' : [conn1, conn2, conn3],
+                       'user2' : [conn4, conn5] }
+    """
+    def __init__(self, maxconn, maxconnperuser, maxconnperhost):
+        """
+        The object is constructed with the policy limits and zeroed counts.
+        @param[in] maxconn maximum total concurrent connections
+        @param[in] maxconnperuser maximum total conncurrent connections for each user
+        @param[in] maxconnperuser maximum total conncurrent connections for each host
+        """
+        if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0:
+            raise PolicyError("PolicyAppConnectionMgr settings must be >= 0")
+        self.max_total    = maxconn
+        self.max_per_user = maxconnperuser
+        self.max_per_host = maxconnperhost
+        self.connections_approved = 0
+        self.connections_denied   = 0
+        self.connections_active   = 0
+        self.per_host_state = {}
+        self.per_user_state = {}
+
+    def __str__(self):
+        res = ("Connection Limits: total: %s, per user: %s, per host: %s\n" %
+            (self.max_total, self.max_per_user, self.max_per_host))
+        res += ("Connections Statistics: total approved: %s, total denied: %s" %
+                (self.connections_approved, self.connections_denied))
+        res += ("Connection State: total current: %s" % self.connections_active)
+        res += ("User state: %s\n" % self.per_user_state)
+        res += ("Host state: %s"   % self.per_host_state)
+        return res
+
+    def __repr__(self):
+        return self.__str__()
+
+    def update(self, maxconn, maxconnperuser, maxconnperhost):
+        """
+        Reset connection limits
+        @param[in] maxconn maximum total concurrent connections
+        @param[in] maxconnperuser maximum total conncurrent connections for each user
+        @param[in] maxconnperuser maximum total conncurrent connections for each host
+        """
+        if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0:
+            raise PolicyError("PolicyAppConnectionMgr settings must be >= 0")
+        self.max_total    = maxconn
+        self.max_per_user = maxconnperuser
+        self.max_per_host = maxconnperhost
+
+    def can_connect(self, conn_id, user, host, diags):
+        """
+        Register a connection attempt.
+        If all the connection limit rules pass then add the
+        user/host to the connection tables.
+        @param[in] conn_id unique ID for connection, usually IP:port
+        @param[in] user authenticated user ID
+        @param[in] host IP address of host
+        @param[out] diags on failure holds 1, 2, or 3 error strings
+        @return connection is allowed and tracked in state tables
+        """
+        n_user = 0
+        if user in self.per_user_state:
+            n_user = len(self.per_user_state[user])
+        n_host = 0
+        if host in self.per_host_state:
+            n_host = len(self.per_host_state[host])
+
+        allowbytotal = self.max_total == 0 or self.connections_active < self.max_total
+        allowbyuser  = self.max_per_user == 0 or n_user < self.max_per_user
+        allowbyhost  = self.max_per_host == 0 or n_host < self.max_per_host
+
+        if allowbytotal and allowbyuser and allowbyhost:
+            if not user in self.per_user_state:
+                self.per_user_state[user] = []
+            self.per_user_state[user].append(conn_id)
+            if not host in self.per_host_state:
+                self.per_host_state[host] = []
+            self.per_host_state[host].append(conn_id)
+            self.connections_active += 1
+            self.connections_approved += 1
+            return True
+        else:
+            if not allowbytotal:
+                diags.append("Connection denied by total connection limit")
+            if not allowbyuser:
+                diags.append("Connection denied by per user limit")
+            if not allowbyhost:
+                diags.append("Connection denied by per host limit")
+            self.connections_denied += 1
+            return False
+
+    def disconnect(self, conn_id, user, host):
+        """
+        Unregister a connection
+        """
+        assert(self.connections_active > 0)
+        assert(user in self.per_user_state)
+        assert(conn_id in self.per_user_state[user])
+        assert(conn_id in self.per_host_state[host])
+        self.connections_active -= 1
+        self.per_user_state[user].remove(conn_id)
+        self.per_host_state[host].remove(conn_id)
+
+
+    def count_other_denial(self):
+        """
+        Record the statistic for a connection denied by some other process
+        @return:
+        """
+        self.connections_denied += 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8faf28e..cbfae10 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -59,6 +59,7 @@ set(qpid_dispatch_SOURCES
   log.c
   message.c
   parse.c
+  policy.c
   posix/driver.c
   posix/threading.c
   python_embedded.c


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-dispatch git commit: DISPATCH-188: Implement policy. merge from branch crolke-DISPATCH-188-1.

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index ddd85ae..11575bc 100644
--- a/src/container.c
+++ b/src/container.c
@@ -21,6 +21,7 @@
 #include <string.h>
 #include "dispatch_private.h"
 #include "connection_manager_private.h"
+#include "policy.h"
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/message.h>
@@ -267,6 +268,10 @@ static void notify_opened(qd_container_t *container, qd_connection_t *conn, void
     }
 }
 
+/**
+ * Externally visible wrapper around the file-local notify_opened().
+ * The container is received as void* and cast back to qd_container_t*
+ * before the call is forwarded unchanged.
+ */
+void policy_notify_opened(void *container, qd_connection_t *conn, void *context)
+{
+	notify_opened((qd_container_t *)container, (qd_connection_t *)conn, context);
+}
 
 static void notify_closed(qd_container_t *container, qd_connection_t *conn, void *context)
 {
@@ -367,11 +372,18 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
 
     switch (pn_event_type(event)) {
     case PN_CONNECTION_REMOTE_OPEN :
-        if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
-            pn_connection_open(conn);
-        qd_connection_set_user(qd_conn);
-        qd_connection_manager_connection_opened(qd_conn);
-        notify_opened(container, qd_conn, conn_context);
+        if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+            // This Open is an externally initiated connection
+            // Let policy engine decide
+            qd_connection_set_event_stall(qd_conn, true);
+            qd_conn->open_container = (void *)container;
+            qd_conn->conn_context = conn_context;
+            qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, qd_conn);
+        } else {
+            // This Open is in response to an internally initiated connection
+            qd_connection_manager_connection_opened(qd_conn);
+            notify_opened(container, qd_conn, conn_context);
+        }
         break;
 
     case PN_CONNECTION_REMOTE_CLOSE :
@@ -383,7 +395,13 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_SESSION_REMOTE_OPEN :
         ssn = pn_event_session(event);
         if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
-            pn_session_set_incoming_capacity(ssn, 1000000);
+            if (qd_conn->policy_settings) {
+                if (!qd_policy_approve_amqp_session(ssn, qd_conn)) {
+                    break;
+                }
+                qd_conn->n_sessions++;
+            }
+            qd_policy_apply_session_settings(ssn, qd_conn);
             pn_session_open(ssn);
         }
         break;
@@ -400,6 +418,15 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 if (pn_link_session(pn_link) == ssn) {
                     qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
                     if (qd_link && qd_link->node) {
+                        if (qd_conn->policy_settings) {
+                            if (qd_link->direction == QD_OUTGOING) {
+                                qd_conn->n_senders--;
+                                assert(qd_conn->n_senders >= 0);
+                            } else {
+                                qd_conn->n_receivers--;
+                                assert(qd_conn->n_receivers >= 0);
+                            }
+                        }
                         qd_log(container->log_source, QD_LOG_NOTICE,
                                "Aborting link '%s' due to parent session end",
                                pn_link_name(pn_link));
@@ -409,6 +436,9 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 }
                 pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
             }
+            if (qd_conn->policy_settings) {
+                qd_conn->n_sessions--;
+            }
             pn_session_close(ssn);
         }
         break;
@@ -416,10 +446,23 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_LINK_REMOTE_OPEN :
         pn_link = pn_event_link(event);
         if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
-            if (pn_link_is_sender(pn_link))
+            if (pn_link_is_sender(pn_link)) {
+               if (qd_conn->policy_settings) {
+                   if (!qd_policy_approve_amqp_receiver_link(pn_link, qd_conn)) {
+                        break;
+                    }
+                    qd_conn->n_senders++;
+                }
                 setup_outgoing_link(container, pn_link);
-            else
+            } else {
+                if (qd_conn->policy_settings) {
+                    if (!qd_policy_approve_amqp_sender_link(pn_link, qd_conn)) {
+                        break;
+                    }
+                    qd_conn->n_receivers++;
+                }
                 setup_incoming_link(container, pn_link);
+            }
         } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
             handle_link_open(container, pn_link);
         break;
@@ -433,8 +476,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
             qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
             if (node)
                 node->ntype->link_detach_handler(node->context, qd_link, dt);
-            else if (qd_link->pn_link == pn_link)
+            else if (qd_link->pn_link == pn_link) {
+                if (qd_conn->policy_settings) {
+                    if (pn_link_is_sender(pn_link)) {
+                        qd_conn->n_senders--;
+                        assert (qd_conn->n_senders >= 0);
+                    } else {
+                        qd_conn->n_receivers--;
+                        assert (qd_conn->n_receivers >= 0);
+                    }
+                } else {
+                    // no policy - links not counted
+                }
                 pn_link_close(pn_link);
+            }
             if (qd_link->close_sess_with_link && qd_link->pn_sess &&
                 pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
                 pn_session_close(qd_link->pn_sess);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index c05d24c..73bd0b5 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -30,6 +30,7 @@
 #include "log_private.h"
 #include "router_private.h"
 #include "message_private.h"
+#include "policy.h"
 #include "entity.h"
 #include "entity_cache.h"
 #include <dlfcn.h>
@@ -42,6 +43,8 @@ qd_server_t    *qd_server(qd_dispatch_t *qd, int tc, const char *container_name,
 void            qd_server_free(qd_server_t *server);
 qd_container_t *qd_container(qd_dispatch_t *qd);
 void            qd_container_free(qd_container_t *container);
+qd_policy_t    *qd_policy(qd_dispatch_t *qd);
+void            qd_policy_free(qd_policy_t *policy);
 qd_router_t    *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id);
 void            qd_router_setup_late(qd_dispatch_t *qd);
 void            qd_router_free(qd_router_t *router);
@@ -67,6 +70,7 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
     qd_message_initialize();
     if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
     qd->log_source = qd_log_source("DISPATCH");
+    qd->dl_handle = 0;
     return qd;
 }
 
@@ -76,15 +80,15 @@ STATIC_ASSERT(sizeof(long) >= sizeof(void*), pointer_is_bigger_than_long);
 
 qd_error_t qd_dispatch_load_config(qd_dispatch_t *qd, const char *config_path)
 {
-    void *handle = dlopen(QPID_DISPATCH_LIB, RTLD_LAZY | RTLD_NOLOAD);
-    if (!handle)
+    qd->dl_handle = dlopen(QPID_DISPATCH_LIB, RTLD_LAZY | RTLD_NOLOAD);
+    if (!qd->dl_handle)
         return qd_error(QD_ERROR_RUNTIME, "Cannot locate library %s", QPID_DISPATCH_LIB);
 
     qd_python_lock_state_t lock_state = qd_python_lock();
     PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.management.config");
     PyObject *configure_dispatch = module ? PyObject_GetAttrString(module, "configure_dispatch") : NULL;
     Py_XDECREF(module);
-    PyObject *result = configure_dispatch ? PyObject_CallFunction(configure_dispatch, "(lls)", (long)qd, handle, config_path) : NULL;
+    PyObject *result = configure_dispatch ? PyObject_CallFunction(configure_dispatch, "(lls)", (long)qd, qd->dl_handle, config_path) : NULL;
     Py_XDECREF(configure_dispatch);
     if (!result) qd_error_py();
     Py_XDECREF(result);
@@ -155,12 +159,45 @@ qd_error_t qd_dispatch_configure_auto_link(qd_dispatch_t *qd, qd_entity_t *entit
     return qd_error_code();
 }
 
+/**
+ * Apply the policy configuration entity to qd->policy.
+ * Returns the error from qd_entity_configure_policy() when it is non-zero,
+ * QD_ERROR_NONE otherwise.
+ */
+qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_error_t err;
+    err = qd_entity_configure_policy(qd->policy, entity);
+    if (err)
+        return err;
+    return QD_ERROR_NONE;
+}
+
+
+/**
+ * Forward the policy manager entity to qd_register_policy_manager()
+ * for qd->policy and return its result.
+ */
+qd_error_t qd_dispatch_register_policy_manager(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    return qd_register_policy_manager(qd->policy, entity);
+}
+
+
+/**
+ * Return the value produced by qd_policy_c_counts_alloc().
+ * NOTE(review): presumably an opaque handle to a counts object that is later
+ * passed to the _refresh and _free functions - confirm against policy.c.
+ */
+long qd_dispatch_policy_c_counts_alloc(void)
+{
+    return qd_policy_c_counts_alloc();
+}
+
+
+/**
+ * Forward ccounts to qd_policy_c_counts_free().
+ */
+void qd_dispatch_policy_c_counts_free(long ccounts)
+{
+    qd_policy_c_counts_free(ccounts);
+}
+
+/**
+ * Forward ccounts and the entity to qd_policy_c_counts_refresh().
+ */
+void qd_dispatch_policy_c_counts_refresh(long ccounts, qd_entity_t *entity)
+{
+    qd_policy_c_counts_refresh(ccounts, entity);
+}
+
 qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd)
 {
     qd->server             = qd_server(qd, qd->thread_count, qd->container_name, qd->sasl_config_path, qd->sasl_config_name);
     qd->container          = qd_container(qd);
     qd->router             = qd_router(qd, qd->router_mode, qd->router_area, qd->router_id);
     qd->connection_manager = qd_connection_manager(qd);
+    qd->policy             = qd_policy(qd);
     return qd_error_code();
 }
 
@@ -177,6 +214,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
     free(qd->container_name);
     free(qd->router_area);
     qd_connection_manager_free(qd->connection_manager);
+    qd_policy_free(qd->policy);
     Py_XDECREF((PyObject*) qd->agent);
     qd_router_free(qd->router);
     qd_container_free(qd->container);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index b955e10..d624f5d 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -40,6 +40,7 @@ typedef struct qd_config_address_t   qd_config_address_t;
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/router.h>
 #include <qpid/dispatch/connection_manager.h>
+#include "policy.h"
 #include "server_private.h"
 #include "router_private.h"
 
@@ -49,6 +50,8 @@ struct qd_dispatch_t {
     qd_router_t             *router;
     void                    *agent;
     qd_connection_manager_t *connection_manager;
+    qd_policy_t             *policy;
+    void                    *dl_handle;
 
     int    thread_count;
     char  *container_name;
@@ -107,6 +110,16 @@ qd_error_t qd_dispatch_configure_lrp(qd_dispatch_t *qd, qd_entity_t *entity);
 qd_error_t qd_dispatch_configure_route(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
+ * Configure security policy, must be called after qd_dispatch_prepare
+ */
+qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity);
+
+/**
+ * Configure security policy manager, must be called after qd_dispatch_prepare
+ */
+qd_error_t qd_dispatch_register_policy_manager(qd_dispatch_t *qd, qd_entity_t *entity);
+
+/**
  * \brief Configure the logging module from the
  *        parsed configuration file.  This must be called after the
  *        call to qd_dispatch_prepare completes.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
new file mode 100644
index 0000000..a0f5295
--- /dev/null
+++ b/src/policy.c
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <Python.h>
+#include "qpid/dispatch/python_embedded.h"
+#include "policy.h"
+#include "policy_internal.h"
+#include <stdio.h>
+#include <string.h>
+#include "dispatch_private.h"
+#include "connection_manager_private.h"
+#include "qpid/dispatch/container.h"
+#include "qpid/dispatch/server.h"
+#include <proton/message.h>
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/error.h>
+#include <proton/event.h>
+
+
+//
+// Level used for the policy allow/deny/diagnostic log lines in this file.
+// TODO: when policy dev is more complete lower the log level
+//
+#define POLICY_LOG_LEVEL QD_LOG_CRITICAL
+
+//
+// The current statistics maintained globally through multiple
+// reconfiguration of policy settings.
+//   n_connections - incremented by qd_policy_socket_accept() for each allowed
+//                   socket and decremented by qd_policy_socket_close()
+//   n_denied      - incremented for each socket refused by the connection limit
+//   n_processed   - incremented for every socket presented, allowed or denied
+// These are the values reported by qd_entity_refresh_policy().
+// NOTE(review): no locking is visible in this file around these counters;
+// confirm that the callers serialize access.
+//
+static int n_connections = 0;
+static int n_denied = 0;
+static int n_processed = 0;
+
+//
+// error conditions signaled to effect denial
+// (string literals are read-only, so both pointer and pointee are const)
+//
+static const char * const RESOURCE_LIMIT_EXCEEDED     = "amqp:resource-limit-exceeded";
+//static const char * const UNAUTHORIZED_ACCESS         = "amqp:unauthorized-access";
+//static const char * const CONNECTION_FORCED           = "amqp:connection:forced";
+
+//
+// error descriptions signaled to effect denial
+//
+static const char * const CONNECTION_DISALLOWED         = "connection disallowed by local policy";
+static const char * const SESSION_DISALLOWED            = "session disallowed by local policy";
+static const char * const LINK_DISALLOWED               = "link disallowed by local policy";
+
+//
+// Policy configuration/statistics management interface
+//
+struct qd_policy_t {
+    // dispatch instance handed to qd_policy()
+    qd_dispatch_t        *qd;
+    // "POLICY" log source used by every log line in this file
+    qd_log_source_t      *log_source;
+    // Python policy manager object stored by qd_register_policy_manager() and
+    // passed as the first argument of the Python policy calls
+    void                 *py_policy_manager;
+                          // configured settings
+    // "maximumConnections"; 0 means the connection-count limit is not in force
+    int                   max_connection_limit;
+    // "policyFolder" string from the entity layer; released by qd_policy_free()
+    char                 *policyFolder;
+    // "enableAccessRules"; when true the Python policy manager is consulted
+    bool                  enableAccessRules;
+                          // live statistics
+    // Zeroed by qd_policy(). The accept/close paths in this file update the
+    // file-level n_* counters rather than these fields.
+    int                   connections_processed;
+    int                   connections_denied;
+    int                   connections_current;
+};
+
+/** Create the policy structure
+ * Every field is given a defined starting value: no connection limit,
+ * no policy folder, access rules disabled, no registered Python policy
+ * manager and zeroed statistics.
+ * @param[in] qd pointer to the qd
+ * @return the newly allocated policy; release it with qd_policy_free()
+ **/
+qd_policy_t *qd_policy(qd_dispatch_t *qd)
+{
+    qd_policy_t *policy = NEW(qd_policy_t);
+
+    policy->qd                   = qd;
+    policy->log_source           = qd_log_source("POLICY");
+    // Must not be left indeterminate: qd_policy_socket_close() passes this
+    // pointer to Python whenever access rules are enabled.
+    policy->py_policy_manager    = 0;
+    policy->max_connection_limit = 0;
+    policy->policyFolder         = 0;
+    policy->enableAccessRules    = false;
+    policy->connections_processed= 0;
+    policy->connections_denied   = 0;
+    policy->connections_current  = 0;
+
+    qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized");
+    return policy;
+}
+
+
+/** Free the policy structure
+ * Releases the policyFolder string and then the structure itself.
+ * A NULL policy is ignored.
+ * @param[in] policy pointer to the policy, may be NULL
+ **/
+void qd_policy_free(qd_policy_t *policy)
+{
+    if (!policy)
+        return;
+    free(policy->policyFolder);   // free(NULL) is a no-op
+    free(policy);
+}
+
+//
+// Jump to the cleanup label as soon as the entity layer has recorded an error.
+//
+#define CHECK() if (qd_error_code()) goto error
+
+/** Load the policy settings from the "policy" configuration entity.
+ * Reads maximumConnections, policyFolder and enableAccessRules.
+ * @param[in] policy pointer to the policy being configured
+ * @param[in] entity pointer to the managed entity
+ * @return QD_ERROR_NONE on success. A negative maximumConnections returns
+ *         QD_ERROR_CONFIG and leaves the policy allocated. If reading an
+ *         attribute fails, the policy is freed and the recorded error code
+ *         is returned.
+ **/
+qd_error_t qd_entity_configure_policy(qd_policy_t *policy, qd_entity_t *entity)
+{
+    policy->max_connection_limit = qd_entity_opt_long(entity, "maximumConnections", 0); CHECK();
+    if (policy->max_connection_limit < 0)
+        return qd_error(QD_ERROR_CONFIG, "maximumConnections must be >= 0");
+    // Drop the folder kept from an earlier configuration pass so it is not leaked.
+    free(policy->policyFolder);
+    policy->policyFolder =
+        qd_entity_opt_string(entity, "policyFolder", 0); CHECK();
+    policy->enableAccessRules = qd_entity_opt_bool(entity, "enableAccessRules", false); CHECK();
+    // policyFolder is optional; never hand a NULL pointer to a %s conversion.
+    qd_log(policy->log_source, QD_LOG_INFO, "Policy configured maximumConnections: %d, policyFolder: '%s', access rules enabled: '%s'",
+           policy->max_connection_limit,
+           (policy->policyFolder ? policy->policyFolder : ""),
+           (policy->enableAccessRules ? "true" : "false"));
+    return QD_ERROR_NONE;
+
+error:
+    // qd_policy_free() releases policyFolder itself; freeing it here as well
+    // would free the same pointer twice.
+    // NOTE(review): the caller's pointer to this policy dangles after this
+    // call; confirm the caller does not free or use it again.
+    qd_policy_free(policy);
+    return qd_error_code();
+}
+
+
+/** Remember the Python policy manager object.
+ * The pointer is stored as-is and later passed as the first argument of the
+ * Python policy functions called from this file.
+ * @param[in] policy pointer to the policy
+ * @param[in] policy_manager address of the Python policy manager object
+ * @return always QD_ERROR_NONE
+ **/
+qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager)
+{
+    policy->py_policy_manager = policy_manager;
+
+    return QD_ERROR_NONE;
+}
+
+
+/** Allocate a zero-filled denial counts block.
+ * @return the block address as a long, suitable for
+ *         qd_policy_c_counts_refresh() and qd_policy_c_counts_free();
+ *         0 if the allocation failed in a build where assert() is disabled
+ **/
+long qd_policy_c_counts_alloc()
+{
+    // calloc hands back zeroed storage, so nothing is written through the
+    // pointer when allocation fails and assert() is compiled out.
+    qd_policy_denial_counts_t * dc =
+        (qd_policy_denial_counts_t *) calloc(1, sizeof(qd_policy_denial_counts_t));
+    assert(dc);
+    return (long)dc;
+}
+
+
+/** Release a counts block obtained from qd_policy_c_counts_alloc().
+ * @param[in] ccounts block address carried as a long; must not be 0
+ **/
+void qd_policy_c_counts_free(long ccounts)
+{
+    void *block = (void *)ccounts;
+
+    assert(block);
+    free(block);
+}
+
+
+/** Copy a denial counts block into a management entity.
+ * Attributes are written in a fixed order and writing stops at the first
+ * one that reports a failure.
+ * @param[in] ccounts counts block address carried as a long
+ * @param[in] entity management entity receiving the values
+ * @return QD_ERROR_NONE if every attribute was set, else qd_error_code()
+ **/
+qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity)
+{
+    const qd_policy_denial_counts_t *counts = (const qd_policy_denial_counts_t *)ccounts;
+    const struct {
+        const char *attribute;
+        long        value;
+    } stats[] = {
+        { "sessionDenied",         counts->sessionDenied },
+        { "senderDenied",          counts->senderDenied },
+        { "receiverDenied",        counts->receiverDenied },
+        { "dynamicSrcDenied",      counts->dynamicSrcDenied },
+        { "anonymousSenderDenied", counts->anonymousSenderDenied },
+        { "linkSourceDenied",      counts->linkSourceDenied },
+        { "linkTargetDenied",      counts->linkTargetDenied }
+    };
+
+    for (size_t i = 0; i < sizeof(stats) / sizeof(stats[0]); i++) {
+        // a non-zero result from the setter means it failed
+        if (qd_entity_set_long(entity, stats[i].attribute, stats[i].value))
+            return qd_error_code();
+    }
+    return QD_ERROR_NONE;
+}
+
+
+/** Publish the global connection statistics on the "policy" entity.
+ * Writes connectionsProcessed, connectionsDenied and connectionsCurrent,
+ * in that order, stopping at the first attribute that fails to be set.
+ * @param[in] entity pointer to the policy management object
+ * @param[in] unused not referenced
+ * @return QD_ERROR_NONE if every attribute was set, else qd_error_code()
+ **/
+qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) {
+    // The setters return non-zero on failure.
+    if (qd_entity_set_long(entity, "connectionsProcessed", n_processed))
+        return qd_error_code();
+    if (qd_entity_set_long(entity, "connectionsDenied", n_denied))
+        return qd_error_code();
+    if (qd_entity_set_long(entity, "connectionsCurrent", n_connections))
+        return qd_error_code();
+    return QD_ERROR_NONE;
+}
+
+
+//
+// Socket-level connection counting.
+// Decisions here are plain yes/no answers made before any user
+// identity is known, and there is no AMQP channel yet on which
+// an error condition could be returned.
+//
+
+/** Count a newly accepted socket and decide whether it may proceed.
+ * With a maximum connection limit of zero every socket is counted and
+ * allowed without logging. Otherwise the socket is allowed only while the
+ * current connection count is below the limit.
+ * @param[in] context the current policy (qd_policy_t *)
+ * @param[in] hostname name used to identify the peer in the log
+ * @return true if the connection is allowed
+ **/
+bool qd_policy_socket_accept(void *context, const char *hostname)
+{
+    qd_policy_t *policy = (qd_policy_t *)context;
+    const bool enforced = policy->max_connection_limit != 0;
+    const bool allowed  = !enforced || n_connections < policy->max_connection_limit;
+
+    if (allowed) {
+        // connection counted and allowed
+        n_connections += 1;
+        if (enforced) {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' allowed. N= %d", hostname, n_connections);
+        }
+    } else {
+        // connection denied; the current count is left untouched
+        n_denied += 1;
+        qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' denied, N=%d", hostname, n_connections);
+    }
+    n_processed += 1;
+    return allowed;
+}
+
+
+/** Record a closing connection.
+ * Decrements the global connection count. When access rules are enabled the
+ * Python function policy_close_connection is called with the policy manager
+ * and the connection id. When a connection limit is configured the close is
+ * logged.
+ * @param[in] context the current policy (qd_policy_t *)
+ * @param[in] conn connection being closed
+ **/
+void qd_policy_socket_close(void *context, const qd_connection_t *conn)
+{
+    qd_policy_t *policy = (qd_policy_t *)context;
+
+    n_connections -= 1;
+    assert (n_connections >= 0);
+    if (policy->enableAccessRules) {
+        // HACK ALERT: TODO: This should be deferred to a Python thread
+        qd_python_lock_state_t lock_state = qd_python_lock();
+        PyObject *close_connection = NULL;
+        PyObject *result           = NULL;
+        PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager");
+
+        // Each step runs only if the previous one succeeded; the first
+        // failing step is the one that gets logged.
+        if (!module) {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: module");
+        } else if (!(close_connection = PyObject_GetAttrString(module, "policy_close_connection"))) {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: close_connection");
+        } else if (!(result = PyObject_CallFunction(close_connection, "(OK)",
+                                                    (PyObject *)policy->py_policy_manager,
+                                                    conn->connection_id))) {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: result");
+        }
+
+        // Py_XDECREF ignores NULL, so the references that were never
+        // obtained are simply skipped.
+        Py_XDECREF(result);
+        Py_XDECREF(close_connection);
+        Py_XDECREF(module);
+        qd_python_unlock(lock_state);
+    }
+    if (policy->max_connection_limit > 0) {
+        const char *peer = qdpn_connector_name(conn->pn_cxtr);
+        qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed. N connections=%d", peer, n_connections);
+    }
+}
+
+
+//
+// Functions related to authenticated connection denial.
+// An AMQP Open has been received over some connection.
+// Evaluate the connection auth and the Open fields to
+// allow or deny the Open. Denied Open attempts are
+// effected by returning Open and then Close_with_condition.
+//
+/** Look up user/host/app in python policyRuleset and give the AMQP Open
+ *  a go-no_go decision. Return false if the mechanics of calling python
+ *  fails. A policy lookup will deny the connection by returning a blank
+ *  usergroup name in the name buffer.
+ *  Connection and connection denial counting is done in the python code.
+ *  When a non-blank name is returned, the named settings are fetched from
+ *  python and copied into the settings block.
+ * @param[in] policy pointer to policy
+ * @param[in] username authenticated user name
+ * @param[in] hostip numeric host ip address
+ * @param[in] app application name received in remote AMQP Open.hostname;
+ *            NULL is treated as an empty name
+ * @param[in] conn_name connection name for tracking
+ * @param[out] name_buf pointer to settings name buffer; always NUL
+ *             terminated on return, blank on failure or denial
+ * @param[in] name_buf_size size of name_buf in bytes
+ * @param[in] conn_id connection id handed to the python lookup
+ * @param[out] settings receives the named settings when a name is returned
+ * @return true if the python calls worked; false on an internal failure
+ **/
+bool qd_policy_open_lookup_user(
+    qd_policy_t *policy,
+    const char *username,
+    const char *hostip,
+    const char *app,
+    const char *conn_name,
+    char       *name_buf,
+    int         name_buf_size,
+    uint64_t    conn_id,
+    qd_policy_settings_t *settings)
+{
+    // Start from a blank name so every failure path reads as "denied".
+    if (!name_buf || name_buf_size <= 0)
+        return false;
+    name_buf[0] = 0;
+
+    // TODO: crolke 2016-03-24 - Workaround for PROTON-1133: Port number is included in Open hostname
+    // Strip the ':NNNN', if any, from the app name so that policy will work with proton 0.12
+    // The Open hostname may be absent, so guard against a NULL app.
+    char appname[HOST_NAME_MAX + 1];
+    strncpy(appname, (app ? app : ""), HOST_NAME_MAX);
+    appname[HOST_NAME_MAX] = 0;
+    char * colonp = strstr(appname, ":");
+    if (colonp) {
+        *colonp = 0;
+    }
+    // Lookup the user/host/app for allow/deny and to get settings name
+    bool res = false;
+    qd_python_lock_state_t lock_state = qd_python_lock();
+    PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager");
+    if (module) {
+        PyObject *lookup_user = PyObject_GetAttrString(module, "policy_lookup_user");
+        if (lookup_user) {
+            PyObject *result = PyObject_CallFunction(lookup_user, "(OssssK)",
+                                                     (PyObject *)policy->py_policy_manager,
+                                                     username, hostip, appname, conn_name, conn_id);
+            if (result) {
+                // PyString_AsString returns NULL when the result is not a string.
+                const char *res_string = PyString_AsString(result);
+                if (res_string) {
+                    // strncpy does not terminate a truncated copy; do it here.
+                    strncpy(name_buf, res_string, name_buf_size - 1);
+                    name_buf[name_buf_size - 1] = 0;
+                    res = true; // settings name returned
+                } else {
+                    qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: result is not a string");
+                }
+                Py_XDECREF(result);
+            } else {
+                qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: result");
+            }
+            Py_XDECREF(lookup_user);
+        } else {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: lookup_user");
+        }
+    }
+    if (!res) {
+        if (module) {
+            Py_XDECREF(module);
+        }
+        qd_python_unlock(lock_state);
+        return false;
+    }
+
+    // A blank name means the connection is denied; there are no settings to fetch.
+    if (name_buf[0]) {
+        // Go get the named settings
+        res = false;
+        PyObject *upolicy = PyDict_New();
+        if (upolicy) {
+            PyObject *lookup_settings = PyObject_GetAttrString(module, "policy_lookup_settings");
+            if (lookup_settings) {
+                PyObject *result2 = PyObject_CallFunction(lookup_settings, "(OssO)",
+                                                        (PyObject *)policy->py_policy_manager,
+                                                        appname, name_buf, upolicy);
+                if (result2) {
+                    // upolicy was passed to python for filling; read it back
+                    // through the entity accessors.
+                    settings->maxFrameSize         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
+                    settings->maxMessageSize       = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
+                    settings->maxSessionWindow     = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
+                    settings->maxSessions          = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
+                    settings->maxSenders           = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
+                    settings->maxReceivers         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
+                    settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
+                    settings->allowDynamicSrc      = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false);
+                    settings->sources              = qd_entity_get_string((qd_entity_t*)upolicy, "sources");
+                    settings->targets              = qd_entity_get_string((qd_entity_t*)upolicy, "targets");
+                    // the counts block address travels through python as a long
+                    settings->denialCounts         = (qd_policy_denial_counts_t*)
+                                                    qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
+                    Py_XDECREF(result2);
+                    res = true; // named settings content returned
+                } else {
+                    qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: result2");
+                }
+                Py_XDECREF(lookup_settings);
+            } else {
+                qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: lookup_settings");
+            }
+            Py_XDECREF(upolicy);
+        } else {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: upolicy");
+        }
+    }
+    Py_XDECREF(module);
+    qd_python_unlock(lock_state);
+
+    qd_log(policy->log_source, 
+           POLICY_LOG_LEVEL, 
+           "Policy AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: %s. Usergroup: '%s'%s",
+           username, hostip, appname, conn_name, name_buf, (res ? "" : " Internal error."));
+
+    return res;
+}
+
+
+//
+//
+void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *cond_name, const char *cond_descr)
+{
+    pn_condition_t * cond = pn_connection_condition(conn);
+    (void) pn_condition_set_name(       cond, cond_name);
+    (void) pn_condition_set_description(cond, cond_descr);
+    pn_connection_close(conn);
+}
+
+
+/** Deny an AMQP session.
+ * The session is closed with the resource-limit condition, the denial is
+ * logged with the user, host ip and application of the connection, and the
+ * connection's sessionDenied counter is incremented.
+ * @param[in,out] ssn proton session being closed
+ * @param[in,out] qd_conn dispatch connection that owns the session
+ **/
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
+{
+    // Attach the condition first so it goes out with the close.
+    pn_condition_t *condition = pn_session_condition(ssn);
+    (void) pn_condition_set_name(condition, RESOURCE_LIMIT_EXCEEDED);
+    (void) pn_condition_set_description(condition, SESSION_DISALLOWED);
+    pn_session_close(ssn);
+
+    // Gather the identity of the peer for the log line.
+    pn_connection_t *pn_conn  = qd_connection_pn(qd_conn);
+    qd_policy_t     *policy   = qd_conn->server->qd->policy;
+    const char      *username = pn_transport_get_user(pn_connection_transport(pn_conn));
+    const char      *hostip   = qdpn_connector_hostip(qd_conn->pn_cxtr);
+    const char      *app      = pn_connection_remote_hostname(pn_conn);
+    qd_log(policy->log_source,
+           POLICY_LOG_LEVEL,
+           "Policy AMQP Begin Session denied due to session limit. user: %s, hostip: %s, app: %s",
+           username, hostip, app);
+
+    qd_conn->policy_settings->denialCounts->sessionDenied++;
+}
+
+
+/** Approve or deny a new session against the connection's session limit.
+ * A connection without policy settings, or with a maxSessions of zero, is
+ * not limited. Otherwise the session is denied when the connection's
+ * session count equals maxSessions.
+ * @param[in] ssn proton session being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return true if the session is allowed, false if it was denied and closed
+ **/
+bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
+{
+    const qd_policy_settings_t *limits = qd_conn->policy_settings;
+    const bool at_limit = limits
+                       && limits->maxSessions
+                       && qd_conn->n_sessions == limits->maxSessions;
+
+    if (!at_limit)
+        return true;
+
+    qd_policy_deny_amqp_session(ssn, qd_conn);
+    return false;
+}
+
+
+/** Set the incoming capacity of a new session.
+ * Uses the connection's maxSessionWindow when policy settings exist and the
+ * value is non-zero; otherwise the capacity is 1000000.
+ * @param[in] ssn proton session being set
+ * @param[in] qd_conn dispatch connection with policy settings
+ **/
+void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn)
+{
+    const qd_policy_settings_t *limits = qd_conn->policy_settings;
+    const bool has_window = limits && limits->maxSessionWindow;
+
+    pn_session_set_incoming_capacity(ssn, has_window ? limits->maxSessionWindow : 1000000);
+}
+
+/** Deny an AMQP link.
+ * The link is closed with the resource-limit condition and the denial is
+ * logged with the user, host ip and application of the connection. No
+ * denial counter is touched here.
+ * @param[in] link proton link being closed
+ * @param[in] qd_conn dispatch connection that owns the link
+ * @param[in] s_or_r word naming the limit in the log message
+ **/
+void _qd_policy_deny_amqp_link(pn_link_t *link, qd_connection_t *qd_conn, char * s_or_r)
+{
+    // Attach the condition first so it goes out with the close.
+    pn_condition_t *condition = pn_link_condition(link);
+    (void) pn_condition_set_name(condition, RESOURCE_LIMIT_EXCEEDED);
+    (void) pn_condition_set_description(condition, LINK_DISALLOWED);
+    pn_link_close(link);
+
+    // Gather the identity of the peer for the log line.
+    pn_connection_t *pn_conn  = qd_connection_pn(qd_conn);
+    qd_policy_t     *policy   = qd_conn->server->qd->policy;
+    const char      *username = pn_transport_get_user(pn_connection_transport(pn_conn));
+    const char      *hostip   = qdpn_connector_hostip(qd_conn->pn_cxtr);
+    const char      *app      = pn_connection_remote_hostname(pn_conn);
+    qd_log(policy->log_source,
+           POLICY_LOG_LEVEL,
+           "Policy AMQP Attach Link denied due to %s limit. user: %s, hostip: %s, app: %s",
+           s_or_r, username, hostip, app);
+}
+
+
+/** Deny a sender link: close and log it as a "sender" denial, then
+ * increment the connection's senderDenied counter.
+ * @param[in] pn_link proton link being closed
+ * @param[in] qd_conn dispatch connection that owns the link
+ **/
+void _qd_policy_deny_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
+{
+    _qd_policy_deny_amqp_link(pn_link, qd_conn, "sender");
+
+    qd_conn->policy_settings->denialCounts->senderDenied++;
+}
+
+
+/** Deny a receiver link: close and log it as a "receiver" denial, then
+ * increment the connection's receiverDenied counter.
+ * @param[in] pn_link proton link being closed
+ * @param[in] qd_conn dispatch connection that owns the link
+ **/
+void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
+{
+    _qd_policy_deny_amqp_link(pn_link, qd_conn, "receiver");
+
+    qd_conn->policy_settings->denialCounts->receiverDenied++;
+}
+
+
+//
+//
+#define MIN(a,b) (((a)<(b))?(a):(b))
+
+/** Reverse user-name substitution.
+ * Builds a copy of the proposed name in which the first occurrence of the
+ * user name is replaced by the literal text "${user}".
+ * The output is always NUL terminated. If the complete result, including
+ * the terminator, does not fit in the output buffer, no result is produced;
+ * a truncated name is never returned.
+ * @param[in] uname user name to look for; NULL or empty yields no result
+ * @param[in] proposed proposed link name
+ * @param[out] obuf buffer receiving the substituted name
+ * @param[in] osize size of obuf in bytes
+ * @return obuf on success; NULL if the user name is blank, is not found in
+ *         the proposed name, or the result does not fit
+ **/
+char * _qd_policy_link_user_name_subst(const char *uname, const char *proposed, char *obuf, int osize)
+{
+    static const char duser[] = "${user}";
+
+    if (!uname || !proposed || !obuf || osize <= 0)
+        return NULL;
+
+    size_t ulen = strlen(uname);
+    if (ulen == 0)
+        return NULL;
+
+    const char *findptr = strstr(proposed, uname);
+    if (findptr == NULL) {
+        return NULL;
+    }
+
+    // Measure the three segments before copying anything.
+    size_t leadlen = (size_t)(findptr - proposed);   // text before the match
+    size_t dlen    = sizeof(duser) - 1;              // the "${user}" token
+    const char *tail = findptr + ulen;               // text after the match
+    size_t taillen = strlen(tail);
+
+    if (leadlen + dlen + taillen + 1 > (size_t)osize)
+        return NULL;
+
+    memcpy(obuf, proposed, leadlen);
+    memcpy(obuf + leadlen, duser, dlen);
+    memcpy(obuf + leadlen + dlen, tail, taillen + 1); // +1 copies the terminator
+    return obuf;
+}
+
+
+//
+//
+// Size of 'easy' temporary copy of allowed input string
+#define QPALN_SIZE 1024
+// Size of user-name-substituted proposed string.
+#define QPALN_USERBUFSIZE 300
+// C in the CSV string
+#define QPALN_COMMA_SEP ","
+// Wildcard character
+#define QPALN_WILDCARD '*'
+
+/** Check a proposed link name against a CSV list of allowed names.
+ * Each list entry is one of:
+ *   - an entry starting with '*': matches every name
+ *   - "prefix*": matches names that start with prefix
+ *   - anything else: must equal the name exactly
+ * Every entry is tried against the proposed name and against the proposed
+ * name with the user name replaced by "${user}".
+ * @param[in] username authenticated user name; may be NULL or blank
+ * @param[in] allowed CSV list of allowed names
+ * @param[in] proposed link name being requested
+ * @return true if some entry matches; false otherwise, including when
+ *         either string is blank or NULL or memory is exhausted
+ **/
+bool _qd_policy_approve_link_name(const char *username, const char *allowed, const char *proposed)
+{
+    // Verify string sizes are usable
+    if (!proposed || !*proposed) {
+        // degenerate case of blank name being opened. will never match anything.
+        return false;
+    }
+    if (!allowed || !*allowed) {
+        // no names in 'allowed'.
+        return false;
+    }
+    size_t a_len = strlen(allowed);
+
+    // Create a temporary writable copy of incoming allowed list
+    char t_allow[QPALN_SIZE + 1]; // temporary buffer for normal allow lists
+    char * pa = t_allow;
+    if (a_len > QPALN_SIZE) {
+        pa = (char *)malloc(a_len + 1); // malloc a buffer for larger allow lists
+        if (!pa) {
+            // fail closed rather than write through a NULL pointer
+            return false;
+        }
+    }
+    memcpy(pa, allowed, a_len + 1);
+
+    // Do reverse user substitution into proposed.
+    // The buffer is zero filled and its last byte is held back so the
+    // substituted name is NUL terminated even if the helper truncates.
+    char substbuf[QPALN_USERBUFSIZE] = {0};
+    char * prop2 = (username && *username)
+        ? _qd_policy_link_user_name_subst(username, proposed, substbuf, QPALN_USERBUFSIZE - 1)
+        : NULL;
+
+    bool result = false;
+    char *toknext = NULL;
+    // strtok_r never returns an empty token, so len below is at least 1.
+    for (char *tok = strtok_r(pa, QPALN_COMMA_SEP, &toknext);
+         tok != NULL;
+         tok = strtok_r(NULL, QPALN_COMMA_SEP, &toknext)) {
+        if (*tok == QPALN_WILDCARD) {
+            result = true;
+            break;
+        }
+        size_t len = strlen(tok);
+        if (tok[len-1] == QPALN_WILDCARD) {
+            // "prefix*": compare only the characters before the wildcard
+            size_t prefixlen = len - 1;
+            result = strncmp(tok, proposed, prefixlen) == 0 ||
+                     (prop2 && strncmp(tok, prop2, prefixlen) == 0);
+        } else {
+            // No wildcard: the whole name must match. Comparing just the
+            // first strlen(proposed) characters would let a proposed name
+            // that is only a prefix of the entry slip through.
+            result = strcmp(tok, proposed) == 0 ||
+                     (prop2 && strcmp(tok, prop2) == 0);
+        }
+        if (result) {
+            break;
+        }
+    }
+    if (pa != t_allow) {
+        free(pa);
+    }
+    return result;
+}
+
+/** Approve or deny a new sender link based on the connection's policy.
+ * Checks, in order: the maxSenders limit, then either the target name
+ * against the allowed targets list or, for a link with no target, the
+ * allowAnonymousSender setting. A denied link is closed, logged and counted
+ * as a sender denial.
+ * @param[in] pn_link proton link being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return true if the link is allowed, false if it was denied and closed
+ **/
+bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
+{
+    if (!qd_conn->policy_settings) {
+        // Connections not governed by policy have no settings; allow, as
+        // qd_policy_approve_amqp_session() does.
+        return true;
+    }
+    pn_connection_t *conn = qd_connection_pn(qd_conn);
+    pn_transport_t *pn_trans = pn_connection_transport(conn);
+    const char *username = pn_transport_get_user(pn_trans);
+    if (qd_conn->policy_settings->maxSenders) {
+        if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) {
+            // Max sender limit specified and violated.
+            _qd_policy_deny_amqp_sender_link(pn_link, qd_conn);
+            return false;
+        } else {
+            // max sender limit not violated
+        }
+    } else {
+        // max sender limit not specified
+    }
+    // Deny sender link based on target
+    const char * target = pn_terminus_get_address(pn_link_remote_target(pn_link));
+    bool lookup;
+    if (target && *target) {
+        // a target is specified
+        lookup = _qd_policy_approve_link_name(username, qd_conn->policy_settings->targets, target);
+
+        qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
+            "Approve sender link '%s' for user '%s': %s",
+            target, username, (lookup ? "ALLOW" : "DENY"));
+
+        if (!lookup) {
+            // This is a sender link: use the sender denial so the log text
+            // and the denial counter name the right kind of link.
+            _qd_policy_deny_amqp_sender_link(pn_link, qd_conn);
+            return false;
+        }
+    } else {
+        // A sender with no remote target.
+        // This happens all the time with anonymous relay
+        lookup = qd_conn->policy_settings->allowAnonymousSender;
+        qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
+            "Approve anonymous sender for user '%s': %s",
+            username, (lookup ? "ALLOW" : "DENY"));
+        if (!lookup) {
+            _qd_policy_deny_amqp_sender_link(pn_link, qd_conn);
+            return false;
+        }
+    }
+    return true;
+}
+
+
+/** Approve or deny a new receiver link based on the connection's policy.
+ * Checks, in order: the maxReceivers limit, then for a dynamic source the
+ * allowDynamicSrc setting, otherwise the source name against the allowed
+ * sources list. A receiver with no source is denied. A denied link is
+ * closed, logged and counted as a receiver denial.
+ * @param[in] pn_link proton link being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return true if the link is allowed, false if it was denied and closed
+ **/
+bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
+{
+    if (!qd_conn->policy_settings) {
+        // Connections not governed by policy have no settings; allow, as
+        // qd_policy_approve_amqp_session() does.
+        return true;
+    }
+    pn_connection_t *conn = qd_connection_pn(qd_conn);
+    pn_transport_t *pn_trans = pn_connection_transport(conn);
+    const char *username = pn_transport_get_user(pn_trans);
+    if (qd_conn->policy_settings->maxReceivers) {
+        if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) {
+            // Max receiver limit specified and violated.
+            _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
+            return false;
+        } else {
+            // max receiver limit not violated
+        }
+    } else {
+        // max receiver limit not specified
+    }
+    // Deny receiver link based on source
+    bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link));
+    if (dynamic_src) {
+        bool lookup = qd_conn->policy_settings->allowDynamicSrc;
+        qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
+            "Approve dynamic source for user '%s': %s",
+            username, (lookup ? "ALLOW" : "DENY"));
+        // Dynamic source policy rendered the decision
+        if (!lookup) {
+            _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
+        }
+        return lookup;
+    }
+    const char * source = pn_terminus_get_address(pn_link_remote_source(pn_link));
+    if (source && *source) {
+        // a source is specified
+        bool lookup = _qd_policy_approve_link_name(username, qd_conn->policy_settings->sources, source);
+
+        qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
+            "Approve receiver link '%s' for user '%s': %s",
+            source, username, (lookup ? "ALLOW" : "DENY"));
+
+        if (!lookup) {
+            _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
+            return false;
+        }
+    } else {
+        // A receiver with no remote source.
+        qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
+               "Approve receiver link '' for user '%s': DENY",
+               username);
+
+        _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
+        return false;
+    }
+    return true;
+}
+
+
+/** Allow or deny an incoming AMQP Open.
+ * With access rules enabled, a settings block is allocated for the
+ * connection and the user/host/app is looked up in policy. An allowed
+ * connection has its frame size and channel limits applied; a denied
+ * connection is closed with a resource-limit condition. With access rules
+ * disabled the connection is allowed and gets no settings block.
+ * Allowed connections are opened and reported to the connection manager.
+ * The connection's event stall is always cleared before returning.
+ * @param[in] context a qd_connection_t object
+ * @param[in] discard when true, skip all policy processing
+ **/
+void qd_policy_amqp_open(void *context, bool discard)
+{
+    qd_connection_t *qd_conn = (qd_connection_t *)context;
+    if (!discard) {
+        pn_connection_t *conn = qd_connection_pn(qd_conn);
+        qd_dispatch_t *qd = qd_conn->server->qd;
+        qd_policy_t *policy = qd->policy;
+        bool connection_allowed = true;
+
+        if (policy->enableAccessRules) {
+            // Open connection or not based on policy.
+            // username = pn_connection_get_user(conn) returns blank when
+            // the transport returns 'anonymous'.
+            pn_transport_t *pn_trans = pn_connection_transport(conn);
+            const char *username = pn_transport_get_user(pn_trans);
+
+            const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+            const char *app = pn_connection_remote_hostname(conn);
+            const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr);
+#define SETTINGS_NAME_SIZE 256
+            char settings_name[SETTINGS_NAME_SIZE];
+            settings_name[0] = 0; // blank name means "denied"
+            // Same width as the lookup's conn_id parameter, so the id is not
+            // narrowed before it is handed to python.
+            uint64_t conn_id = qd_conn->connection_id;
+            qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings
+            memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t));
+
+            if (qd_policy_open_lookup_user(policy, username, hostip, app, conn_name, 
+                                           settings_name, SETTINGS_NAME_SIZE, conn_id,
+                                           qd_conn->policy_settings) &&
+                settings_name[0]) {
+                // This connection is allowed by policy.
+                // Apply transport policy settings
+                if (qd_conn->policy_settings->maxFrameSize > 0)
+                    pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
+                if (qd_conn->policy_settings->maxSessions > 0)
+                    pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions);
+            } else {
+                // This connection is denied by policy.
+                connection_allowed = false;
+                qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
+            }
+        } else {
+            // This connection not subject to policy and implicitly allowed.
+            // Note that connections not governed by policy have no policy_settings.
+        }
+        if (connection_allowed) {
+            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+                pn_connection_open(conn);
+            qd_connection_manager_connection_opened(qd_conn);
+            policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->conn_context);
+        }
+    }
+    qd_connection_set_event_stall(qd_conn, false);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/policy.h
----------------------------------------------------------------------
diff --git a/src/policy.h b/src/policy.h
new file mode 100644
index 0000000..d4fc2b2
--- /dev/null
+++ b/src/policy.h
@@ -0,0 +1,165 @@
+#ifndef __policy_h__
+#define __policy_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch.h"
+#include "qpid/dispatch/server.h"
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/static_assert.h"
+
+#include "config.h"
+#include "alloc.h"
+#include "entity.h"
+#include "entity_cache.h"
+#include <dlfcn.h>
+
+typedef struct qd_policy_denial_counts_s qd_policy_denial_counts_t;
+
+// Denial statistics block.
+// Blocks are created by qd_policy_c_counts_alloc(), published field by field
+// through qd_policy_c_counts_refresh(), and reached from a connection through
+// qd_policy_settings_t.denialCounts.
+// TODO: Provide locking
+struct qd_policy_denial_counts_s {
+    // sessions denied
+    int sessionDenied;
+    // sender links denied
+    int senderDenied;
+    // receiver links denied
+    int receiverDenied;
+    // presumably receiver links denied for requesting a dynamic source -- TODO confirm
+    int dynamicSrcDenied;
+    // presumably sender links denied for having no target -- TODO confirm
+    int anonymousSenderDenied;
+    // presumably links denied by the allowed sources list -- TODO confirm
+    int linkSourceDenied;
+    // presumably links denied by the allowed targets list -- TODO confirm
+    int linkTargetDenied;
+};
+
+typedef struct qd_policy_t qd_policy_t;
+
+// Per-connection policy settings.
+// For the numeric limits a value of zero means "not specified".
+struct qd_policy__settings_s {
+    // maximum frame size applied to the transport when > 0
+    int  maxFrameSize;
+    // maximum message size -- presumably enforced elsewhere; TODO confirm
+    int  maxMessageSize;
+    // incoming capacity given to each new session
+    int  maxSessionWindow;
+    // session limit; also applied as the transport channel max when > 0
+    int  maxSessions;
+    // sender link limit
+    int  maxSenders;
+    // receiver link limit
+    int  maxReceivers;
+    // whether a receiver may request a dynamic source
+    bool allowDynamicSrc;
+    // whether a sender may attach with no target
+    bool allowAnonymousSender;
+    // comma separated list of allowed source names for receivers
+    char *sources;
+    // comma separated list of allowed target names for senders
+    char *targets;
+    // denial statistics block for this connection
+    qd_policy_denial_counts_t *denialCounts;
+};
+
+typedef struct qd_policy__settings_s qd_policy_settings_t;
+
+/** Configure the C policy entity from the settings in qdrouterd.conf["policy"]
+ * Called python-to-C during config processing.
+ * @param[in] policy pointer to the policy
+ * @param[in] entity pointer to the managed entity
+ * @return error or not. If error then the policy is freed.
+ **/
+qd_error_t qd_entity_configure_policy(qd_policy_t *policy, qd_entity_t *entity);
+
+/** Memorize the address of python policy_manager object.
+ * This python object gets called by C to execute user lookups.
+ * @param[in] policy pointer to the policy
+ * @param[in] policy_manager the address of the policy_manager object
+ * @return error or not
+ **/
+qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager);
+
+
+/** Allocate counts statistics block.
+ * Called from Python
+ * @return the address of the new block, carried as a long
+ */
+long qd_policy_c_counts_alloc();
+
+/** Free counts statistics block.
+ * Called from Python
+ * @param[in] ccounts block address returned by qd_policy_c_counts_alloc
+ */
+void qd_policy_c_counts_free(long ccounts);
+
+/** Refresh a counts statistics block
+ * Called from Python
+ * @param[in] ccounts block address returned by qd_policy_c_counts_alloc
+ * @param[in] entity management entity that receives the counts
+ * @return error or not
+ */
+qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity);
+
+
+/** Allow or deny an incoming connection based on connection count(s).
+ * A server listener has just accepted a socket.
+ * Allow or deny this connection based on the absolute number
+ *  of allowed connections.
+ * The identity of the connecting user has not been negotiated yet.
+ * @param[in] context the current policy
+ * @param[in] hostname the connector name
+ * @return the connection is allowed or not
+ **/
+bool qd_policy_socket_accept(void *context, const char *hostname);
+
+
+/** Record a closing connection.
+ * A server listener is closing a socket.
+ * Release the counted connection against provisioned limits
+ *
+ * @param[in] context the current policy
+ * @param[in] conn qd_connection
+ **/
+void qd_policy_socket_close(void *context, const qd_connection_t *conn);
+
+
+/** Approve a new session based on connection's policy.
+ * Sessions denied are closed and counted.
+ *
+ * @param[in] ssn proton session being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return the session is allowed or not
+ **/
+bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn);
+
+
+/** Apply policy or default settings for a new session.
+ *
+ * @param[in] ssn proton session being set
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ **/
+void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn);
+
+
+/** Approve a new sender link based on connection's policy.
+ * Links denied are closed and counted.
+ *
+ * @param[in] pn_link proton link being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return the link is allowed or not
+ **/
+bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn);
+
+
+/** Approve a new receiver link based on connection's policy.
+ * Links denied are closed and counted.
+ *
+ * @param[in] pn_link proton link being approved
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ * @return the link is allowed or not
+ **/
+bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn);
+
+
+/** Allow or deny an incoming connection.
+ * An Open performative was received over a new connection.
+ * Consult local policy to determine if this host/user is
+ *  allowed to make this connection.
+ * Denied pn_connections are closed with a condition.
+ * Allowed connections are signaled through qd_connection_manager.
+ * This function is called from the deferred queue.
+ * @param[in] context a qd_connection_t object
+ * @param[in] discard callback switch
+ **/
+void qd_policy_amqp_open(void *context, bool discard);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/policy_internal.h
----------------------------------------------------------------------
diff --git a/src/policy_internal.h b/src/policy_internal.h
new file mode 100644
index 0000000..9e865e6
--- /dev/null
+++ b/src/policy_internal.h
@@ -0,0 +1,103 @@
+#ifndef __policy_internal_h__
+#define __policy_internal_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "policy.h"
+
+/**
+ * Private Function Prototypes
+ */
+/** Set the error condition and close the connection.
+ * Over the wire this will send an open frame followed
+ * immediately by a close frame with the error condition.
+ * @param[in] conn proton connection being closed
+ * @param[in] cond_name condition name
+ * @param[in] cond_descr condition description
+ **/ 
+void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *cond_name, const char *cond_descr);
+
+
+/** Internal function to deny an amqp session
+ * The session is closed with a condition and the denial is logged and counted.
+ * @param[in,out] ssn proton session being closed
+ * @param[in,out] qd_conn dispatch connection
+ */
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn);
+
+
+/** Internal function to deny an amqp link
+ * The link is closed and the denial is logged but not counted.
+ * @param[in] link proton link being closed
+ * @param[in] qd_conn the qd connection
+ * @param[in] s_or_r 'sender' or 'receiver' for logging
+ */ 
+void _qd_policy_deny_amqp_link(pn_link_t *link, qd_connection_t *qd_conn, char * s_or_r);
+
+
+/** Internal function to deny a sender amqp link
+ * The link is closed and the denial is logged but not counted.
+ *
+ * @param[in] pn_link proton link to close
+ * @param[in] qd_conn the qd connection
+ */ 
+void _qd_policy_deny_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn);
+
+
+/** Internal function to deny a receiver amqp link
+ * The link is closed and the denial is logged but not counted.
+ *
+ * @param[in] pn_link proton link to close
+ * @param[in] qd_conn the qd connection
+ */ 
+void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn);
+
+
+/** Perform user name substitution into proposed link name.
+ * The scheme is to substitute '${user}' into the incoming link name wherever
+ * the username is present. Then it can be matched against the original template with
+ * a minimum of substitutions. For example:
+ * uname    : joe
+ * proposed : temp_joe_1
+ * obuf     : temp_${user}_1
+ * Note: substituted names are limited to osize characters
+ * Note: only the first (leftmost) user name is substituted.
+ *
+ * @param[in] uname auth user name
+ * @param[in] proposed the link name from the AMQP frame
+ * @param[out] obuf where the constructed link name is returned
+ * @param[in] osize size in bytes of obuf
+ * @return NULL if uname is not present in proposed link name.
+ */
+char * _qd_policy_link_user_name_subst(const char *uname, const char *proposed, char *obuf, int osize);
+
+
+/** Approve link by source/target name.
+ * This match supports trailing wildcard match:
+ *    proposed 'temp-305' matches allowed 'temp-*'
+ * This match supports username substitution:
+ *    user 'joe', proposed 'temp-joe' matches allowed 'temp-${user}'
+ * Both username substitution and wildcards are allowed:
+ *    user 'joe', proposed 'temp-joe-100' matches allowed 'temp-${user}*'
+ * @param[in] username authenticated user name
+ * @param[in] allowed policy settings source/target string in packed CSV form.
+ * @param[in] proposed the link source or target name to be approved
+ */
+bool _qd_policy_approve_link_name(const char *username, const char *allowed, const char *proposed);
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index 58c5e0c..98e1be4 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -104,6 +104,7 @@ struct qdpn_connector_t {
     DEQ_LINKS(qdpn_connector_t);
     qdpn_driver_t *driver;
     char name[PN_NAME_MAX];
+    char hostip[PN_NAME_MAX];
     pn_timestamp_t wakeup;
     pn_connection_t *connection;
     pn_transport_t *transport;
@@ -373,12 +374,13 @@ void qdpn_listener_set_context(qdpn_listener_t *listener, void *context)
     listener->context = context;
 }
 
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, void *policy, bool (*policy_fn)(void *, const char *name))
 {
     if (!l || !l->pending) return NULL;
     char name[PN_NAME_MAX];
     char host[MAX_HOST];
     char serv[MAX_SERV];
+    char hostip[MAX_HOST];
 
     struct sockaddr_in addr = {0};
     addr.sin_family = AF_UNSPEC;
@@ -390,7 +392,8 @@ qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
         return 0;
     } else {
         int code;
-        if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, MAX_HOST, serv, MAX_SERV, 0))) {
+        if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, MAX_HOST, serv, MAX_SERV, 0)) ||
+            (code = getnameinfo((struct sockaddr *) &addr, addrlen, hostip, MAX_HOST, 0, 0, NI_NUMERICHOST))) {
             qd_log(l->driver->log, QD_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(code));
             close(sock);
             return 0;
@@ -400,11 +403,17 @@ qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
         }
     }
 
+    if (policy_fn && !(*policy_fn)(policy, name)) {
+        close(sock);
+        return 0;
+    }
+    
     if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
         fprintf(stderr, "Accepted from %s\n", name);
 
     qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
     snprintf(c->name, PN_NAME_MAX, "%s", name);
+    snprintf(c->hostip, PN_NAME_MAX, "%s", hostip);
     c->listener = l;
     return c;
 }
@@ -608,6 +617,12 @@ const char *qdpn_connector_name(const qdpn_connector_t *ctor)
     return ctor->name;
 }
 
+const char *qdpn_connector_hostip(const qdpn_connector_t *ctor)
+{
+    /* Host IP string stored on the connector; a NULL connector yields NULL. */
+    return ctor ? ctor->hostip : 0;
+}
+
 qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
 {
     return ctor ? ctor->listener : NULL;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 173c175..5248540 100644
--- a/src/server.c
+++ b/src/server.c
@@ -25,6 +25,7 @@
 #include "entity.h"
 #include "entity_cache.h"
 #include "dispatch_private.h"
+#include "policy.h"
 #include "server_private.h"
 #include "timer_private.h"
 #include "alloc.h"
@@ -75,6 +76,19 @@ static qd_thread_t *thread(qd_server_t *qd_server, int id)
     return thread;
 }
 
+static void free_qd_connection(qd_connection_t *ctx)
+{
+    // Release the per-connection policy settings (if any) before the connection itself.
+    qd_policy_settings_t *settings = ctx->policy_settings;
+    if (settings) {
+        free(settings->sources);   // free(NULL) is a no-op, so no guard is needed
+        free(settings->targets);
+        free(settings);
+        ctx->policy_settings = 0;
+    }
+    free_qd_connection_t(ctx);
+}
+
 qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl);
 
 /**
@@ -455,7 +469,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
     qd_connection_t  *ctx;
 
     for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
-        cxtr = qdpn_listener_accept(listener);
+        cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept);
         if (!cxtr)
             continue;
 
@@ -463,6 +477,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
 
         qd_log(qd_server->log_source, QD_LOG_DEBUG, "Accepting %s",
                log_incoming(logbuf, sizeof(logbuf), cxtr));
+        
         ctx = new_qd_connection_t();
         DEQ_ITEM_INIT(ctx);
         ctx->server        = qd_server;
@@ -480,8 +495,14 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
         ctx->link_context  = 0;
         ctx->ufd           = 0;
         ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it
+        ctx->policy_settings = 0;
+        ctx->n_sessions      = 0;  // new policy counter in qd_connection_t; must start at zero like the others
+        ctx->n_senders       = ctx->n_receivers = 0;
+        ctx->open_container  = 0;
+        ctx->conn_context    = 0;
         DEQ_INIT(ctx->deferred_calls);
         ctx->deferred_call_lock = sys_mutex();
+        ctx->event_stall  = false;
 
         pn_connection_t *conn = pn_connection();
         ctx->collector = pn_collector();
@@ -652,40 +673,41 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
         pn_event_t      *event;
 
         events = 0;
-        event = pn_collector_peek(collector);
-        while (event) {
-            //
-            // If we are transitioning to the open state, notify the client via callback.
-            //
-            if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
-                ctx->opened = true;
-                qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
-
-                if (ctx->connector) {
-                    ce = QD_CONN_EVENT_CONNECTOR_OPEN;
-                    ctx->connector->delay = 0;
-                } else
-                    assert(ctx->listener);
-
-                qd_server->conn_handler(qd_server->conn_handler_context,
-                                        ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
-                events = 1;
-                break;  // Break without popping this event.  It will be re-processed in OPERATIONAL state.
-            } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
-                ctx->closed = true;
-                qdpn_connector_close(cxtr);
-                if (ctx->connector) {
-                    const qd_server_config_t *config = ctx->connector->config;
-                    qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+        if (!ctx->event_stall) {
+            event = pn_collector_peek(collector);
+            while (event) {
+                //
+                // If we are transitioning to the open state, notify the client via callback.
+                //
+                if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
+                    ctx->opened = true;
+                    qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
+
+                    if (ctx->connector) {
+                        ce = QD_CONN_EVENT_CONNECTOR_OPEN;
+                        ctx->connector->delay = 0;
+                    } else
+                        assert(ctx->listener);
+
+                    qd_server->conn_handler(qd_server->conn_handler_context,
+                                            ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
+                    events = 1;
+                } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
+                    ctx->closed = true;
+                    qdpn_connector_close(cxtr);
+                    if (ctx->connector) {
+                        const qd_server_config_t *config = ctx->connector->config;
+                        qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+                    }
                 }
-            }
 
-            events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
-            pn_collector_pop(collector);
-            event = pn_collector_peek(collector);
-        }
+                events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
+                pn_collector_pop(collector);
 
-        events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+                event = ctx->event_stall ? 0 : pn_collector_peek(collector);
+            }
+            events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+        }
     } while (events > 0);
 
     return passes > 1;
@@ -929,6 +951,10 @@ static void *thread_run(void *arg)
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
 
+                if (!ctx->connector) {
+                    qd_policy_socket_close(qd_server->qd->policy, ctx);
+                }
+
                 qdpn_connector_free(cxtr);
                 if (conn) {
                     pn_connection_set_context(conn, 0);
@@ -938,7 +964,7 @@ static void *thread_run(void *arg)
                     pn_collector_free(ctx->collector);
                 invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
                 sys_mutex_free(ctx->deferred_call_lock);
-                free_qd_connection_t(ctx);
+                free_qd_connection(ctx);
                 qd_server->threads_active--;
                 sys_mutex_unlock(qd_server->lock);
             } else {
@@ -1027,9 +1053,15 @@ static void cxtr_try_open(void *context)
     ctx->user_context = 0;
     ctx->link_context = 0;
     ctx->ufd          = 0;
-
+    ctx->policy_settings = 0;
+    ctx->n_sessions      = 0;  // new policy counter in qd_connection_t; must start at zero like the others
+    ctx->n_senders       = ctx->n_receivers = 0;
+    ctx->open_container  = 0;
+    ctx->conn_context    = 0;
+    
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
+    ctx->event_stall  = false;
 
     qd_log(ct->server->log_source, QD_LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
 
@@ -1053,7 +1085,7 @@ static void cxtr_try_open(void *context)
 
     if (ctx->pn_cxtr == 0) {
         sys_mutex_free(ctx->deferred_call_lock);
-        free_qd_connection_t(ctx);
+        free_qd_connection(ctx);
         ct->delay = 10000;
         qd_timer_schedule(ct->timer, ct->delay);
         return;
@@ -1489,6 +1521,14 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
 }
 
 
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall)
+{
+    conn->event_stall = stall;  // while set, process_connector skips this connection's event loop
+    if (stall) return;          // stalling needs nothing more
+    qd_server_activate(conn);   // stall lifted: activate the connection again
+}
+
+
 qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
 {
     qd_server_t   *qd_server = qd->server;
@@ -1591,8 +1631,14 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
     ctx->user_context = 0;
     ctx->link_context = 0;
     ctx->ufd          = ufd;
+    ctx->policy_settings = 0;
+    ctx->n_sessions      = 0;  // new policy counter in qd_connection_t; must start at zero like the others
+    ctx->n_senders       = ctx->n_receivers = 0;
+    ctx->open_container  = 0;
+    ctx->conn_context    = 0;
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
+    ctx->event_stall  = false;
 
     ufd->context = context;
     ufd->server  = qd_server;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 7a0d1db..11da8e2 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -100,8 +100,15 @@ struct qd_connection_t {
     qd_user_fd_t             *ufd;
     uint64_t                  connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky.
     const char               *user_id; // A unique identifier for the user on the connection. This is currently populated  from the client ssl cert. See ssl_uid_format in server.h for more info
+    qd_policy_settings_t     *policy_settings;
+    int                       n_sessions;
+    int                       n_senders;
+    int                       n_receivers;
+    void                     *open_container;
+    void                     *conn_context;
     qd_deferred_call_list_t   deferred_calls;
     sys_mutex_t              *deferred_call_lock;
+    bool                      event_stall;
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index f4f35d9..cd79939 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -28,6 +28,7 @@ include_directories(
 set(unit_test_SOURCES
     compose_test.c
     parse_test.c
+    policy_test.c
     run_unit_tests.c
     server_test.c
     timer_test.c
@@ -65,6 +66,7 @@ add_test(unit_tests            ${TEST_WRAP} --vg unit_tests ${CMAKE_CURRENT_SOUR
 # Unit test python modules
 add_test(router_engine_test    ${TEST_WRAP} -m unittest -v router_engine_test)
 add_test(managemet_test        ${TEST_WRAP} -m unittest -v management)
+add_test(router_policy_test    ${TEST_WRAP} -m unittest -v router_policy_test)
 
 # System test python modules
 foreach(py_test_module
@@ -72,6 +74,7 @@ foreach(py_test_module
     system_tests_link_routes
     system_tests_management
     system_tests_one_router
+    system_tests_policy
     system_tests_protocol_family
     system_tests_qdmanage
     system_tests_qdstat
@@ -93,9 +96,17 @@ list(APPEND SYSTEM_TEST_FILES
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config-2/A-ssl.conf.in ${CMAKE_CURRENT_BINARY_DIR}/config-2/A-ssl.conf)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config-2/B-ssl.conf.in ${CMAKE_CURRENT_BINARY_DIR}/config-2/B-ssl.conf)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/system_tests_policy.py.in ${CMAKE_CURRENT_BINARY_DIR}/system_tests_policy.py)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/policy-1/test-policy-conf-includes-folder.conf.in ${CMAKE_CURRENT_BINARY_DIR}/policy-1/test-policy-conf-includes-folder.conf)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/policy-2/policy-photoserver-sasl.conf.in ${CMAKE_CURRENT_BINARY_DIR}/policy-2/policy-photoserver-sasl.conf)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/policy-2/test-router-with-policy.json.in ${CMAKE_CURRENT_BINARY_DIR}/policy-2/test-router-with-policy.json)
 
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/ssl_certs DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/displayname_files DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
+file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/management-access.json  DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-1/)
+file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/policy-boardwalk.json   DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-1/)
+file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/policy-safari.json      DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-1/)
+file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-2/policy-photoserver-sasl.sasldb  DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-2)
 
 # following install() functions will be called only if you do a make "install"
 install(FILES ${SYSTEM_TEST_FILES}
@@ -114,4 +125,3 @@ install(DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/config-2
 
 install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/ssl_certs
         DESTINATION ${QPID_DISPATCH_HOME_INSTALLED}/tests)
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/config-1/A.json
----------------------------------------------------------------------
diff --git a/tests/config-1/A.json b/tests/config-1/A.json
new file mode 100644
index 0000000..01d62e1
--- /dev/null
+++ b/tests/config-1/A.json
@@ -0,0 +1,81 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+[
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+    ["container", {
+##
+## workerThreads - The number of threads that will be created to
+## process message traffic and other application work (timers, nonAmqp
+## file descriptors, etc.)
+##
+## The number of threads should be related to the number of available
+## processor cores.  To fully utilize a quad-core system, set the
+## number of threads to 4.
+##
+	"workerThreads": 4,
+	
+##
+## containerName - The name of the AMQP container.  If not specified,
+## the container name will be set to a value of the container's
+## choosing.  The automatically assigned container name is not
+## guaranteed to be persistent across restarts of the container.
+##
+	"containerName": "Qpid.Dispatch.Router.A"
+    }],
+
+    ["router", {
+	"mode": "standalone",
+	"routerId": "QDR"
+    }],
+
+##
+## Listeners and Connectors
+##
+    ["listener", {
+	"addr": "0.0.0.0",
+	"port": 20000,
+	"saslMechanisms": "ANONYMOUS"
+    }],
+    ["fixedAddress", {
+	"prefix": "/closest/",
+	"fanout": "single",
+	"bias": "closest"
+    }],
+
+    ["fixedAddress", {
+	"prefix": "/spread/",
+	"fanout": "single",
+	"bias": "spread"
+    }],
+
+    ["fixedAddress", {
+	"prefix": "/multicast/",
+	"fanout": "multiple"
+    }],
+
+    ["fixedAddress", {
+	"prefix": "/",
+	"fanout": "multiple"
+    }]
+]
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-1/management-access.json
----------------------------------------------------------------------
diff --git a/tests/policy-1/management-access.json b/tests/policy-1/management-access.json
new file mode 100644
index 0000000..960c4a1
--- /dev/null
+++ b/tests/policy-1/management-access.json
@@ -0,0 +1,45 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+# A policy to allow unrestricted access to management
+# from host 0.0.0.0
+[
+  ["policyRuleset", {
+      "applicationName": "0.0.0.0",
+      "maxConnections": 50,
+      "maxConnPerUser": 5,
+      "maxConnPerHost": 20,
+      "connectionAllowDefault": true,
+      "settings": {
+        "default" : {
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":           22,
+          "maxReceivers":         22,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": true,
+          "sources": "$management",
+          "targets": "$management"
+        }
+      }
+    }
+  ]
+]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-1/policy-boardwalk.json
----------------------------------------------------------------------
diff --git a/tests/policy-1/policy-boardwalk.json b/tests/policy-1/policy-boardwalk.json
new file mode 100644
index 0000000..174bdef
--- /dev/null
+++ b/tests/policy-1/policy-boardwalk.json
@@ -0,0 +1,96 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+[
+    # The boardwalk policy ruleset
+    ["policyRuleset",
+        {
+            "applicationName": "boardwalk",
+            "maxConnections": 10,
+            "maxConnPerUser": 2,
+            "maxConnPerHost": 5,
+            "userGroups": {
+                "anonymous": "anonymous",
+                "users": "u1 u2 u3",
+                "superuser": "ellen"
+            },
+            "ingressHostGroups": {
+                "Ten18": "10.18.0.0-10.18.255.255",
+                "EllensWS": "72.135.2.9",
+                "TheLabs": "10.48.0.0-10.48.255.255, 192.168.0.0-192.168.255.255",
+                "Localhost": "127.0.0.1, ::1"
+            },
+            "ingressPolicies": {
+                "anonymous": "Ten18, TheLabs",
+                "superuser": "Localhost, EllensWS"
+            },
+            "connectionAllowDefault": true,
+            "settings": {
+                "anonymous": {
+                    "maxFrameSize": 111111,
+                    "maxMessageSize": 111111,
+                    "maxSessionWindow": 111111,
+                    "maxSessions": 1,
+                    "maxSenders": 11,
+                    "maxReceivers": 11,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public",
+                    "targets": ""
+                },
+                "users": {
+                    "maxFrameSize": 222222,
+                    "maxMessageSize": 222222,
+                    "maxSessionWindow": 222222,
+                    "maxSessions": 2,
+                    "maxSenders": 22,
+                    "maxReceivers": 22,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private",
+                    "targets": "public"
+                },
+                "superuser": {
+                    "maxFrameSize": 666666,
+                    "maxMessageSize": 666666,
+                    "maxSessionWindow": 666666,
+                    "maxSessions": 6,
+                    "maxSenders": 66,
+                    "maxReceivers": 66,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private, management, root",
+                    "targets": "public, private, management, root"
+                },
+                "default": {
+                    "maxFrameSize": 222222,
+                    "maxMessageSize": 222222,
+                    "maxSessionWindow": 222222,
+                    "maxSessions": 2,
+                    "maxSenders": 22,
+                    "maxReceivers": 22,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private",
+                    "targets": "public"
+                }
+            }
+        }
+    ]
+]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-1/policy-safari.json
----------------------------------------------------------------------
diff --git a/tests/policy-1/policy-safari.json b/tests/policy-1/policy-safari.json
new file mode 100644
index 0000000..a61bbd9
--- /dev/null
+++ b/tests/policy-1/policy-safari.json
@@ -0,0 +1,96 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+[
+    # The safari policy ruleset
+    ["policyRuleset",
+        {
+            "applicationName": "safari",
+            "maxConnections": 10,
+            "maxConnPerUser": 2,
+            "maxConnPerHost": 5,
+            "userGroups": {
+                "anonymous": "anonymous",
+                "clients": "moja mbili",
+                "guides": "kata hapa"
+            },
+            "ingressHostGroups": {
+                "basecamp": "72.135.2.9",
+                "mobile": "10.48.0.0-10.48.255.255, 192.168.0.0-192.168.255.255",
+                "Localhost": "127.0.0.1, ::1",
+		"TheWorld": "*"
+            },
+            "ingressPolicies": {
+                "anonymous": "TheWorld",
+                "clients": "basecamp",
+		"guides": "basecamp, mobile, Localhost"
+            },
+            "connectionAllowDefault": true,
+            "settings": {
+                "anonymous": {
+                    "maxFrameSize": 111111,
+                    "maxMessageSize": 111111,
+                    "maxSessionWindow": 111111,
+                    "maxSessions": 1,
+                    "maxSenders": 11,
+                    "maxReceivers": 11,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public",
+                    "targets": ""
+                },
+                "clients": {
+                    "maxFrameSize": 222222,
+                    "maxMessageSize": 222222,
+                    "maxSessionWindow": 222222,
+                    "maxSessions": 2,
+                    "maxSenders": 22,
+                    "maxReceivers": 22,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private",
+                    "targets": "public"
+                },
+                "guides": {
+                    "maxFrameSize": 666666,
+                    "maxMessageSize": 666666,
+                    "maxSessionWindow": 666666,
+                    "maxSessions": 6,
+                    "maxSenders": 66,
+                    "maxReceivers": 66,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private, management, root",
+                    "targets": "public, private, management, root"
+                },
+                "default": {
+                    "maxFrameSize": 222222,
+                    "maxMessageSize": 222222,
+                    "maxSessionWindow": 222222,
+                    "maxSessions": 2,
+                    "maxSenders": 22,
+                    "maxReceivers": 22,
+                    "allowDynamicSrc": false,
+                    "allowAnonymousSender": false,
+                    "sources": "public, private",
+                    "targets": "public"
+                }
+            }
+        }
+    ]
+]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-1/test-policy-conf-includes-folder.conf.in
----------------------------------------------------------------------
diff --git a/tests/policy-1/test-policy-conf-includes-folder.conf.in b/tests/policy-1/test-policy-conf-includes-folder.conf.in
new file mode 100644
index 0000000..b6f0413
--- /dev/null
+++ b/tests/policy-1/test-policy-conf-includes-folder.conf.in
@@ -0,0 +1,64 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+    ##
+    ## workerThreads - The number of threads that will be created to
+    ## process message traffic and other application work (timers, non-AMQP
+    ## file descriptors, etc.)
+    ##
+    ## The number of threads should be related to the number of available
+    ## processor cores.  To fully utilize a quad-core system, set the
+    ## number of threads to 4.
+    ##
+    workerThreads: 4
+
+    ##
+    ## containerName - The name of the AMQP container.  If not specified,
+    ## the container name will be set to a value of the container's
+    ## choosing.  The automatically assigned container name is not
+    ## guaranteed to be persistent across restarts of the container.
+    ##
+    containerName: Qpid.Dispatch.Router.A
+}
+
+router {
+    ## This router runs in standalone mode and is identified as QDR.
+    mode: standalone
+    routerId: QDR
+}
+
+##
+## Listener - address 0.0.0.0, port 22000, SASL mechanism ANONYMOUS only.
+##
+listener {
+    addr: 0.0.0.0
+    port: 22000
+    saslMechanisms: ANONYMOUS
+}
+
+policy {
+    maximumConnections: 10
+    enableAccessRules: true
+    policyFolder: ${CMAKE_CURRENT_BINARY_DIR}/policy-1
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-2/make-sasl.sh
----------------------------------------------------------------------
diff --git a/tests/policy-2/make-sasl.sh b/tests/policy-2/make-sasl.sh
new file mode 100755
index 0000000..d56d529
--- /dev/null
+++ b/tests/policy-2/make-sasl.sh
@@ -0,0 +1,63 @@
+#!/bin/bash -ex
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Generate sasl files for policy tests using test setup for 'photoserver'
+# This file is used to generate the files which are then committed to git
+# and distributed in the make install.
+#
+# sasldb file is generated and copied to tests/policy-2/ by cmake
+#
+export sasl_file=./policy-photoserver-sasl.sasldb
+
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd anonymous
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd u1
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd u2
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd p1
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd p2
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd zeke
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd ynot
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd alice
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd bob
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd ellen
+echo password | saslpasswd2 -c -p -f ${sasl_file} -u qdrouterd charlie
+
+echo password | saslpasswd2 -c -p -f ${sasl_file} anonymous
+echo password | saslpasswd2 -c -p -f ${sasl_file} u1
+echo password | saslpasswd2 -c -p -f ${sasl_file} u2
+echo password | saslpasswd2 -c -p -f ${sasl_file} p1
+echo password | saslpasswd2 -c -p -f ${sasl_file} p2
+echo password | saslpasswd2 -c -p -f ${sasl_file} zeke
+echo password | saslpasswd2 -c -p -f ${sasl_file} ynot
+echo password | saslpasswd2 -c -p -f ${sasl_file} alice
+echo password | saslpasswd2 -c -p -f ${sasl_file} bob
+echo password | saslpasswd2 -c -p -f ${sasl_file} ellen
+echo password | saslpasswd2 -c -p -f ${sasl_file} charlie
+
+sasldblistusers2                  -f ${sasl_file}
+
+# Make sasl conf file
+# sasl.conf is generated and 'config'd by cmake
+cat > ./policy-photoserver-sasl.conf.in << "EOF"
+pwcheck_method: auxprop
+auxprop_plugin: sasldb
+sasldb_path: ${CMAKE_CURRENT_BINARY_DIR}/policy-2/policy-photoserver-sasl.sasldb
+mech_list: PLAIN ANONYMOUS
+EOF

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-2/policy-photoserver-sasl.conf.in
----------------------------------------------------------------------
diff --git a/tests/policy-2/policy-photoserver-sasl.conf.in b/tests/policy-2/policy-photoserver-sasl.conf.in
new file mode 100644
index 0000000..e9650b6
--- /dev/null
+++ b/tests/policy-2/policy-photoserver-sasl.conf.in
@@ -0,0 +1,4 @@
+pwcheck_method: auxprop
+auxprop_plugin: sasldb
+sasldb_path: ${CMAKE_CURRENT_BINARY_DIR}/policy-2/policy-photoserver-sasl.sasldb
+mech_list: PLAIN ANONYMOUS

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-2/policy-photoserver-sasl.sasldb
----------------------------------------------------------------------
diff --git a/tests/policy-2/policy-photoserver-sasl.sasldb b/tests/policy-2/policy-photoserver-sasl.sasldb
new file mode 100644
index 0000000..f145b16
Binary files /dev/null and b/tests/policy-2/policy-photoserver-sasl.sasldb differ

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/48e81620/tests/policy-2/ssl_certs/gencerts.sh
----------------------------------------------------------------------
diff --git a/tests/policy-2/ssl_certs/gencerts.sh b/tests/policy-2/ssl_certs/gencerts.sh
new file mode 100755
index 0000000..faca0a2
--- /dev/null
+++ b/tests/policy-2/ssl_certs/gencerts.sh
@@ -0,0 +1,39 @@
+#!/bin/bash -ex
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+export SERVER=A1.Good.Server.domain.com
+export CLIENT=127.0.0.1
+
+keytool -storetype pkcs12 -keystore ca.pkcs12 -storepass ca-password -alias ca -keypass ca-password -genkey -dname "O=Trust Me Inc.,CN=Trusted.CA.com" -validity 99999
+openssl pkcs12 -nokeys -passin pass:ca-password -in ca.pkcs12 -passout pass:ca-password -out ca-certificate.pem
+
+keytool -storetype pkcs12 -keystore bad-ca.pkcs12 -storepass bad-ca-password -alias bad-ca -keypass bad-ca-password -genkey -dname "O=Trust Me Inc.,CN=Trusted.CA.com" -validity 99999
+openssl pkcs12 -nokeys -passin pass:bad-ca-password -in bad-ca.pkcs12 -passout pass:bad-ca-password -out bad-ca-certificate.pem
+
+keytool -storetype pkcs12 -keystore server.pkcs12 -storepass server-password -alias server-certificate -keypass server-password -genkey  -dname "O=Server,CN=$SERVER" -validity 99999
+keytool -storetype pkcs12 -keystore server.pkcs12 -storepass server-password -alias server-certificate -keypass server-password -certreq -file server-request.pem
+keytool -storetype pkcs12 -keystore ca.pkcs12 -storepass ca-password -alias ca -keypass ca-password -gencert -rfc -validity 99999 -infile server-request.pem -outfile server-certificate.pem
+openssl pkcs12 -nocerts -passin pass:server-password -in server.pkcs12 -passout pass:server-password -out server-private-key.pem
+
+keytool -storetype pkcs12 -keystore client.pkcs12 -storepass client-password -alias client-certificate -keypass client-password -genkey  -dname "O=Client,CN=$CLIENT" -validity 99999
+keytool -storetype pkcs12 -keystore client.pkcs12 -storepass client-password -alias client-certificate -keypass client-password -certreq -file client-request.pem
+keytool -storetype pkcs12 -keystore ca.pkcs12 -storepass ca-password -alias ca -keypass ca-password -gencert -rfc -validity 99999 -infile client-request.pem -outfile client-certificate.pem
+openssl pkcs12 -nocerts -passin pass:client-password -in client.pkcs12 -passout pass:client-password -out client-private-key.pem


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org