You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/03/12 00:26:55 UTC
qpid-dispatch git commit: Implement policy for sessions and partial
policy for links. Connections now have private copy of policy settings.
Repository: qpid-dispatch
Updated Branches:
refs/heads/crolke-DISPATCH-188-1 626d37043 -> 9e255e8b0
Implement policy for sessions and partial policy for links.
Connections now have private copy of policy settings.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9e255e8b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9e255e8b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9e255e8b
Branch: refs/heads/crolke-DISPATCH-188-1
Commit: 9e255e8b0ef4dc297a013da9ef3f972b3bef5e81
Parents: 626d370
Author: Chuck Rolke <cr...@redhat.com>
Authored: Fri Mar 11 18:23:44 2016 -0500
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Mar 11 18:23:44 2016 -0500
----------------------------------------------------------------------
src/container.c | 82 +++++++++++++++++++++++++--
src/policy.c | 141 ++++++++++++++++++++++++++++++----------------
src/policy_private.h | 24 ++++++--
src/server.c | 25 +++++++-
src/server_private.h | 3 +
5 files changed, 217 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 5f9d161..3573370 100644
--- a/src/container.c
+++ b/src/container.c
@@ -343,7 +343,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_SESSION_REMOTE_OPEN :
ssn = pn_event_session(event);
if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
- pn_session_set_incoming_capacity(ssn, 1000000);
+ if (qd_conn->policy_settings) {
+ if (qd_conn->policy_settings->maxSessions) {
+ if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) {
+ qd_policy_deny_amqp_session(ssn, qd_conn);
+ break;
+ }
+ }
+ qd_conn->n_sessions++;
+ }
+ if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) {
+ pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow);
+ } else {
+ pn_session_set_incoming_capacity(ssn, 1000000);
+ }
pn_session_open(ssn);
}
break;
@@ -360,6 +373,15 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
if (qd_link && qd_link->node) {
+ if (qd_conn->policy_settings) {
+ if (qd_link->direction == QD_OUTGOING) {
+ qd_conn->n_senders--;
+ assert(qd_conn->n_senders >= 0);
+ } else {
+ qd_conn->n_receivers--;
+ assert(qd_conn->n_receivers >= 0);
+ }
+ }
qd_log(container->log_source, QD_LOG_NOTICE,
"Aborting link '%s' due to parent session end",
pn_link_name(pn_link));
@@ -369,6 +391,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
}
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
+ qd_conn->n_sessions--;
pn_session_close(ssn);
}
break;
@@ -376,10 +399,49 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_LINK_REMOTE_OPEN :
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
- if (pn_link_is_sender(pn_link))
+ if (pn_link_is_sender(pn_link)) {
+ if (qd_conn->policy_settings) {
+ // Open link or not based on policy.
+ if (qd_conn->policy_settings->maxSenders) {
+ if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) {
+ // Max sender limit specified and violated.
+ qd_policy_deny_amqp_link(pn_link, "sender", qd_conn);
+ break;
+ } else {
+ // max sender limit not violated
+ }
+ } else {
+ // max sender limit not specified
+ }
+ // TODO: Deny sender link based on target
+ // Count sender link
+ qd_conn->n_senders++;
+ } else {
+ // This connection not controlled by policy. Link implicitly allowed.
+ }
setup_outgoing_link(container, pn_link);
- else
+ } else {
+ if (qd_conn->policy_settings) {
+ // Open link or not based on policy.
+ if (qd_conn->policy_settings->maxReceivers) {
+ if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) {
+ // Max receiver limit specified and violated.
+ qd_policy_deny_amqp_link(pn_link, "receiver", qd_conn);
+ break;
+ } else {
+ // max receiver limit not violated
+ }
+ } else {
+ // max receiver limit not specified
+ }
+ // TODO: Deny receiver link based on source
+ // Count receiver link
+ qd_conn->n_receivers++;
+ } else {
+ // This connection not controlled by policy. Link implicitly allowed.
+ }
setup_incoming_link(container, pn_link);
+ }
} else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
handle_link_open(container, pn_link);
break;
@@ -398,8 +460,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
// If the qd_link does not reference the pn_link, we have already freed the pn_link.
// If we attempt to free it again, proton will crash.
//
- if (qd_link->pn_link == pn_link)
+ if (qd_link->pn_link == pn_link) {
+ if (qd_conn->policy_settings) {
+ if (pn_link_is_sender(pn_link)) {
+ qd_conn->n_senders--;
+ assert (qd_conn->n_senders >= 0);
+ } else {
+ qd_conn->n_receivers--;
+ assert (qd_conn->n_receivers >= 0);
+ }
+ } else {
+ // no policy - links not counted
+ }
pn_link_close(pn_link);
+ }
}
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index 5873491..82a4281 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -18,15 +18,15 @@
*/
#include <Python.h>
-#include <qpid/dispatch/python_embedded.h>
+#include "qpid/dispatch/python_embedded.h"
#include "policy_private.h"
#include <stdio.h>
#include <string.h>
#include "dispatch_private.h"
#include "connection_manager_private.h"
-#include <qpid/dispatch/container.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/message.h>
+#include "qpid/dispatch/container.h"
+#include "qpid/dispatch/server.h"
+#include "qpid/dispatch/message.h"
#include <proton/engine.h>
#include <proton/message.h>
#include <proton/condition.h>
@@ -34,11 +34,11 @@
#include <proton/transport.h>
#include <proton/error.h>
#include <proton/event.h>
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/hash.h>
-#include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/iterator.h>
-#include <qpid/dispatch/log.h>
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/hash.h"
+#include "qpid/dispatch/threading.h"
+#include "qpid/dispatch/iterator.h"
+#include "qpid/dispatch/log.h"
//
@@ -65,7 +65,8 @@ static char* RESOURCE_LIMIT_EXCEEDED = "amqp:resource-limit-exceeded";
// error descriptions signaled to effect denial
//
static char* CONNECTION_DISALLOWED = "connection disallowed by local policy";
-
+static char* SESSION_DISALLOWED = "session disallowed by local policy";
+static char* LINK_DISALLOWED = "link disallowed by local policy";
//
// Policy configuration/statistics management interface
@@ -334,6 +335,52 @@ void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *c
//
//
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
+{ // Deny a session: set an error condition on it, close it, then log the denial.
+ pn_condition_t * cond = pn_session_condition(ssn);
+ (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED); // condition name "amqp:resource-limit-exceeded"
+ (void) pn_condition_set_description(cond, SESSION_DISALLOWED); // "session disallowed by local policy"
+ pn_session_close(ssn);
+ // Everything below only gathers connection details for the log entry.
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+ qd_dispatch_t *qd = qd_conn->server->qd;
+ qd_policy_t *policy = qd->policy;
+ pn_transport_t *pn_trans = pn_connection_transport(conn);
+ const char *username = pn_transport_get_user(pn_trans);
+ const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+ const char *app = pn_connection_remote_hostname(conn); // the remote hostname is what gets logged as "app"
+ qd_log(policy->log_source,
+ POLICY_LOG_LEVEL,
+ "Policy AMQP Begin denied due to session limit. user: %s, hostip: %s, app: %s",
+ username, hostip, app); // NOTE(review): passed to "%s" unchecked - confirm none of these can be NULL
+}
+
+
+// Deny an AMQP link: set a resource-limit-exceeded condition on the link, close it,
+// and log the denial. s_or_r is the role text ("sender"/"receiver") placed in the log message.
+void qd_policy_deny_amqp_link(pn_link_t *link, const char* s_or_r, qd_connection_t *qd_conn)
+{
+ pn_condition_t * cond = pn_link_condition(link);
+ (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED); // condition name "amqp:resource-limit-exceeded"
+ (void) pn_condition_set_description(cond, LINK_DISALLOWED); // "link disallowed by local policy"
+ pn_link_close(link);
+ // Everything below only gathers connection details for the log entry.
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+ qd_dispatch_t *qd = qd_conn->server->qd;
+ qd_policy_t *policy = qd->policy;
+ pn_transport_t *pn_trans = pn_connection_transport(conn);
+ const char *username = pn_transport_get_user(pn_trans);
+ const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+ const char *app = pn_connection_remote_hostname(conn); // the remote hostname is what gets logged as "app"
+ qd_log(policy->log_source,
+ POLICY_LOG_LEVEL,
+ "Policy AMQP Attach denied due to %s limit. user: %s, hostip: %s, app: %s",
+ s_or_r, username, hostip, app); // NOTE(review): passed to "%s" unchecked - confirm none of these can be NULL
+}
+
+
+//
+//
void qd_policy_amqp_open(void *context, bool discard)
{
qd_connection_t *qd_conn = (qd_connection_t *)context;
@@ -341,47 +388,47 @@ void qd_policy_amqp_open(void *context, bool discard)
pn_connection_t *conn = qd_connection_pn(qd_conn);
qd_dispatch_t *qd = qd_conn->server->qd;
qd_policy_t *policy = qd->policy;
-
- // username = pn_connection_get_user(conn) returns blank when
- // the transport returns 'anonymous'.
- pn_transport_t *pn_trans = pn_connection_transport(conn);
- const char *username = pn_transport_get_user(pn_trans);
-
- const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
- const char *app = pn_connection_remote_hostname(conn);
- const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr);
+ bool connection_allowed = true;
+
+ if (policy->enableAccessRules) {
+ // Open connection or not based on policy.
+ // username = pn_connection_get_user(conn) returns blank when
+ // the transport returns 'anonymous'.
+ pn_transport_t *pn_trans = pn_connection_transport(conn);
+ const char *username = pn_transport_get_user(pn_trans);
+
+ const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr);
+ const char *app = pn_connection_remote_hostname(conn);
+ const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr);
#define SETTINGS_NAME_SIZE 256
- char settings_name[SETTINGS_NAME_SIZE];
- uint32_t conn_id = qd_conn->connection_id;
- // TODO: settings need to be cached and kept beyond the open
- qd_policy_settings_t settings;
- memset(&settings, 0, sizeof(settings));
-
- if (!policy->enableAccessRules ||
- (qd_policy_open_lookup_user(policy, username, hostip, app, conn_name,
- settings_name, SETTINGS_NAME_SIZE, conn_id,
- &settings) &&
- settings_name[0])) {
- // This connection is allowed.
- // Apply received settings
- if (settings.maxFrameSize > 0)
- pn_transport_set_max_frame(pn_trans, settings.maxFrameSize);
- if (settings.maxSessions > 0)
- pn_transport_set_channel_max(pn_trans, settings.maxSessions);
-
- // HACK ALERT: The settings were fetched, used for the Open,
- // and now they discarded.
- if (settings.sources)
- free(settings.sources);
- if (settings.targets)
- free(settings.targets);
-
+ char settings_name[SETTINGS_NAME_SIZE];
+ uint32_t conn_id = qd_conn->connection_id;
+ qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings
+ memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t));
+
+ if (qd_policy_open_lookup_user(policy, username, hostip, app, conn_name,
+ settings_name, SETTINGS_NAME_SIZE, conn_id,
+ qd_conn->policy_settings) &&
+ settings_name[0]) {
+ // This connection is allowed by policy.
+ // Apply transport policy settings
+ if (qd_conn->policy_settings->maxFrameSize > 0)
+ pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
+ if (qd_conn->policy_settings->maxSessions > 0)
+ pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions);
+ } else {
+ // This connection is denied by policy.
+ connection_allowed = false;
+ qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
+ }
+ } else {
+ // This connection not subject to policy and implicitly allowed.
+ // Note that connections not governed by policy have no policy_settings.
+ }
+ if (connection_allowed) {
if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
pn_connection_open(conn);
qd_connection_manager_connection_opened(qd_conn);
- } else {
- // This connection is denied.
- qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
}
}
qd_connection_set_event_stall(qd_conn, false);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/policy_private.h
----------------------------------------------------------------------
diff --git a/src/policy_private.h b/src/policy_private.h
index 03d58ed..9d246aa 100644
--- a/src/policy_private.h
+++ b/src/policy_private.h
@@ -19,10 +19,10 @@
* under the License.
*/
-#include <qpid/dispatch.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/static_assert.h>
+#include "qpid/dispatch.h"
+#include "qpid/dispatch/server.h"
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/static_assert.h"
#include "config.h"
#include "alloc.h"
@@ -85,6 +85,22 @@ bool qd_policy_socket_accept(void *context, const char *hostname);
void qd_policy_socket_close(void *context, const qd_connection_t *conn);
+/** Set the error condition and close the session. Over the wire this will send
+ * a begin frame followed immediately by an end frame with the error condition.
+ * @param[in] ssn proton session being closed
+ * @param[in] qd_conn connection owning the session; source of the user, host and app values logged
+ **/
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn);
+
+
+/** Set the error condition and close the link. Over the wire: an attach frame followed immediately by a detach frame with the error condition.
+ * @param[in] link proton link being closed
+ * @param[in] s_or_r role text placed in the denial log message ("sender" or "receiver" at the call sites in this commit)
+ * @param[in] qd_conn connection owning the link; source of the user, host and app values logged
+ **/
+void qd_policy_deny_amqp_link(pn_link_t *link, const char* s_or_r, qd_connection_t *qd_conn);
+
+
/** Allow or deny an incoming connection.
* An Open performative was received over a new connection.
* Consult local policy to determine if this host/user is
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index ca0eed7..90a0fb4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -63,6 +63,19 @@ static qd_thread_t *thread(qd_server_t *qd_server, int id)
return thread;
}
+static void free_qd_connection(qd_connection_t *ctx)
+{ // Free a connection context together with any policy settings it owns.
+ if (ctx->policy_settings) { // only connections subject to policy carry settings
+ if (ctx->policy_settings->sources)
+ free(ctx->policy_settings->sources);
+ if (ctx->policy_settings->targets)
+ free(ctx->policy_settings->targets);
+ free (ctx->policy_settings);
+ ctx->policy_settings = 0; // clear the pointer before the context itself is released
+ }
+ free_qd_connection_t(ctx); // free_qd_connection_t is defined outside this diff
+}
+
qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl);
/**
@@ -267,6 +280,8 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
ctx->ufd = 0;
ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it
ctx->policy_settings = 0;
+ ctx->n_senders = 0;
+ ctx->n_receivers = 0;
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
ctx->event_stall = false;
@@ -729,7 +744,7 @@ static void *thread_run(void *arg)
pn_collector_free(ctx->collector);
invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
sys_mutex_free(ctx->deferred_call_lock);
- free_qd_connection_t(ctx);
+ free_qd_connection(ctx);
qd_server->threads_active--;
sys_mutex_unlock(qd_server->lock);
} else {
@@ -819,7 +834,9 @@ static void cxtr_try_open(void *context)
ctx->link_context = 0;
ctx->ufd = 0;
ctx->policy_settings = 0;
-
+ ctx->n_senders = 0;
+ ctx->n_receivers = 0;
+
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
ctx->event_stall = false;
@@ -846,7 +863,7 @@ static void cxtr_try_open(void *context)
if (ctx->pn_cxtr == 0) {
sys_mutex_free(ctx->deferred_call_lock);
- free_qd_connection_t(ctx);
+ free_qd_connection(ctx);
ct->delay = 10000;
qd_timer_schedule(ct->timer, ct->delay);
return;
@@ -1382,6 +1399,8 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
ctx->link_context = 0;
ctx->ufd = ufd;
ctx->policy_settings = 0;
+ ctx->n_senders = 0;
+ ctx->n_receivers = 0;
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
ctx->event_stall = false;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9e255e8b/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 43706ed..da70ed3 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -100,6 +100,9 @@ struct qd_connection_t {
qd_user_fd_t *ufd;
uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky.
qd_policy_settings_t *policy_settings;
+ int n_sessions;
+ int n_senders;
+ int n_receivers;
qd_deferred_call_list_t deferred_calls;
sys_mutex_t *deferred_call_lock;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org