You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/03/12 00:26:55 UTC

qpid-dispatch git commit: Implement policy for sessions and partial policy for links. Connections now have private copy of policy settings.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 626d37043 -> 9e255e8b0


Implement policy for sessions and partial policy for links.
Connections now have private copy of policy settings.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9e255e8b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9e255e8b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9e255e8b

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: 9e255e8b0ef4dc297a013da9ef3f972b3bef5e81
Parents: 626d370
Author: Chuck Rolke <cr...@redhat.com>
Authored: Fri Mar 11 18:23:44 2016 -0500
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Mar 11 18:23:44 2016 -0500

----------------------------------------------------------------------
 src/container.c      |  82 +++++++++++++++++++++++++--
 src/policy.c         | 141 ++++++++++++++++++++++++++++++----------------
 src/policy_private.h |  24 ++++++--
 src/server.c         |  25 +++++++-
 src/server_private.h |   3 +
 5 files changed, 217 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 5f9d161..3573370 100644
--- a/src/container.c
+++ b/src/container.c
@@ -343,7 +343,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_SESSION_REMOTE_OPEN :
         ssn = pn_event_session(event);
         if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
-            pn_session_set_incoming_capacity(ssn, 1000000);
+            if (qd_conn->policy_settings) {
+                if (qd_conn->policy_settings->maxSessions) {
+                    if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) {
+                        qd_policy_deny_amqp_session(ssn, qd_conn);
+                        break;
+                    }
+                }
+                qd_conn->n_sessions++;
+            }
+            if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) {
+                pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow);
+            } else {
+                pn_session_set_incoming_capacity(ssn, 1000000);
+            }
             pn_session_open(ssn);
         }
         break;
@@ -360,6 +373,15 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 if (pn_link_session(pn_link) == ssn) {
                     qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
                     if (qd_link && qd_link->node) {
+                        if (qd_conn->policy_settings) {
+                            if (qd_link->direction == QD_OUTGOING) {
+                                qd_conn->n_senders--;
+                                assert(qd_conn->n_senders >= 0);
+                            } else {
+                                qd_conn->n_receivers--;
+                                assert(qd_conn->n_receivers >= 0);
+                            }
+                        }
                         qd_log(container->log_source, QD_LOG_NOTICE,
                                "Aborting link '%s' due to parent session end",
                                pn_link_name(pn_link));
@@ -369,6 +391,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 }
                 pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
             }
+            qd_conn->n_sessions--;
             pn_session_close(ssn);
         }
         break;
@@ -376,10 +399,49 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_LINK_REMOTE_OPEN :
         pn_link = pn_event_link(event);
         if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
-            if (pn_link_is_sender(pn_link))
+            if (pn_link_is_sender(pn_link)) {
+               if (qd_conn->policy_settings) {
+                    // Open link or not based on policy.
+                    if (qd_conn->policy_settings->maxSenders) {
+                        if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) {
+                            // Max sender limit specified and violated.
+                            qd_policy_deny_amqp_link(pn_link, "sender", qd_conn);
+                            break;
+                        } else {
+                            // max sender limit not violated
+                        }
+                    } else {
+                        // max sender limit not specified
+                    }
+                    // TODO: Deny sender link based on target
+                    // Count sender link
+                    qd_conn->n_senders++;
+                } else {
+                    // This connection not controlled by policy. Link implicitly allowed.
+                }
                 setup_outgoing_link(container, pn_link);
-            else
+            } else {
+                if (qd_conn->policy_settings) {
+                    // Open link or not based on policy.
+                    if (qd_conn->policy_settings->maxReceivers) {
+                        if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) {
+                            // Max sender limit specified and violated.
+                            qd_policy_deny_amqp_link(pn_link, "receiver", qd_conn);
+                            break;
+                        } else {
+                            // max receiver limit not violated
+                        }
+                    } else {
+                        // max receiver limit not specified
+                    }
+                    // TODO: Deny receiver link based on source
+                    // Count receiver link
+                    qd_conn->n_receivers++;
+                } else {
+                    // This connection not controlled by policy. Link implicitly allowed.
+                }
                 setup_incoming_link(container, pn_link);
+            }
         } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
             handle_link_open(container, pn_link);
         break;
@@ -398,8 +460,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
             // If the qd_link does not reference the pn_link, we have already freed the pn_link.
             // If we attempt to free it again, proton will crash.
             //
-            if (qd_link->pn_link == pn_link)
+            if (qd_link->pn_link == pn_link) {
+                if (qd_conn->policy_settings) {
+                    if (pn_link_is_sender(pn_link)) {
+                        qd_conn->n_senders--;
+                        assert (qd_conn->n_senders >= 0);
+                    } else {
+                        qd_conn->n_receivers--;
+                        assert (qd_conn->n_receivers >= 0);
+                    }
+                } else {
+                    // no policy - links not counted
+                }
                 pn_link_close(pn_link);
+            }
         }
         break;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index 5873491..82a4281 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -18,15 +18,15 @@
  */
 
 #include <Python.h>
-#include <qpid/dispatch/python_embedded.h>
+#include "qpid/dispatch/python_embedded.h"
 #include "policy_private.h"
 #include <stdio.h>
 #include <string.h>
 #include "dispatch_private.h"
 #include "connection_manager_private.h"
-#include <qpid/dispatch/container.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/message.h>
+#include "qpid/dispatch/container.h"
+#include "qpid/dispatch/server.h"
+#include "qpid/dispatch/message.h"
 #include <proton/engine.h>
 #include <proton/message.h>
 #include <proton/condition.h>
@@ -34,11 +34,11 @@
 #include <proton/transport.h>
 #include <proton/error.h>
 #include <proton/event.h>
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/hash.h>
-#include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/iterator.h>
-#include <qpid/dispatch/log.h>
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/hash.h"
+#include "qpid/dispatch/threading.h"
+#include "qpid/dispatch/iterator.h"
+#include "qpid/dispatch/log.h"
 
 
 //
@@ -65,7 +65,8 @@ static char* RESOURCE_LIMIT_EXCEEDED     = "amqp:resource-limit-exceeded";
 // error descriptions signaled to effect denial
 //
 static char* CONNECTION_DISALLOWED         = "connection disallowed by local policy";
-
+static char* SESSION_DISALLOWED            = "session disallowed by local policy";
+static char* LINK_DISALLOWED               = "link disallowed by local policy";
 
 //
 // Policy configuration/statistics management interface
@@ -334,6 +335,52 @@ void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *c
 
 //
 //
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
+{
+    pn_condition_t * cond = pn_session_condition(ssn);
+    (void) pn_condition_set_name(       cond, RESOURCE_LIMIT_EXCEEDED);
+    (void) pn_condition_set_description(cond, SESSION_DISALLOWED);
+    pn_session_close(ssn);
+
+    pn_connection_t *conn = qd_connection_pn(qd_conn);
+    qd_dispatch_t *qd = qd_conn->server->qd;
+    qd_policy_t *policy = qd->policy;
+    pn_transport_t *pn_trans = pn_connection_transport(conn);
+    const char *username = pn_transport_get_user(pn_trans);
+    const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+    const char *app = pn_connection_remote_hostname(conn);
+    qd_log(policy->log_source, 
+           POLICY_LOG_LEVEL, 
+           "Policy AMQP Begin denied due to session limit. user: %s, hostip: %s, app: %s", 
+           username, hostip, app);
+}
+
+
+//
+//
+void qd_policy_deny_amqp_link(pn_link_t *link, const char* s_or_r, qd_connection_t *qd_conn)
+{
+    pn_condition_t * cond = pn_link_condition(link);
+    (void) pn_condition_set_name(       cond, RESOURCE_LIMIT_EXCEEDED);
+    (void) pn_condition_set_description(cond, LINK_DISALLOWED);
+    pn_link_close(link);
+
+    pn_connection_t *conn = qd_connection_pn(qd_conn);
+    qd_dispatch_t *qd = qd_conn->server->qd;
+    qd_policy_t *policy = qd->policy;
+    pn_transport_t *pn_trans = pn_connection_transport(conn);
+    const char *username = pn_transport_get_user(pn_trans);
+    const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+    const char *app = pn_connection_remote_hostname(conn);
+    qd_log(policy->log_source, 
+           POLICY_LOG_LEVEL, 
+           "Policy AMQP Attach denied due to %s limit. user: %s, hostip: %s, app: %s", 
+           s_or_r, username, hostip, app);
+}
+
+
+//
+//
 void qd_policy_amqp_open(void *context, bool discard)
 {
     qd_connection_t *qd_conn = (qd_connection_t *)context;
@@ -341,47 +388,47 @@ void qd_policy_amqp_open(void *context, bool discard)
         pn_connection_t *conn = qd_connection_pn(qd_conn);
         qd_dispatch_t *qd = qd_conn->server->qd;
         qd_policy_t *policy = qd->policy;
-
-        // username = pn_connection_get_user(conn) returns blank when
-        // the transport returns 'anonymous'.
-        pn_transport_t *pn_trans = pn_connection_transport(conn);
-        const char *username = pn_transport_get_user(pn_trans);
-
-        const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
-        const char *app = pn_connection_remote_hostname(conn);
-        const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr);
+        bool connection_allowed = true;
+
+        if (policy->enableAccessRules) {
+            // Open connection or not based on policy.
+            // username = pn_connection_get_user(conn) returns blank when
+            // the transport returns 'anonymous'.
+            pn_transport_t *pn_trans = pn_connection_transport(conn);
+            const char *username = pn_transport_get_user(pn_trans);
+
+            const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+            const char *app = pn_connection_remote_hostname(conn);
+            const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr);
 #define SETTINGS_NAME_SIZE 256
-        char settings_name[SETTINGS_NAME_SIZE];
-        uint32_t conn_id = qd_conn->connection_id;
-        // TODO: settings need to be cached and kept beyond the open
-        qd_policy_settings_t settings;
-        memset(&settings, 0, sizeof(settings));
-
-        if (!policy->enableAccessRules ||
-            (qd_policy_open_lookup_user(policy, username, hostip, app, conn_name, 
-                                        settings_name, SETTINGS_NAME_SIZE, conn_id,
-                                        &settings) &&
-             settings_name[0])) {
-            // This connection is allowed.
-            // Apply received settings
-            if (settings.maxFrameSize > 0)
-                pn_transport_set_max_frame(pn_trans, settings.maxFrameSize);
-            if (settings.maxSessions > 0)
-                pn_transport_set_channel_max(pn_trans, settings.maxSessions);
-
-            // HACK ALERT: The settings were fetched, used for the Open,
-            // and now they discarded.
-            if (settings.sources)
-                free(settings.sources);
-            if (settings.targets)
-                free(settings.targets);
-
+            char settings_name[SETTINGS_NAME_SIZE];
+            uint32_t conn_id = qd_conn->connection_id;
+            qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings
+            memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t));
+
+            if (qd_policy_open_lookup_user(policy, username, hostip, app, conn_name, 
+                                           settings_name, SETTINGS_NAME_SIZE, conn_id,
+                                           qd_conn->policy_settings) &&
+                settings_name[0]) {
+                // This connection is allowed by policy.
+                // Apply tranport policy settings
+                if (qd_conn->policy_settings->maxFrameSize > 0)
+                    pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
+                if (qd_conn->policy_settings->maxSessions > 0)
+                    pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions);
+            } else {
+                // This connection is denied by policy.
+                connection_allowed = false;
+                qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
+            }
+        } else {
+            // This connection not subject to policy and implicitly allowed.
+            // Note that connections not goverened by policy have no policy_settings.
+        }
+        if (connection_allowed) {
             if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
                 pn_connection_open(conn);
             qd_connection_manager_connection_opened(qd_conn);
-        } else {
-            // This connection is denied.
-            qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
         }
     }
     qd_connection_set_event_stall(qd_conn, false);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/policy_private.h
----------------------------------------------------------------------
diff --git a/src/policy_private.h b/src/policy_private.h
index 03d58ed..9d246aa 100644
--- a/src/policy_private.h
+++ b/src/policy_private.h
@@ -19,10 +19,10 @@
  * under the License.
  */
 
-#include <qpid/dispatch.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/static_assert.h>
+#include "qpid/dispatch.h"
+#include "qpid/dispatch/server.h"
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/static_assert.h"
 
 #include "config.h"
 #include "alloc.h"
@@ -85,6 +85,22 @@ bool qd_policy_socket_accept(void *context, const char *hostname);
 void qd_policy_socket_close(void *context, const qd_connection_t *conn);
 
 
+/** Set the error condition and close the session.
+ * Over the wire this will send an begin frame followed
+ * immediately by an end frame with the error condition.
+ * @param[in] ssn proton session being closed
+ **/ 
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn);
+
+
+/** Set the error condition and close the link.
+ * Over the wire this will send an attach frame followed
+ * immediately by a detach frame with the error condition.
+ * @param[in] link proton link being closed
+ **/ 
+void qd_policy_deny_amqp_link(pn_link_t *link, const char* s_or_r, qd_connection_t *qd_conn);
+
+
 /** Allow or deny an incoming connection.
  * An Open performative was received over a new connection.
  * Consult local policy to determine if this host/user is

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index ca0eed7..90a0fb4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -63,6 +63,19 @@ static qd_thread_t *thread(qd_server_t *qd_server, int id)
     return thread;
 }
 
+static void free_qd_connection(qd_connection_t *ctx)
+{
+    if (ctx->policy_settings) {
+        if (ctx->policy_settings->sources)
+            free(ctx->policy_settings->sources);
+        if (ctx->policy_settings->targets)
+            free(ctx->policy_settings->targets);
+        free (ctx->policy_settings);
+        ctx->policy_settings = 0;
+    }
+    free_qd_connection_t(ctx);
+}
+
 qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl);
 
 /**
@@ -267,6 +280,8 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
         ctx->ufd           = 0;
         ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it
         ctx->policy_settings = 0;
+        ctx->n_senders       = 0;
+        ctx->n_receivers     = 0;
         DEQ_INIT(ctx->deferred_calls);
         ctx->deferred_call_lock = sys_mutex();
         ctx->event_stall  = false;
@@ -729,7 +744,7 @@ static void *thread_run(void *arg)
                     pn_collector_free(ctx->collector);
                 invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
                 sys_mutex_free(ctx->deferred_call_lock);
-                free_qd_connection_t(ctx);
+                free_qd_connection(ctx);
                 qd_server->threads_active--;
                 sys_mutex_unlock(qd_server->lock);
             } else {
@@ -819,7 +834,9 @@ static void cxtr_try_open(void *context)
     ctx->link_context = 0;
     ctx->ufd          = 0;
     ctx->policy_settings = 0;
-
+    ctx->n_senders       = 0;
+    ctx->n_receivers     = 0;
+    
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
     ctx->event_stall  = false;
@@ -846,7 +863,7 @@ static void cxtr_try_open(void *context)
 
     if (ctx->pn_cxtr == 0) {
         sys_mutex_free(ctx->deferred_call_lock);
-        free_qd_connection_t(ctx);
+        free_qd_connection(ctx);
         ct->delay = 10000;
         qd_timer_schedule(ct->timer, ct->delay);
         return;
@@ -1382,6 +1399,8 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
     ctx->link_context = 0;
     ctx->ufd          = ufd;
     ctx->policy_settings = 0;
+    ctx->n_senders       = 0;
+    ctx->n_receivers     = 0;
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
     ctx->event_stall  = false;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 43706ed..da70ed3 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -100,6 +100,9 @@ struct qd_connection_t {
     qd_user_fd_t     *ufd;
     uint64_t         connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky.
     qd_policy_settings_t *policy_settings;
+    int              n_sessions;
+    int              n_senders;
+    int              n_receivers;
 
     qd_deferred_call_list_t  deferred_calls;
     sys_mutex_t             *deferred_call_lock;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org