You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2015/10/28 20:51:42 UTC
qpid-dispatch git commit: DISPATCH-188: Initial policy hooks for
connection approval. All connections are allowed so self tests pass.
Uncomment policy.c allow_this toggle to demonstrate connection denial.
Repository: qpid-dispatch
Updated Branches:
refs/heads/crolke-DISPATCH-188-1 [created] a4c1df1ef
DISPATCH-188: Initial policy hooks for connection approval.
All connections are allowed so self tests pass.
Uncomment policy.c allow_this toggle to demonstrate connection denial.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a4c1df1e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a4c1df1e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a4c1df1e
Branch: refs/heads/crolke-DISPATCH-188-1
Commit: a4c1df1ef980c5b373bb7a0465837cb96f93919e
Parents: 3eee868
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Oct 28 15:35:37 2015 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Oct 28 15:35:37 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/server.h | 12 ++++++
src/CMakeLists.txt | 1 +
src/container.c | 20 +++++++--
src/policy.c | 86 +++++++++++++++++++++++++++++++++++++
src/policy_private.h | 39 +++++++++++++++++
src/server.c | 70 +++++++++++++++++-------------
src/server_private.h | 1 +
7 files changed, 195 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 3eca75c..022c131 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -517,6 +517,18 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
/**
+ * Write accessor to the connection's proton-event stall flag.
+ * When set, no further events are processed on this connection.
+ * Used during processing of policy decisions to hold off incoming
+ * pipeline of amqp events.
+ *
+ * @param conn Connection object
+ * @param stall Value of stall flag
+ */
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall);
+
+
+/**
* Create a listener for incoming connections.
*
* @param qd The dispatch handle returned by qd_dispatch.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 63ccd60..99ec122 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -60,6 +60,7 @@ set(qpid_dispatch_SOURCES
log.c
message.c
parse.c
+ policy.c
posix/driver.c
posix/threading.c
python_embedded.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 2be163d..c52ed15 100644
--- a/src/container.c
+++ b/src/container.c
@@ -21,6 +21,7 @@
#include <string.h>
#include "dispatch_private.h"
#include "connection_manager_private.h"
+#include "policy_private.h"
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/message.h>
@@ -320,9 +321,22 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
switch (pn_event_type(event)) {
case PN_CONNECTION_REMOTE_OPEN :
- if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
- pn_connection_open(conn);
- qd_connection_manager_connection_opened(qd_conn);
+ if (true) { // TODO: detect if a policy engine is present
+ // Let policy engine decide about this connection
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+ // This Open is an externally initiated connection
+ qd_connection_set_event_stall(qd_conn, true);
+ qd_connection_invoke_deferred(qd_conn, qd_policy_handle_open, qd_conn);
+ } else {
+ // This Open is in response to an internally initiated connection
+ qd_connection_manager_connection_opened(qd_conn);
+ }
+ } else {
+ // No policy engine; allow the connection
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+ pn_connection_open(conn);
+ qd_connection_manager_connection_opened(qd_conn);
+ }
break;
case PN_CONNECTION_REMOTE_CLOSE :
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
new file mode 100644
index 0000000..8f95854
--- /dev/null
+++ b/src/policy.c
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "policy_private.h"
+#include <stdio.h>
+#include <string.h>
+#include "dispatch_private.h"
+#include "connection_manager_private.h"
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/error.h>
+#include <proton/event.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+
+//
+// TODO: get a real policy engine
+// This engine accepts every connection; with the allow_this toggle enabled it accepts every other connection
+//
+static bool allow_this = true;
+
+//
+// error conditions
+//
+static char* RESOURCE_LIMIT_EXCEEDED = "amqp:resource-limit-exceeded";
+//static char* UNAUTHORIZED_ACCESS = "amqp:unauthorized-access";
+//static char* CONNECTION_FORCED = "amqp:connection:forced";
+
+//
+// error descriptions
+//
+static char* CONNECTION_DISALLOWED = "connection disallowed by local policy";
+
+
+void qd_policy_handle_open(void *context, bool discard)
+{
+ qd_connection_t *qd_conn = (qd_connection_t *)context; // context is the qd_connection_t handed to qd_connection_invoke_deferred
+
+ if (!discard) { // when discard is set, no policy decision is made; only the stall flag is cleared below
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+
+ if (allow_this) { // TODO: Consult actual policy engine
+ // This connection is allowed: open the local end if it is still uninitialized, then notify the connection manager.
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+ pn_connection_open(conn);
+ qd_connection_manager_connection_opened(qd_conn);
+ } else {
+ // This connection is denied.
+ // Set the error condition and close the connection.
+ // Over the wire this will send an open frame followed
+ // immediately by a close frame with the error condition.
+ pn_condition_t * cond = pn_connection_condition(conn);
+ (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED);
+ (void) pn_condition_set_description(cond, CONNECTION_DISALLOWED);
+ pn_connection_close(conn);
+ }
+
+ // update the policy (toggle below is disabled, so allow_this stays true; uncomment it to deny every other connection)
+ //allow_this = !allow_this;
+ }
+ qd_connection_set_event_stall(qd_conn, false); // always clear the event stall, whether or not the open was evaluated
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/policy_private.h
----------------------------------------------------------------------
diff --git a/src/policy_private.h b/src/policy_private.h
new file mode 100644
index 0000000..9b06648
--- /dev/null
+++ b/src/policy_private.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/static_assert.h>
+
+#include "config.h"
+#include "alloc.h"
+#include "entity.h"
+#include "entity_cache.h"
+#include <dlfcn.h>
+
+/** Allow or deny an incoming connection.
+ * An Open performative was received over a new connection.
+ * Consult local policy to determine if this host/user is
+ * allowed to make this connection. The underlying proton
+ * connection is either opened or closed.
+ * @param[in] context a qd_connection_t object
+ * @param[in] discard callback switch
+ **/
+void qd_policy_handle_open(void *context, bool discard);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index bca3eb2..512bcc0 100644
--- a/src/server.c
+++ b/src/server.c
@@ -436,40 +436,41 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
pn_event_t *event;
events = 0;
- event = pn_collector_peek(collector);
- while (event) {
- //
- // If we are transitioning to the open state, notify the client via callback.
- //
- if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
- ctx->opened = true;
- qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
-
- if (ctx->connector) {
- ce = QD_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
- } else
- assert(ctx->listener);
-
- qd_server->conn_handler(qd_server->conn_handler_context,
- ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
- events = 1;
- break; // Break without popping this event. It will be re-processed in OPERATIONAL state.
- } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
- ctx->closed = true;
- qdpn_connector_close(cxtr);
- if (ctx->connector) {
- const qd_server_config_t *config = ctx->connector->config;
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ if (!ctx->event_stall) {
+ event = pn_collector_peek(collector);
+ while (event) {
+ //
+ // If we are transitioning to the open state, notify the client via callback.
+ //
+ if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
+ ctx->opened = true;
+ qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
+
+ if (ctx->connector) {
+ ce = QD_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(ctx->listener);
+
+ qd_server->conn_handler(qd_server->conn_handler_context,
+ ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
+ events = 1;
+ } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
+ ctx->closed = true;
+ qdpn_connector_close(cxtr);
+ if (ctx->connector) {
+ const qd_server_config_t *config = ctx->connector->config;
+ qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ }
}
- }
- events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
- pn_collector_pop(collector);
- event = pn_collector_peek(collector);
- }
+ events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
+ pn_collector_pop(collector);
- events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+ event = ctx->event_stall ? 0 : pn_collector_peek(collector);
+ }
+ events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+ }
} while (events > 0);
return passes > 1;
@@ -1260,6 +1261,12 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
}
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall)
+{
+ conn->event_stall = stall; // plain store of the flag; process_connector tests event_stall before each collector peek
+}
+
+
qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
qd_server_t *qd_server = qd->server;
@@ -1364,6 +1371,7 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
ctx->ufd = ufd;
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
+ ctx->event_stall = false;
ufd->context = context;
ufd->server = qd_server;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index a40a503..e61da61 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -102,6 +102,7 @@ struct qd_connection_t {
qd_deferred_call_list_t deferred_calls;
sys_mutex_t *deferred_call_lock;
+ bool event_stall;
};
DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org