You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2015/10/28 20:51:42 UTC

qpid-dispatch git commit: DISPATCH-188: Initial policy hooks for connection approval. All connections are allowed so self tests pass. Uncomment policy.c allow_this toggle to demonstrate connection denial.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 [created] a4c1df1ef


DISPATCH-188: Initial policy hooks for connection approval.
All connections are allowed so self tests pass.
Uncomment policy.c allow_this toggle to demonstrate connection denial.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a4c1df1e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a4c1df1e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a4c1df1e

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: a4c1df1ef980c5b373bb7a0465837cb96f93919e
Parents: 3eee868
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Oct 28 15:35:37 2015 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Oct 28 15:35:37 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/server.h | 12 ++++++
 src/CMakeLists.txt             |  1 +
 src/container.c                | 20 +++++++--
 src/policy.c                   | 86 +++++++++++++++++++++++++++++++++++++
 src/policy_private.h           | 39 +++++++++++++++++
 src/server.c                   | 70 +++++++++++++++++-------------
 src/server_private.h           |  1 +
 7 files changed, 195 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 3eca75c..022c131 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -517,6 +517,18 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
 
 
 /**
+ * Write accessor to the connection's proton-event stall flag.
+ * When set no further events are processed on this connection.
+ * Used during processing of policy decisions to hold off incoming
+ * pipeline of amqp events.
+ *
+ * @param conn Connection object
+ * @param stall Value of stall flag
+ */
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall);
+
+
+/**
  * Create a listener for incoming connections.
  *
  * @param qd The dispatch handle returned by qd_dispatch.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 63ccd60..99ec122 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -60,6 +60,7 @@ set(qpid_dispatch_SOURCES
   log.c
   message.c
   parse.c
+  policy.c
   posix/driver.c
   posix/threading.c
   python_embedded.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 2be163d..c52ed15 100644
--- a/src/container.c
+++ b/src/container.c
@@ -21,6 +21,7 @@
 #include <string.h>
 #include "dispatch_private.h"
 #include "connection_manager_private.h"
+#include "policy_private.h"
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/message.h>
@@ -320,9 +321,22 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
 
     switch (pn_event_type(event)) {
     case PN_CONNECTION_REMOTE_OPEN :
-        if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
-            pn_connection_open(conn);
-        qd_connection_manager_connection_opened(qd_conn);
+        if (true) {  // TODO: detect if a policy engine is present
+            // Let policy engine decide about this connection
+            if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+                // This Open is an externally initiated connection
+                qd_connection_set_event_stall(qd_conn, true);
+                qd_connection_invoke_deferred(qd_conn, qd_policy_handle_open, qd_conn);
+            } else {
+                // This Open is in response to an internally initiated connection
+                qd_connection_manager_connection_opened(qd_conn);
+            }
+        } else {
+            // No policy engine; allow the connection
+            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+                pn_connection_open(conn);
+            qd_connection_manager_connection_opened(qd_conn);
+        }
         break;
 
     case PN_CONNECTION_REMOTE_CLOSE :

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
new file mode 100644
index 0000000..8f95854
--- /dev/null
+++ b/src/policy.c
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "policy_private.h"
+#include <stdio.h>
+#include <string.h>
+#include "dispatch_private.h"
+#include "connection_manager_private.h"
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/error.h>
+#include <proton/event.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+
+//
+// TODO: get a real policy engine
+//       This engine accepts every other connection
+//
+static bool allow_this = true;
+
+//
+// error conditions
+//
+static char* RESOURCE_LIMIT_EXCEEDED     = "amqp:resource-limit-exceeded";
+//static char* UNAUTHORIZED_ACCESS         = "amqp:unauthorized-access";
+//static char* CONNECTION_FORCED           = "amqp:connection:forced";
+
+//
+// error descriptions
+//
+static char* CONNECTION_DISALLOWED         = "connection disallowed by local policy";
+
+
+void qd_policy_handle_open(void *context, bool discard)
+{
+    qd_connection_t *qd_conn = (qd_connection_t *)context;
+    
+    if (!discard) {
+        pn_connection_t *conn = qd_connection_pn(qd_conn);
+
+        if (allow_this) { // TODO: Consult actual policy engine
+            // This connection is allowed.
+            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+                pn_connection_open(conn);
+            qd_connection_manager_connection_opened(qd_conn);
+        } else {
+            // This connection is denied.
+            // Set the error condition and close the connection.
+            // Over the wire this will send an open frame followed
+            // immediately by a close frame with the error condition.
+            pn_condition_t * cond = pn_connection_condition(conn);
+            (void) pn_condition_set_name(       cond, RESOURCE_LIMIT_EXCEEDED);
+            (void) pn_condition_set_description(cond, CONNECTION_DISALLOWED);
+            pn_connection_close(conn);
+        }
+    
+        // update the policy
+        //allow_this = !allow_this;
+    }
+    qd_connection_set_event_stall(qd_conn, false);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/policy_private.h
----------------------------------------------------------------------
diff --git a/src/policy_private.h b/src/policy_private.h
new file mode 100644
index 0000000..9b06648
--- /dev/null
+++ b/src/policy_private.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/static_assert.h>
+
+#include "config.h"
+#include "alloc.h"
+#include "entity.h"
+#include "entity_cache.h"
+#include <dlfcn.h>
+
+/** Allow or deny an incoming connection.
+ * An Open performative was received over a new connection.
+ * Consult local policy to determine if this host/user is
+ * allow to make this connection. The underlying proton 
+ * connection is either opened or closed.
+ * @param[in] context a qd_connection_t object
+ * @param[in] discard callback switch
+ **/
+void qd_policy_handle_open(void *context, bool discard);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index bca3eb2..512bcc0 100644
--- a/src/server.c
+++ b/src/server.c
@@ -436,40 +436,41 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
         pn_event_t      *event;
 
         events = 0;
-        event = pn_collector_peek(collector);
-        while (event) {
-            //
-            // If we are transitioning to the open state, notify the client via callback.
-            //
-            if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
-                ctx->opened = true;
-                qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
-
-                if (ctx->connector) {
-                    ce = QD_CONN_EVENT_CONNECTOR_OPEN;
-                    ctx->connector->delay = 0;
-                } else
-                    assert(ctx->listener);
-
-                qd_server->conn_handler(qd_server->conn_handler_context,
-                                        ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
-                events = 1;
-                break;  // Break without popping this event.  It will be re-processed in OPERATIONAL state.
-            } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
-                ctx->closed = true;
-                qdpn_connector_close(cxtr);
-                if (ctx->connector) {
-                    const qd_server_config_t *config = ctx->connector->config;
-                    qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+        if (!ctx->event_stall) {
+            event = pn_collector_peek(collector);
+            while (event) {
+                //
+                // If we are transitioning to the open state, notify the client via callback.
+                //
+                if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
+                    ctx->opened = true;
+                    qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
+
+                    if (ctx->connector) {
+                        ce = QD_CONN_EVENT_CONNECTOR_OPEN;
+                        ctx->connector->delay = 0;
+                    } else
+                        assert(ctx->listener);
+
+                    qd_server->conn_handler(qd_server->conn_handler_context,
+                                            ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
+                    events = 1;
+                } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
+                    ctx->closed = true;
+                    qdpn_connector_close(cxtr);
+                    if (ctx->connector) {
+                        const qd_server_config_t *config = ctx->connector->config;
+                        qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+                    }
                 }
-            }
 
-            events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
-            pn_collector_pop(collector);
-            event = pn_collector_peek(collector);
-        }
+                events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
+                pn_collector_pop(collector);
 
-        events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+                event = ctx->event_stall ? 0 : pn_collector_peek(collector);
+            }
+            events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+        }
     } while (events > 0);
 
     return passes > 1;
@@ -1260,6 +1261,12 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
 }
 
 
+void qd_connection_set_event_stall(qd_connection_t *conn, bool stall)
+{
+    conn->event_stall = stall;
+}
+
+
 qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
 {
     qd_server_t   *qd_server = qd->server;
@@ -1364,6 +1371,7 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
     ctx->ufd          = ufd;
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
+    ctx->event_stall  = false;
 
     ufd->context = context;
     ufd->server  = qd_server;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a4c1df1e/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index a40a503..e61da61 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -102,6 +102,7 @@ struct qd_connection_t {
 
     qd_deferred_call_list_t  deferred_calls;
     sys_mutex_t             *deferred_call_lock;
+    bool             event_stall;
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org