You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2019/03/22 15:55:19 UTC

[qpid-dispatch] branch master updated: DISPATCH-1288: Add policy controls for outbound connector connections

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 60d2e75  DISPATCH-1288: Add policy controls for outbound connector connections
60d2e75 is described below

commit 60d2e75f1ace295a68a5d3878997e543b2b79cc5
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Fri Mar 22 11:42:34 2019 -0400

    DISPATCH-1288: Add policy controls for outbound connector connections
    
    This closes #472
---
 python/qpid_dispatch/management/qdrouter.json |   8 +-
 src/connection_manager.c                      |   6 +
 src/container.c                               |   4 +-
 src/policy.c                                  | 228 ++++++++----
 src/policy.h                                  |  11 +
 src/server.c                                  |   7 +
 src/server_private.h                          |   5 +
 tests/system_tests_policy.py                  | 486 +++++++++++++++++++++++++-
 8 files changed, 677 insertions(+), 78 deletions(-)

diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 45958b1..1bcf483 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1008,7 +1008,13 @@
                     "deprecationName": "failoverList",
                     "create": false
                     
-                }            
+                },
+                "policyVhost": {
+                    "type": "string",
+                    "required": false,
+                    "description": "A connector may optionally define a policy to restrict the remote container to access only specific resources. This attribute defines the name of the policy vhost for this connector. Within the vhost the connector will use the vhost policy settings from user group '$connector'. If the vhost policy is absent or if the user group '$connector' within that policy is absent then the connector will fail to start.  In policy specified via connector attribute  [...]
+                    "create": true
+                }
             }
         },
 
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 6549e56..fef6bdb 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -782,6 +782,10 @@ qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *
 {
     qd_connection_manager_t *cm = qd->connection_manager;
     qd_connector_t *ct = qd_server_connector(qd->server);
+
+    qd_error_clear();
+    ct->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); CHECK();
+
     if (ct && load_server_config(qd, &ct->config, entity, false) == QD_ERROR_NONE) {
         DEQ_ITEM_INIT(ct);
         DEQ_INSERT_TAIL(cm->connectors, ct);
@@ -809,6 +813,8 @@ qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *
 
         return ct;
     }
+
+  error:
     qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message());
     qd_connector_decref(ct);
     return 0;
diff --git a/src/container.c b/src/container.c
index 0ecd242..e046bfc 100644
--- a/src/container.c
+++ b/src/container.c
@@ -431,6 +431,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
 
     case PN_CONNECTION_REMOTE_OPEN :
         qd_connection_set_user(qd_conn);
+        qd_conn->open_container = (void *)container;
         if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
             // This Open is an externally initiated connection
             // Let policy engine decide
@@ -444,11 +445,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
              * connection since by stalling the current connection it will never be
              * run, so we need some other thread context to run it in.
              */
-            qd_conn->open_container = (void *)container;
             qd_policy_amqp_open(qd_conn);
         } else {
             // This Open is in response to an internally initiated connection
-            notify_opened(container, qd_conn, qd_connection_get_context(qd_conn));
+            qd_policy_amqp_open_connector(qd_conn);
         }
         break;
 
diff --git a/src/policy.c b/src/policy.c
index aa01ea8..a6e729e 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -66,6 +66,12 @@ static const char * const user_subst_i_embed    = "e";
 static const char * const user_subst_i_suffix   = "s";
 static const char * const user_subst_i_wildcard = "*";
 
+//
+// Fixed vhost policy usergroup used when storing connector policy.
+// The connector attribute 'policyVhost' defines a vhost and within
+// that vhost the connector policy values are in '$connector'.
+static const char * const POLICY_VHOST_GROUP = "$connector";
+
 static void hostname_tree_free(qd_parse_tree_t *hostname_tree);
 
 //
@@ -346,22 +352,23 @@ qd_parse_tree_t * qd_policy_parse_tree(const char *config_spec)
 //
 // Functions related to authenticated connection denial.
 // An AMQP Open has been received over some connection.
-// Evaluate the connection auth and the Open fields to
-// allow or deny the Open. Denied Open attempts are
-// effected by returning Open and then Close_with_condition.
+// * Evaluate the connection auth and the Open fields to allow or deny the Open. 
+// * If allowed then return the settings from the python vhost database.
 //
-/** Look up user/host/vhost in python vhost and give the AMQP Open
- *  a go-no_go decision. Return false if the mechanics of calling python
- *  fails. A policy lookup will deny the connection by returning a blank
- *  usergroup name in the name buffer.
+
+/** Look up user/host/vhost in python vhost database and give the AMQP Open
+ *  a go-no_go decision.
+ *  * Return false if the mechanics of calling python fail or if name_buf is blank.
+ *  * Deny the connection by returning a blank usergroup name in the name buffer.
+ *  Connection and connection denial counting is done in the python code.
- * @param[in] policy pointer to policy
- * @param[in] username authenticated user name
- * @param[in] hostip numeric host ip address
- * @param[in] vhost application name received in remote AMQP Open.hostname
- * @param[in] conn_name connection name for tracking
+ * @param[in]  policy pointer to policy
+ * @param[in]  username authenticated user name
+ * @param[in]  hostip numeric host ip address
+ * @param[in]  vhost application name received in remote AMQP Open.hostname
+ * @param[in]  conn_name connection name for tracking
  * @param[out] name_buf pointer to settings name buffer
- * @param[in] name_buf_size size of settings_buf
+ * @param[in]  name_buf_size size of name_buf
+ * @param[in]  conn_id connection id for log tracking
  **/
 bool qd_policy_open_lookup_user(
     qd_policy_t *policy,
@@ -371,10 +378,8 @@ bool qd_policy_open_lookup_user(
     const char *conn_name,
     char       *name_buf,
     int         name_buf_size,
-    uint64_t    conn_id,
-    qd_policy_settings_t *settings)
+    uint64_t    conn_id)
 {
-    // Lookup the user/host/vhost for allow/deny and to get settings name
     bool res = false;
     name_buf[0] = 0;
     qd_python_lock_state_t lock_state = qd_python_lock();
@@ -404,18 +409,39 @@ bool qd_policy_open_lookup_user(
         } else {
             qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: lookup_user");
         }
+        Py_XDECREF(module);
     }
-    if (!res) {
-        if (module) {
-            Py_XDECREF(module);
-        }
-        qd_python_unlock(lock_state);
-        return false;
-    }
+    qd_python_unlock(lock_state);
 
-    // 
     if (name_buf[0]) {
-        // Go get the named settings
+        qd_log(policy->log_source,
+           QD_LOG_TRACE,
+           "[%"PRIu64"]: ALLOW AMQP Open lookup_user: %s, rhost: %s, vhost: %s, connection: %s. Usergroup: '%s'%s",
+           conn_id, username, hostip, vhost, conn_name, name_buf, (res ? "" : " Internal error."));
+    }
+    return res;
+}
+
+
+/** Fetch policy settings for a vhost/group.
+ * Given a usergroup name (returned by qd_policy_open_lookup_user or taken from
+ * configuration), read that group's run-time settings from the python vhost database.
+ * @param[in] policy pointer to policy
+ * @param[in] vhost vhost name
+ * @param[in] group_name usergroup that holds the settings
+ * @param[out] settings pointer to settings object to be filled with policy values
+ * @return true if the named settings were found and copied into settings
+ **/
+bool qd_policy_open_fetch_settings(
+    qd_policy_t *policy,
+    const char *vhost,
+    const char *group_name,
+    qd_policy_settings_t *settings)
+{
+    bool res = false;
+    qd_python_lock_state_t lock_state = qd_python_lock();
+    PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager");
+    if (module) {
         res = false;
         PyObject *upolicy = PyDict_New();
         if (upolicy) {
@@ -423,41 +449,46 @@ bool qd_policy_open_lookup_user(
             if (lookup_settings) {
                 PyObject *result2 = PyObject_CallFunction(lookup_settings, "(OssO)",
                                                         (PyObject *)policy->py_policy_manager,
-                                                        vhost, name_buf, upolicy);
+                                                        vhost, group_name, upolicy);
                 if (result2) {
-                    settings->maxFrameSize         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
-                    settings->maxSessionWindow     = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
-                    settings->maxSessions          = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
-                    settings->maxSenders           = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
-                    settings->maxReceivers         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
-                    if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin
-                        settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
-                    }
-                    if (!settings->allowDynamicSource) { //don't override if enabled by authz plugin
-                        settings->allowDynamicSource   = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
-                    }
-                    settings->allowUserIdProxy       = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
-                    settings->allowWaypointLinks     = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
-                    settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true);
-
-                    //
-                    // By default, deleting connections are enabled. To disable, set the allowAdminStatusUpdate to false in a policy.
-                    //
-                    settings->allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true);
-                    if (settings->sources == 0) { //don't override if configured by authz plugin
-                        settings->sources              = qd_entity_get_string((qd_entity_t*)upolicy, "sources");
+                    int truthy = PyObject_IsTrue(result2);
+                    if (truthy) {
+                        settings->maxFrameSize         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
+                        settings->maxSessionWindow     = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
+                        settings->maxSessions          = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
+                        settings->maxSenders           = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
+                        settings->maxReceivers         = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
+                        if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin
+                            settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
+                        }
+                        if (!settings->allowDynamicSource) { //don't override if enabled by authz plugin
+                            settings->allowDynamicSource   = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
+                        }
+                        settings->allowUserIdProxy       = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
+                        settings->allowWaypointLinks     = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
+                        settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true);
+
+                        //
+                        // By default, deleting connections are enabled. To disable, set the allowAdminStatusUpdate to false in a policy.
+                        //
+                        settings->allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true);
+                        if (settings->sources == 0) { //don't override if configured by authz plugin
+                            settings->sources              = qd_entity_get_string((qd_entity_t*)upolicy, "sources");
+                        }
+                        if (settings->targets == 0) { //don't override if configured by authz plugin
+                            settings->targets              = qd_entity_get_string((qd_entity_t*)upolicy, "targets");
+                        }
+                        settings->sourcePattern        = qd_entity_get_string((qd_entity_t*)upolicy, "sourcePattern");
+                        settings->targetPattern        = qd_entity_get_string((qd_entity_t*)upolicy, "targetPattern");
+                        settings->sourceParseTree      = qd_policy_parse_tree(settings->sourcePattern);
+                        settings->targetParseTree      = qd_policy_parse_tree(settings->targetPattern);
+                        settings->denialCounts         = (qd_policy_denial_counts_t*)
+                                                        qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
+                        res = true; // named settings content returned
+                    } else {
+                        // lookup failed: object did not exist in python database
                     }
-                    if (settings->targets == 0) { //don't override if configured by authz plugin
-                        settings->targets              = qd_entity_get_string((qd_entity_t*)upolicy, "targets");
-                    }
-                    settings->sourcePattern        = qd_entity_get_string((qd_entity_t*)upolicy, "sourcePattern");
-                    settings->targetPattern        = qd_entity_get_string((qd_entity_t*)upolicy, "targetPattern");
-                    settings->sourceParseTree      = qd_policy_parse_tree(settings->sourcePattern);
-                    settings->targetParseTree      = qd_policy_parse_tree(settings->targetPattern);
-                    settings->denialCounts         = (qd_policy_denial_counts_t*)
-                                                    qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
                     Py_XDECREF(result2);
-                    res = true; // named settings content returned
                 } else {
                     qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: result2");
                 }
@@ -469,19 +500,10 @@ bool qd_policy_open_lookup_user(
         } else {
             qd_log(policy->log_source, QD_LOG_DEBUG, "Internal: lookup_user: upolicy");
         }
+        Py_XDECREF(module);
     }
-    Py_XDECREF(module);
     qd_python_unlock(lock_state);
 
-    if (name_buf[0]) {
-        qd_log(policy->log_source,
-           QD_LOG_TRACE,
-           "[%"PRIu64"]: ALLOW AMQP Open lookup_user: %s, rhost: %s, vhost: %s, connection: %s. Usergroup: '%s'%s",
-           conn_id, username, hostip, vhost, conn_name, name_buf, (res ? "" : " Internal error."));
-    } else {
-        // Denials are logged in python code
-    }
-
     return res;
 }
 
@@ -550,7 +572,8 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
 void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn)
 {
     size_t capacity;
-    if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) {
+    if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow
+        && !qd_conn->policy_settings->outgoingConnection) {
         capacity = qd_conn->policy_settings->maxSessionWindow;
     } else {
         const qd_server_config_t * cf = qd_connection_config(qd_conn);
@@ -641,6 +664,9 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con
         return false;
     }
 
+    if (!username)
+        username = "";
+
     size_t username_len = strlen(username);
 
     // make a writable, disposable copy of the csv string
@@ -759,6 +785,9 @@ bool _qd_policy_approve_link_name_tree(const char *username, const char *allowed
         return false;
     }
 
+    if (!username)
+        username = "";
+
     size_t username_len = strlen(username);
     size_t usersubst_len = strlen(user_subst_key);
 
@@ -1090,15 +1119,19 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
         }
 
         if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, vhost, conn_name,
-                                       settings_name, SETTINGS_NAME_SIZE, conn_id,
-                                       qd_conn->policy_settings) &&
+                                       settings_name, SETTINGS_NAME_SIZE, conn_id) &&
             settings_name[0]) {
             // This connection is allowed by policy.
             // Apply transport policy settings
-            if (qd_conn->policy_settings->maxFrameSize > 0)
-                pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
-            if (qd_conn->policy_settings->maxSessions > 0)
-                pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1);
+            if (qd_policy_open_fetch_settings(policy, vhost, settings_name, qd_conn->policy_settings)) {
+                if (qd_conn->policy_settings->maxFrameSize > 0)
+                    pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
+                if (qd_conn->policy_settings->maxSessions > 0)
+                    pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1);
+            } else {
+                // failed to fetch settings
+                connection_allowed = false;
+            }
         } else {
             // This connection is denied by policy.
             connection_allowed = false;
@@ -1117,6 +1150,53 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
 }
 
 
+/** Apply connector policy to an outgoing connection whose remote Open has arrived.
+ * When vhost policy is enabled and the connection role is unset, "normal" or
+ * "route-container", a connector that names a policyVhost must find its settings
+ * in that vhost's '$connector' group; otherwise the connection is denied.
+ * Connectors without a policyVhost, and connections not governed by policy,
+ * are allowed and carry no policy_settings.
+ * @param[in] qd_conn connection on which the remote Open was received
+ **/
+void qd_policy_amqp_open_connector(qd_connection_t *qd_conn) {
+    pn_connection_t *conn = qd_connection_pn(qd_conn);
+    qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server);
+    qd_policy_t *policy = qd->policy;
+    bool connection_allowed = true;
+
+    if (policy->enableVhostPolicy &&
+        (!qd_conn->role || !strcmp(qd_conn->role, "normal") || !strcmp(qd_conn->role, "route-container"))) {
+        // Connection ids are handled as uint64_t elsewhere in this file; do not narrow for logging.
+        uint64_t conn_id = qd_conn->connection_id;
+        qd_connector_t *connector = qd_connection_connector(qd_conn);
+        const char *policy_vhost = qd_connector_policy_vhost(connector);
+
+        if (policy_vhost && policy_vhost[0] != '\0') {
+            qd_conn->policy_settings = NEW(qd_policy_settings_t);
+            if (qd_conn->policy_settings) {
+                ZERO(qd_conn->policy_settings);
+                if (qd_policy_open_fetch_settings(policy, policy_vhost, POLICY_VHOST_GROUP, qd_conn->policy_settings)) {
+                    qd_conn->policy_settings->outgoingConnection = true;
+                    qd_conn->policy_counted = true; // Count senders and receivers for this connection
+                } else {
+                    qd_log(policy->log_source, QD_LOG_ERROR,
+                        "Failed to find policyVhost settings for connection '%"PRIu64"', policyVhost: '%s'",
+                        conn_id, policy_vhost);
+                    connection_allowed = false;
+                }
+            } else {
+                connection_allowed = false; // failed to allocate settings
+            }
+        }
+    }
+    if (connection_allowed) {
+        policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context);
+    } else {
+        qd_policy_private_deny_amqp_connection(conn, QD_AMQP_COND_RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
+    }
+}
+
+
 void qd_policy_settings_free(qd_policy_settings_t *settings)
 {
     if (!settings) return;
diff --git a/src/policy.h b/src/policy.h
index dfaed44..26803d3 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -54,6 +54,7 @@ struct qd_policy__settings_s {
     bool allowWaypointLinks;
     bool allowDynamicLinkRoutes;
     bool allowAdminStatusUpdate;
+    bool outgoingConnection;
     char *sources;
     char *targets;
     char *sourcePattern;
@@ -163,6 +164,16 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
  **/
 void qd_policy_amqp_open(qd_connection_t *conn);
 
+
+/** Allow or deny an outgoing connector connection.
+ * An Open performative was received on a connection this router initiated.
+ * When vhost policy is enabled and the connector names a policyVhost, the
+ *  settings of that vhost's '$connector' group are applied to the connection.
+ * If those settings cannot be fetched the pn_connection is closed with a condition.
+ **/
+void qd_policy_amqp_open_connector(qd_connection_t *conn);
+
+
 /** Dispose of policy settings
  * 
  * @param settings the settings to be destroyed
diff --git a/src/server.c b/src/server.c
index 4b28ed6..8dc2fc1 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1467,6 +1467,12 @@ qd_connector_t *qd_server_connector(qd_server_t *server)
 }
 
 
+/** Return the connector's optional policy vhost name, or NULL if ct is NULL or no vhost is set. */
+const char *qd_connector_policy_vhost(qd_connector_t* ct)
+{
+    return ct ? ct->policy_vhost : NULL;
+}
+
 bool qd_connector_connect(qd_connector_t *ct)
 {
     sys_mutex_lock(ct->lock);
@@ -1503,6 +1509,7 @@ bool qd_connector_decref(qd_connector_t* ct)
             item = DEQ_HEAD(ct->conn_info_list);
         }
         sys_mutex_free(ct->lock);
+        if (ct->policy_vhost) free(ct->policy_vhost);
         free_qd_connector_t(ct);
         return true;
     }
diff --git a/src/server_private.h b/src/server_private.h
index 9fcec6c..0a0a32f 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -124,11 +124,16 @@ struct qd_connector_t {
     /* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */
     qd_failover_item_list_t   conn_info_list;
     int                       conn_index; // Which connection in the connection list to connect to next.
+    
+    /* Optional policy vhost name */
+    char                     *policy_vhost;
+
     DEQ_LINKS(qd_connector_t);
 };
 
 DEQ_DECLARE(qd_connector_t, qd_connector_list_t);
 
+const char *qd_connector_policy_vhost(qd_connector_t* ct);
 
 /**
  * Connection objects wrap Proton connection objects.
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index 1951adb..5997e69 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -24,6 +24,7 @@ from __future__ import print_function
 
 import unittest as unittest
 import os, json, re, signal
+import time
 
 from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR
 from subprocess import PIPE, STDOUT
@@ -33,6 +34,7 @@ from proton.reactor import Container
 from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse
 from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled
 from qpid_dispatch_internal.compat import dict_iteritems
+from test_broker import FakeBroker
 
 class AbsoluteConnectionCountLimit(TestCase):
     """
@@ -355,7 +357,7 @@ class InterrouterLinksAllowed(TestCase):
         cls.routers[1].teardown()
 
     def test_01_router_links_allowed(self):
-        with  open('../setUpClass/A-2.out', 'r') as router_log:
+        with  open(self.routers[0].outfile + '.out', 'r') as router_log:
             log_lines = router_log.read().split("\n")
             disallow_lines = [s for s in log_lines if "link disallowed" in s]
             self.assertTrue(len(disallow_lines) == 0, msg='Inter-router links should be allowed but some were blocked by policy.')
@@ -1099,5 +1101,487 @@ class SenderAddressValidator(ClientAddressValidator):
         event.container.create_sender(self.url)
 
 
+#
+# Connector policy tests
+#
+
+class ConnectorPolicyMisconfiguredClient(FakeBroker):
+    '''
+    This client is targeted by a misconfigured connector whose policy causes an
+    immediate connection close. It counts connection events for the test to poll.
+    '''
+    def __init__(self, url, container_id=None):
+        super(ConnectorPolicyMisconfiguredClient, self).__init__(url, container_id)
+        self.connection_opening = 0  # incremented in on_connection_opening
+        self.connection_opened = 0  # incremented in on_connection_opened
+        self.connection_error = 0  # incremented in on_connection_error
+        self.main_exited = False  # set True once the loop in _main has finished
+
+    def _main(self):
+        self._container.timeout = 1.0
+        self._container.start()
+
+        keep_running = True
+        while keep_running:
+            try:
+                self._container.process()
+            except:  # any exception from process() ends the loop and marks the thread stopped
+                self._stop_thread = True
+                keep_running = False
+            if self._stop_thread:
+                keep_running = False
+        self.main_exited = True
+
+    def join(self):
+        if not self._stop_thread:  # request a stop and wake the container
+            self._stop_thread = True
+            self._container.wakeup()
+        if not self.main_exited:  # wait at most 5 seconds for the thread
+            self._thread.join(timeout=5)
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(10.0, Timeout(self))        
+        self.acceptor = event.container.listen(self.url)
+
+    def timeout(self):  # presumably invoked by the Timeout scheduled in on_start -- confirm
+        self._error = "Timeout Expired"
+
+    def on_connection_opening(self, event):
+        self.connection_opening += 1
+        super(ConnectorPolicyMisconfiguredClient, self).on_connection_opening(event)
+        
+    def on_connection_opened(self, event):
+        self.connection_opened += 1
+        super(ConnectorPolicyMisconfiguredClient, self).on_connection_opened(event)
+
+    def on_connection_error(self, event):
+        self.connection_error += 1
+
+
+class ConnectorPolicyMisconfigured(TestCase):
+    """
+    Verify that a connector that has a vhostPolicy is not allowed
+    to open the connection if the policy is not defined
+    """
+    remoteListenerPort = None
+    
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(ConnectorPolicyMisconfigured, cls).setUpClass()
+        cls.remoteListenerPort = cls.tester.get_port();
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
+            ('connector', {'verifyHostname': 'false', 'name': 'novhost',
+                           'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
+                           'host': '127.0.0.1', 'role': 'normal',
+                           'port': cls.remoteListenerPort, 'policyVhost': 'nosuch'
+                            }),
+
+            ('vhost', {
+                'hostname': '0.0.0.0', 'maxConnections': 2,
+                'allowUnknownUser': 'true',
+                'groups': [(
+                    '$default', {
+                        'users': '*', 'remoteHosts': '*',
+                        'sources': '*', 'targets': '*',
+                        'allowDynamicSource': 'true'
+                    }
+                ), (
+                    'anonymous', {
+                        'users': 'anonymous', 'remoteHosts': '*',
+                        'sourcePattern': 'addr/*/queue/*, simpleaddress, queue.${user}',
+                        'targets': 'addr/*, simpleaddress, queue.${user}',
+                        'allowDynamicSource': 'true',
+                        'allowAnonymousSender': 'true'
+                    }
+                )]
+            })
+        ])
+
+        cls.router = cls.tester.qdrouterd('connectorPolicyMisconfigured', config, wait=False)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_30_connector_policy_misconfigured(self):
+        url = "127.0.0.1:%d" % self.remoteListenerPort
+        tc = ConnectorPolicyMisconfiguredClient(url, "tc")
+        while tc.connection_error == 0 and tc._error == None:
+            time.sleep(0.1)
+        tc.join()
+        self.assertTrue(tc.connection_error == 1)
+        
+#
+
+class ConnectorPolicyClient(FakeBroker):
+    '''
+    This client is targeted by a configured connector whose policy
+    allows certain sources and targets.
+
+    The test thread posts a request by setting an attribute (through the
+    try_* and close_* methods) and then polls for the outcome; _main()
+    notices the request on its next pass and performs the link operation.
+    At most one link-open request is outstanding at a time, tracked by
+    request_in_flight.
+    '''
+    def __init__(self, url, container_id=None):
+        super(ConnectorPolicyClient, self).__init__(url, container_id)
+        # Counters bumped by the connection event handlers below.
+        self.connection_opening = 0
+        self.connection_opened = 0
+        self.connection_error = 0
+        # Set once _main() has returned, whatever the reason.
+        self.main_exited = False
+        # Every link this client tried to create, in creation order.
+        self.senders = []
+        self.receivers = []
+        # Outcome of the most recent link attempt (set by on_link_error).
+        self.link_error = False
+        # Request mailbox read by _main(): "" / False means "nothing pending".
+        self.sender_request = ""
+        self.receiver_request = ""
+        self.request_in_flight = False
+        self.req_close_sender = False
+        self.req_close_receiver = False
+        self.req_anonymous_sender = False
+
+    def _main(self):
+        '''
+        Pump the container and service posted requests until asked to stop.
+
+        Any exception ends the loop.  It is recorded in self._error so that
+        callers polling in try_* / close_* stop waiting instead of spinning
+        on a container that is no longer being processed.
+        '''
+        self._container.timeout = 1.0
+        self._container.start()
+        try:
+            while True:
+                self._container.process()
+                if not self.request_in_flight:
+                    self._service_pending_request()
+                if self._stop_thread:
+                    break
+        except Exception as exc:
+            if self._error is None:
+                self._error = "Client thread failed: %r" % (exc,)
+            self._stop_thread = True
+        finally:
+            self.main_exited = True
+
+    def _service_pending_request(self):
+        '''Carry out at most one posted request; called only from _main().'''
+        if self.sender_request != "":
+            sndr = self._container.create_sender(
+                self._connections[0], self.sender_request)
+            self.senders.append(sndr)
+            # Raise the in-flight flag before clearing the request so a
+            # polling try_sender() never observes both conditions false.
+            self.request_in_flight = True
+            self.sender_request = ""
+        elif self.receiver_request != "":
+            rcvr = self._container.create_receiver(
+                self._connections[0], self.receiver_request)
+            self.receivers.append(rcvr)
+            self.request_in_flight = True
+            self.receiver_request = ""
+        elif self.req_close_sender:
+            # Always closes the first sender that was created.
+            self.senders[0].close()
+            self.req_close_sender = False
+        elif self.req_close_receiver:
+            # Always closes the first receiver that was created.
+            self.receivers[0].close()
+            self.req_close_receiver = False
+        elif self.req_anonymous_sender:
+            # No target address is given: this is the anonymous sender.
+            sndr = self._container.create_sender(
+                self._connections[0], name="anon")
+            self.senders.append(sndr)
+            self.request_in_flight = True
+            self.req_anonymous_sender = False
+
+    def join(self):
+        '''Ask _main() to stop and wait up to 5 seconds for its thread.'''
+        if not self._stop_thread:
+            self._stop_thread = True
+            self._container.wakeup()
+        if not self.main_exited:
+            self._thread.join(timeout=5)
+
+    def on_start(self, event):
+        # Watchdog: Timeout(self) is scheduled 60 seconds out; see timeout().
+        self.timer    = event.reactor.schedule(60, Timeout(self))
+        self.acceptor = event.container.listen(self.url)
+
+    def timeout(self):
+        self._error = "Timeout Expired"
+
+    def on_connection_opening(self, event):
+        self.connection_opening += 1
+        super(ConnectorPolicyClient, self).on_connection_opening(event)
+
+    def on_connection_opened(self, event):
+        self.connection_opened += 1
+        super(ConnectorPolicyClient, self).on_connection_opened(event)
+
+    def on_connection_error(self, event):
+        self.connection_error += 1
+
+    def on_link_opened(self, event):
+        self.request_in_flight = False
+
+    def on_link_error(self, event):
+        self.link_error = True
+        self.request_in_flight = False
+
+    def _wait_for_link(self, is_queued):
+        '''
+        Poll until the posted link request has been resolved.
+
+        is_queued is a zero-argument callable that is true while _main()
+        has not yet picked the request up.  Returns True only when the
+        request was serviced and finished with neither a link error nor a
+        client error (timeout or client-thread failure).
+        '''
+        while ((is_queued() or self.request_in_flight)
+               and not self.link_error
+               and self._error is None
+               and not self.main_exited):
+            time.sleep(0.10)
+        # Linger before sampling the outcome.
+        # NOTE(review): presumably this lets an on_link_error that trails
+        # on_link_opened arrive - confirm against router behavior.
+        time.sleep(0.10)
+        serviced = not is_queued() and not self.request_in_flight
+        return serviced and not self.link_error and self._error is None
+
+    def _wait_for_close(self, is_queued):
+        '''
+        Poll until _main() has serviced a close request, or until the
+        client has failed or exited and can no longer service it.
+        '''
+        while is_queued() and self._error is None and not self.main_exited:
+            time.sleep(0.05)
+
+    def try_sender(self, addr):
+        '''Try to open a sender to addr; return True if the link opened.'''
+        self.link_error = False
+        self.sender_request = addr
+        return self._wait_for_link(lambda: self.sender_request == addr)
+
+    def try_receiver(self, addr):
+        '''Try to open a receiver from addr; return True if the link opened.'''
+        self.link_error = False
+        self.receiver_request = addr
+        return self._wait_for_link(lambda: self.receiver_request == addr)
+
+    def close_sender(self):
+        '''Close the first sender; return once _main() has issued the close.'''
+        self.req_close_sender = True
+        self._wait_for_close(lambda: self.req_close_sender)
+
+    def close_receiver(self):
+        '''Close the first receiver; return once _main() has issued the close.'''
+        self.req_close_receiver = True
+        self._wait_for_close(lambda: self.req_close_receiver)
+
+    def try_anonymous_sender(self):
+        '''Try to open an anonymous sender; return True if the link opened.'''
+        self.link_error = False
+        self.req_anonymous_sender = True
+        return self._wait_for_link(lambda: self.req_anonymous_sender)
+
+
+class ConnectorPolicySrcTgt(TestCase):
+    """
+    Verify that a connector that has a policyVhost
+     * may open the connection
+     * may access allowed sources and targets
+     * may not access disallowed sources and targets
+
+    The '$connector' group in this config does not set
+    allowAnonymousSender or allowWaypointLinks, and the test expects
+    both kinds of link to be refused.
+    """
+    remoteListenerPort = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(ConnectorPolicySrcTgt, cls).setUpClass()
+        # Port the router's connector dials; the test client listens on it.
+        cls.remoteListenerPort = cls.tester.get_port()
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
+            ('connector', {'verifyHostname': 'false', 'name': 'novhost',
+                           'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
+                           'host': '127.0.0.1', 'role': 'normal',
+                           'port': cls.remoteListenerPort, 'policyVhost': 'test'
+                            }),
+            # Set up the prefix 'node' as a prefix for waypoint addresses
+            ('address',  {'prefix': 'node', 'waypoint': 'yes'}),
+            # Create a pair of default auto-links for 'node.1'
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'direction': 'in'}),
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'direction': 'out'}),
+            ('vhost', {
+                'hostname': 'test',
+                'groups': [(
+                    '$connector', {
+                        'sources': 'test,examples,work*',
+                        'targets': 'examples,$management,play*',
+                    }
+                )]
+            })
+        ])
+
+        # wait=False: the connector's peer is only started inside the test.
+        cls.router = cls.tester.qdrouterd('ConnectorPolicySrcTgt', config, wait=False)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_31_connector_policy(self):
+        url = "127.0.0.1:%d" % self.remoteListenerPort
+        cpc = ConnectorPolicyClient(url, "cpc")
+        try:
+            while cpc.connection_opened == 0 and cpc._error is None:
+                time.sleep(0.1)
+            time.sleep(0.05)
+            # expect connection to stay up
+            self.assertEqual(cpc.connection_error, 0)
+            self.assertTrue(cpc._error is None, "client error: %s" % cpc._error)
+
+            # Exceptions from the client are deliberately not caught below:
+            # treating them as "denied" would let the negative checks pass
+            # for the wrong reason.
+
+            # senders that should work
+            for addr in ["examples", "$management", "playtime"]: # allowed targets
+                self.assertTrue(cpc.try_sender(addr),
+                                "sender to '%s' should be allowed" % addr)
+
+            # senders that should fail
+            for addr in ["test", "a/bad/addr"]: # denied targets
+                self.assertFalse(cpc.try_sender(addr),
+                                 "sender to '%s' should be denied" % addr)
+
+            # receivers that should work
+            for addr in ["examples", "test", "workaholic"]: # allowed sources
+                self.assertTrue(cpc.try_receiver(addr),
+                                "receiver from '%s' should be allowed" % addr)
+
+            # receivers that should fail
+            for addr in ["$management", "a/bad/addr"]: # denied sources
+                self.assertFalse(cpc.try_receiver(addr),
+                                 "receiver from '%s' should be denied" % addr)
+
+            # anonymous sender should be disallowed
+            self.assertFalse(cpc.try_anonymous_sender(),
+                             "anonymous sender should be denied")
+
+            # waypoint links should be disallowed
+            self.assertFalse(cpc.try_sender("node.1"),
+                             "waypoint sender should be denied")
+            self.assertFalse(cpc.try_receiver("node.1"),
+                             "waypoint receiver should be denied")
+        finally:
+            # Stop the client thread even when an assertion fails.
+            cpc.join()
+
+
+class ConnectorPolicyNSndrRcvr(TestCase):
+    """
+    Verify that a connector that has a policyVhost is allowed
+     * to open the connection
+     * is limited to the number of senders and receivers specified in the policy
+
+    Anonymous senders and waypoint links are enabled in this policy and
+    count toward the sender/receiver limits like any other link.
+    """
+    remoteListenerPort = None
+    MAX_SENDERS = 4
+    MAX_RECEIVERS = 3
+
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(ConnectorPolicyNSndrRcvr, cls).setUpClass()
+        # Port the router's connector dials; the test client listens on it.
+        cls.remoteListenerPort = cls.tester.get_port()
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
+            ('connector', {'verifyHostname': 'false', 'name': 'novhost',
+                           'idleTimeoutSeconds': 120, 'saslMechanisms': 'ANONYMOUS',
+                           'host': '127.0.0.1', 'role': 'normal',
+                           'port': cls.remoteListenerPort, 'policyVhost': 'test'
+                            }),
+            # Set up the prefix 'node' as a prefix for waypoint addresses
+            ('address',  {'prefix': 'node', 'waypoint': 'yes'}),
+            # Create a pair of default auto-links for 'node.1'
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'direction': 'in'}),
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'direction': 'out'}),
+            ('vhost', {
+                'hostname': 'test',
+                'groups': [(
+                    '$connector', {
+                        'sources': '*',
+                        'targets': '*',
+                        'maxSenders': cls.MAX_SENDERS,
+                        'maxReceivers': cls.MAX_RECEIVERS,
+                        'allowAnonymousSender': 'true',
+                        'allowWaypointLinks': 'true'
+                    }
+                )]
+            })
+        ])
+
+        # wait=False: the connector's peer is only started inside the test.
+        cls.router = cls.tester.qdrouterd('ConnectorPolicyNSndrRcvr', config, wait=False)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_32_connector_policy_max_sndr_rcvr(self):
+        url = "127.0.0.1:%d" % self.remoteListenerPort
+        cpc = ConnectorPolicyClient(url, "cpc")
+        try:
+            while cpc.connection_opened == 0 and cpc._error is None:
+                time.sleep(0.1)
+            time.sleep(0.05)
+            # expect connection to stay up
+            self.assertEqual(cpc.connection_error, 0)
+            self.assertTrue(cpc._error is None, "client error: %s" % cpc._error)
+
+            # Exceptions from the client are deliberately not caught below:
+            # treating them as "denied" would let the negative checks pass
+            # for the wrong reason.
+
+            # senders that should work
+            # anonymous sender should be allowed
+            self.assertTrue(cpc.try_anonymous_sender(),      # sender 1
+                            "anonymous sender should be allowed")
+
+            # waypoint links should be allowed
+            self.assertTrue(cpc.try_sender("node.1"),        # sender 2
+                            "waypoint sender should be allowed")
+            self.assertTrue(cpc.try_receiver("node.1"),      # receiver 1
+                            "waypoint receiver should be allowed")
+
+            addr = "vermillion"
+            # two senders are already open; fill the remaining quota
+            for i in range(self.MAX_SENDERS - 2):
+                self.assertTrue(cpc.try_sender(addr),
+                                "sender %d of %d should be allowed"
+                                % (i + 3, self.MAX_SENDERS))
+
+            # senders that should fail
+            for i in range(2):
+                self.assertFalse(cpc.try_sender(addr),
+                                 "sender beyond maxSenders should be denied")
+
+            # receivers that should work
+            # one receiver is already open; fill the remaining quota
+            for i in range(self.MAX_RECEIVERS - 1):
+                self.assertTrue(cpc.try_receiver(addr),
+                                "receiver %d of %d should be allowed"
+                                % (i + 2, self.MAX_RECEIVERS))
+
+            # receivers that should fail
+            for i in range(2):
+                self.assertFalse(cpc.try_receiver(addr),
+                                 "receiver beyond maxReceivers should be denied")
+
+            # close a sender and verify that only one more may open
+            addr = "skyblue"
+            cpc.close_sender()
+            self.assertTrue(cpc.try_sender(addr),
+                            "sender should be allowed after one was closed")
+            self.assertFalse(cpc.try_sender(addr),
+                             "second sender after close should be denied")
+
+            # close a receiver and verify that only one more may open
+            cpc.close_receiver()
+            self.assertTrue(cpc.try_receiver(addr),
+                            "receiver should be allowed after one was closed")
+            self.assertFalse(cpc.try_receiver(addr),
+                             "second receiver after close should be denied")
+        finally:
+            # Stop the client thread even when an assertion fails.
+            cpc.join()
+
+
+# Script entry point: hand the module returned by main_module() to unittest.
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org