You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/04/06 00:15:44 UTC

qpid-dispatch git commit: DISPATCH-257: Fix policy link counting * Add self test to verify sender/receiver link denial by count. * Fix code to account for sender/receiver links properly. Router 'incoming links' are 'senders' for policy. * Better log messages.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master f46b3afa9 -> 15de0870d


DISPATCH-257: Fix policy link counting
* Add self test to verify sender/receiver link denial by count.
* Fix code to account for sender/receiver links properly.
  Router 'incoming links' are 'senders' for policy.
* Better log messages.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/15de0870
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/15de0870
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/15de0870

Branch: refs/heads/master
Commit: 15de0870d5430b69f94205bae33c0fc6e7d4a14f
Parents: f46b3af
Author: Chuck Rolke <cr...@redhat.com>
Authored: Tue Apr 5 18:07:56 2016 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Tue Apr 5 18:07:56 2016 -0400

----------------------------------------------------------------------
 .../policy/policy_local.py                      |  9 +-
 src/container.c                                 | 38 ++++-----
 src/policy.c                                    | 16 ++--
 tests/CMakeLists.txt                            |  1 +
 tests/policy-3/test-sender-receiver-limits.json | 26 ++++++
 tests/system_tests_policy.py                    | 89 ++++++++++++++++++++
 6 files changed, 150 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/python/qpid_dispatch_internal/policy/policy_local.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
index 89e5016..c1bea13 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -679,10 +679,11 @@ class PolicyLocal(object):
         @return:
         """
         try:
-            facts = self._connections[conn_id]
-            stats = self.statsdb[facts.app]
-            stats.disconnect(facts.conn_name, facts.user, facts.host)
-            del self._connections[conn_id]
+            if conn_id in self._connections:
+                facts = self._connections[conn_id]
+                stats = self.statsdb[facts.app]
+                stats.disconnect(facts.conn_name, facts.user, facts.host)
+                del self._connections[conn_id]
         except Exception, e:
             self._manager.log_trace(
                 "Policy internal error closing connection id %s. %s" % (conn_id, str(e)))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 254e1ad..54b1138 100644
--- a/src/container.c
+++ b/src/container.c
@@ -423,11 +423,11 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                     if (qd_link && qd_link->node) {
                         if (qd_conn->policy_settings) {
                             if (qd_link->direction == QD_OUTGOING) {
-                                qd_conn->n_senders--;
-                                assert(qd_conn->n_senders >= 0);
-                            } else {
                                 qd_conn->n_receivers--;
                                 assert(qd_conn->n_receivers >= 0);
+                            } else {
+                                qd_conn->n_senders--;
+                                assert(qd_conn->n_senders >= 0);
                             }
                         }
                         qd_log(container->log_source, QD_LOG_NOTICE,
@@ -454,7 +454,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                     if (!qd_policy_approve_amqp_receiver_link(pn_link, qd_conn)) {
                         break;
                     }
-                    qd_conn->n_senders++;
+                    qd_conn->n_receivers++;
                 }
                 setup_outgoing_link(container, pn_link);
             } else {
@@ -462,7 +462,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                     if (!qd_policy_approve_amqp_sender_link(pn_link, qd_conn)) {
                         break;
                     }
-                    qd_conn->n_receivers++;
+                    qd_conn->n_senders++;
                 }
                 setup_incoming_link(container, pn_link);
             }
@@ -480,22 +480,22 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
             if (node)
                 node->ntype->link_detach_handler(node->context, qd_link, dt);
             else if (qd_link->pn_link == pn_link) {
-
-                //
-                // This policy stuff is in the wrong place.  This else clause typically does not run.
-                //
-                if (qd_conn->policy_settings) {
-                    if (pn_link_is_sender(pn_link)) {
-                        qd_conn->n_senders--;
-                        assert (qd_conn->n_senders >= 0);
-                    } else {
-                        qd_conn->n_receivers--;
-                        assert (qd_conn->n_receivers >= 0);
-                    }
+                pn_link_close(pn_link);
+            }
+            if (qd_conn->policy_counted && qd_conn->policy_settings) {
+                if (pn_link_is_sender(pn_link)) {
+                    qd_conn->n_receivers--;
+                    qd_log(container->log_source, QD_LOG_TRACE,
+                           "Closed receiver link %s. n_receivers: %d",
+                           pn_link_name(pn_link), qd_conn->n_receivers);
+                    assert (qd_conn->n_receivers >= 0);
                 } else {
-                    // no policy - links not counted
+                    qd_conn->n_senders--;
+                    qd_log(container->log_source, QD_LOG_TRACE,
+                           "Closed sender link %s. n_senders: %d",
+                           pn_link_name(pn_link), qd_conn->n_senders);
+                    assert (qd_conn->n_senders >= 0);
                 }
-                pn_link_close(pn_link);
             }
             if (qd_link->close_sess_with_link && qd_link->pn_sess &&
                 pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index df6ddd4..4d63753 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -254,7 +254,8 @@ void qd_policy_socket_close(void *context, const qd_connection_t *conn)
     }
     if (policy->max_connection_limit > 0) {
         const char *hostname = qdpn_connector_name(conn->pn_cxtr);
-        qd_log(policy->log_source, QD_LOG_DEBUG, "Connection '%s' closed. N connections=%d", hostname, n_connections);
+        qd_log(policy->log_source, QD_LOG_DEBUG, "Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d. Total connections=%d.",
+                hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, n_connections);
     }
 }
 
@@ -426,6 +427,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
             }
         }
     }
+    // Approved
     return true;
 }
 
@@ -603,7 +605,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
     } else {
         // max sender limit not specified
     }
-    // Deny sender link based on target
+    // Approve sender link based on target
     const char * target = pn_terminus_get_address(pn_link_remote_target(pn_link));
     bool lookup;
     if (target && *target) {
@@ -623,13 +625,14 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
         // This happens all the time with anonymous relay
         lookup = qd_conn->policy_settings->allowAnonymousSender;
         qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
-            "Approve anonymous sender for user '%s': %s",
-			qd_conn->user_id, (lookup ? "ALLOW" : "DENY"));
+            "Approve anonymous relay sender link for user '%s': %s",
+            qd_conn->user_id, (lookup ? "ALLOW" : "DENY"));
         if (!lookup) {
             _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
             return false;
         }
     }
+    // Approved
     return true;
 }
 
@@ -647,7 +650,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
     } else {
         // max receiver limit not specified
     }
-    // Deny receiver link based on source
+    // Approve receiver link based on source
     bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link));
     if (dynamic_src) {
         bool lookup = qd_conn->policy_settings->allowDynamicSrc;
@@ -677,11 +680,12 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
         // A receiver with no remote source.
         qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE,
                "Approve receiver link '' for user '%s': DENY",
-			   qd_conn->user_id);
+               qd_conn->user_id);
 
         _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn);
         return false;
     }
+    // Approved
     return true;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 123884e..d4c55c3 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -106,6 +106,7 @@ file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/management-access.json  DESTINATI
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/policy-boardwalk.json   DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-1/)
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-1/policy-safari.json      DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-1/)
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-2/policy-photoserver-sasl.sasldb  DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-2)
+file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-3/test-sender-receiver-limits.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-3)
 
 # following install() functions will be called only if you do a make "install"
 install(FILES ${SYSTEM_TEST_FILES}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/tests/policy-3/test-sender-receiver-limits.json
----------------------------------------------------------------------
diff --git a/tests/policy-3/test-sender-receiver-limits.json b/tests/policy-3/test-sender-receiver-limits.json
new file mode 100644
index 0000000..2a5b367
--- /dev/null
+++ b/tests/policy-3/test-sender-receiver-limits.json
@@ -0,0 +1,26 @@
+[
+# Ruleset with differing number of senders and receivers
+# so tests can determine that correct limit is matched.
+  ["policyRuleset", {
+      "applicationName": "0.0.0.0",
+      "maxConnections": 50,
+      "maxConnPerUser": 2,
+      "maxConnPerHost": 4,
+      "connectionAllowDefault": true,
+      "settings": {
+        "default" : {
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":            2,
+          "maxReceivers":          4,
+          "allowDynamicSrc":      true,
+          "allowAnonymousSender": true,
+          "sources": "*",
+          "targets": "*"
+        }
+      }
+    }
+  ]
+]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15de0870/tests/system_tests_policy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index e040a61..176d3c1 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -109,5 +109,94 @@ class LoadPolicyFromFolder(TestCase):
         rulesets = json.loads(self.run_qdmanage('query --type=policyRuleset'))
         self.assertEqual(len(rulesets), 3)
 
+class SenderReceiverLimits(TestCase):
+    """
+    Verify that the maxSenders and maxReceivers limits of the policy-3
+    ruleset are enforced: links up to the limit open, the next is denied.
+    This test relies on BlockingConnection, not on the qdmanage utility.
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(SenderReceiverLimits, cls).setUpClass()
+        policy_config_path = os.path.join(cls.top_dir, 'policy-3')
+        config = Qdrouterd.Config([
+            ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.Policy3'}),
+            ('router', {'mode': 'standalone', 'routerId': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('policy', {'maximumConnections': 2, 'policyFolder': policy_config_path, 'enableAccessRules': 'true'})
+        ])
+
+        cls.router = cls.tester.qdrouterd('SenderReceiverLimits', config, wait=True)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_verify_n_receivers(self):
+        n = 4
+        addr = self.address()
+
+        # connection should be ok
+        denied = False
+        try:
+            br1 = BlockingConnection(addr)
+        except ConnectionException:
+            denied = True
+
+        self.assertFalse(denied) # assert if connections that should open did not open
+
+        # n receivers OK
+        try:
+            r1 = br1.create_receiver(address="****YES_1of4***")
+            r2 = br1.create_receiver(address="****YES_20f4****")
+            r3 = br1.create_receiver(address="****YES_3of4****")
+            r4 = br1.create_receiver(address="****YES_4of4****")
+        except Exception:
+            denied = True
+
+        self.assertFalse(denied) # n receivers should have worked
+
+        # receiver n+1 should be denied
+        try:
+            r5 = br1.create_receiver("****NO****")
+        except Exception:
+            denied = True
+
+        self.assertTrue(denied) # receiver n+1 should have failed
+
+        br1.close()
+
+    def test_verify_n_senders(self):
+        n = 2
+        addr = self.address()
+
+        # connection should be ok
+        denied = False
+        try:
+            bs1 = BlockingConnection(addr)
+        except ConnectionException:
+            denied = True
+
+        self.assertFalse(denied) # assert if connections that should open did not open
+
+        # n senders OK
+        try:
+            s1 = bs1.create_sender(address="****YES_1of2****")
+            s2 = bs1.create_sender(address="****YES_2of2****")
+        except Exception:
+            denied = True
+
+        self.assertFalse(denied) # n senders should have worked
+
+        # sender n+1 should be denied
+        try:
+            s3 = bs1.create_sender("****NO****")
+        except Exception:
+            denied = True
+
+        self.assertTrue(denied) # sender n+1 should have failed
+
+        bs1.close()
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org