You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2016/04/04 18:13:31 UTC

qpid-dispatch git commit: DISPATCH-255: Policy miscounting socket open/close. Eliminate redundant connection context.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 44f38e4c6 -> 7817e2b87


DISPATCH-255: Fix policy miscounting of socket open/close by recording, per
connection, whether the accept was counted. Eliminate the redundant conn_context field.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7817e2b8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7817e2b8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7817e2b8

Branch: refs/heads/master
Commit: 7817e2b8778f090407bfb636862993ed2fd06f30
Parents: 44f38e4
Author: Chuck Rolke <cr...@redhat.com>
Authored: Mon Apr 4 11:59:58 2016 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Mon Apr 4 11:59:58 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/driver.h | 10 ++++++++--
 src/container.c                |  1 -
 src/policy.c                   |  2 +-
 src/posix/driver.c             | 17 ++++++++++++-----
 src/server.c                   | 13 +++++++------
 src/server_private.h           |  2 +-
 6 files changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/include/qpid/dispatch/driver.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h
index c3c1b79..cf8508b 100644
--- a/include/qpid/dispatch/driver.h
+++ b/include/qpid/dispatch/driver.h
@@ -178,11 +178,17 @@ void qdpn_listener_trace(qdpn_listener_t *listener, pn_trace_t trace);
 /** Accept a connection that is pending on the listener.
  *
  * @param[in] listener the listener to accept the connection on
- * @param[in] policy function that accepts remote host name and returns 
+ * @param[in] policy policy that holds absolute connection limits
+ * @param[in] policy_fn function that accepts remote host name and returns
  *            decision to allow or deny this connection
+ * @param[out] counted set to true when policy_fn allowed the connection and it was
+ *             counted against absolute connection limits; otherwise left unchanged
  * @return a new connector for the remote, or NULL on error
  */
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener, void *policy, bool (*policy_fn)(void *, const char *));
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener,
+                                       void *policy,
+                                       bool (*policy_fn)(void *, const char *),
+                                       bool *counted);
 
 /** Access the application context that is associated with the listener.
  *

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index fa2f306..254e1ad 100644
--- a/src/container.c
+++ b/src/container.c
@@ -381,7 +381,6 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
             // Let policy engine decide
             qd_connection_set_event_stall(qd_conn, true);
             qd_conn->open_container = (void *)container;
-            qd_conn->conn_context = conn_context;
             qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, qd_conn);
         } else {
             // This Open is in response to an internally initiated connection

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index e881aa9..dbd3015 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -732,7 +732,7 @@ void qd_policy_amqp_open(void *context, bool discard)
             if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
                 pn_connection_open(conn);
             qd_connection_manager_connection_opened(qd_conn);
-            policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->conn_context);
+            policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context);
         }
     }
     qd_connection_set_event_stall(qd_conn, false);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index 3d4bbca..0273b2a 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -374,7 +374,10 @@ void qdpn_listener_set_context(qdpn_listener_t *listener, void *context)
     listener->context = context;
 }
 
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, void *policy, bool (*policy_fn)(void *, const char *name))
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l,
+                                       void *policy,
+                                       bool (*policy_fn)(void *, const char *name),
+                                       bool *counted)
 {
     if (!l || !l->pending) return NULL;
     char name[PN_NAME_MAX];
@@ -403,11 +406,15 @@ qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, void *policy, bool (*
         }
     }
 
-    if (policy_fn && !(*policy_fn)(policy, name)) {
-        close(sock);
-        return 0;
+    if (policy_fn) {
+        if (!(*policy_fn)(policy, name)) {
+            close(sock);
+            return 0;
+        } else {
+            *counted = true;
+        }
     }
-    
+
     if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
         fprintf(stderr, "Accepted from %s\n", name);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 1929775..30e57a8 100644
--- a/src/server.c
+++ b/src/server.c
@@ -479,7 +479,8 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
     qd_connection_t  *ctx;
 
     for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
-        cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept);
+        bool policy_counted = false;
+        cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept, &policy_counted);
         if (!cxtr)
             continue;
 
@@ -510,10 +511,10 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
         ctx->n_senders       = 0;
         ctx->n_receivers     = 0;
         ctx->open_container  = 0;
-        ctx->conn_context    = 0;
         DEQ_INIT(ctx->deferred_calls);
         ctx->deferred_call_lock = sys_mutex();
         ctx->event_stall  = false;
+        ctx->policy_counted = policy_counted;
 
         pn_connection_t *conn = pn_connection();
         ctx->collector = pn_collector();
@@ -957,7 +958,7 @@ static void *thread_run(void *arg)
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
 
-                if (!ctx->connector && ctx->open_container) {
+                if (ctx->policy_counted) {
                     qd_policy_socket_close(qd_server->qd->policy, ctx);
                 }
 
@@ -1058,11 +1059,11 @@ static void cxtr_try_open(void *context)
     ctx->n_senders       = 0;
     ctx->n_receivers     = 0;
     ctx->open_container  = 0;
-    ctx->conn_context    = 0;
-    
+
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
     ctx->event_stall  = false;
+    ctx->policy_counted = false;
 
     qd_log(ct->server->log_source, QD_LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
 
@@ -1636,10 +1637,10 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
     ctx->n_senders       = 0;
     ctx->n_receivers     = 0;
     ctx->open_container  = 0;
-    ctx->conn_context    = 0;
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
     ctx->event_stall  = false;
+    ctx->policy_counted = false;
 
     ufd->context = context;
     ufd->server  = qd_server;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7817e2b8/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 11da8e2..562feb5 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -105,10 +105,10 @@ struct qd_connection_t {
     int                       n_senders;
     int                       n_receivers;
     void                     *open_container;
-    void                     *conn_context;
     qd_deferred_call_list_t   deferred_calls;
     sys_mutex_t              *deferred_call_lock;
     bool                      event_stall;
+    bool                      policy_counted;
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org