You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/04/09 13:04:31 UTC

[qpid-dispatch] branch master updated (c27d73b -> 2f91e20)

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from c27d73b  DISPATCH-1309 - Move the decrement of the content ref-count below the buffer-trimming logic in qd_message_free.  This eliminates the case where message content can be freed by one thread while another is doing buffer-trimming on the same content.
     new ade3b3b  DISPATCH-1313 - Added policyVhost attribute to the listener entity.  This optional field, if supplied, provides the vhost name to be used for policy lookup on connections arriving through the listener.  It allows multiple listeners to use different policy settings.
     new c354fa4  DISPATCH-1314: suppress spurious errors from log
     new dfadc67  DISPATCH-1315: ensure that the state shared by threads other than the http thread cannot be prematurely deleted
     new 2f91e20  DISPATCH-1316: atomic checking for deletability

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/server.h                  |  6 ++
 python/qpid_dispatch/management/qdrouter.json   |  6 ++
 src/connection_manager.c                        |  3 +-
 src/http-libwebsockets.c                        | 80 ++++++++++++++++++++-----
 src/policy.c                                    |  6 +-
 src/remote_sasl.c                               | 53 ++++++++++------
 tests/policy-3/test-sender-receiver-limits.json | 23 +++++++
 tests/system_tests_policy.py                    | 54 +++++++++++++++++
 8 files changed, 195 insertions(+), 36 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/04: DISPATCH-1313 - Added policyVhost attribute to the listener entity. This optional field, if supplied, provides the vhost name to be used for policy lookup on connections arriving through the listener. It allows multiple listeners to use different policy settings.

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit ade3b3b2aa28b09942f91b34b6e473471476543a
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Fri Apr 5 16:47:00 2019 -0400

    DISPATCH-1313 - Added policyVhost attribute to the listener entity.  This optional field, if supplied, provides the vhost name to be used for policy lookup on connections arriving through the listener.  It allows multiple listeners to use different policy settings.
---
 include/qpid/dispatch/server.h                  |  6 +++
 python/qpid_dispatch/management/qdrouter.json   |  6 +++
 src/connection_manager.c                        |  3 +-
 src/policy.c                                    |  6 ++-
 tests/policy-3/test-sender-receiver-limits.json | 23 +++++++++++
 tests/system_tests_policy.py                    | 54 +++++++++++++++++++++++++
 6 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 5a4c115..043baa5 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -352,6 +352,12 @@ typedef struct qd_server_config_t {
     bool multi_tenant;
 
     /**
+     * Optional vhost to use for policy lookup.  If non-null, this overrides the vhost supplied
+     * in the OPEN from the peer only for the purpose of identifying the policy to enforce.
+     */
+    char *policy_vhost;
+
+    /**
      * The specified role of the connection.  This can be used to control the behavior and
      * capabilities of the connections.
      */
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index cbf154f..195350e 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -874,6 +874,12 @@
                     "description": "A comma separated list that indicates which components of the message should be logged. Defaults to 'none' (log nothing). If you want all properties and application properties of the message logged use 'all'. Specific components of the message can be logged by indicating the components via a comma separated list. The components are message-id, user-id, to, subject, reply-to, correlation-id, content-type, content-encoding, absolute-expiry-time, creation [...]
                     "deprecationName": "logMessage",
                     "create": true
+                },
+                "policyVhost": {
+                    "type": "string",
+                    "required": false,
+                    "description": "A listener may optionally define a virtual host to index to a specific policy to restrict the remote container to access only specific resources. This attribute defines the name of the policy vhost for this listener.  If multi-tenancy is enabled for the listener, this vhost will override the peer-supplied vhost for the purposes of identifying the desired policy settings for the connections.",
+                    "create": true
                 }
             }
         },
diff --git a/src/connection_manager.c b/src/connection_manager.c
index bea3a4b..309f605 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -304,7 +304,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     bool requireEncryption  = qd_entity_opt_bool(entity, "requireEncryption", false);    CHECK();
     bool requireSsl         = qd_entity_opt_bool(entity, "requireSsl",        false);    CHECK();
 
-    memset(config, 0, sizeof(*config));
+    ZERO(config);
     config->log_message          = qd_entity_opt_string(entity, "messageLoggingComponents", 0);     CHECK();
     config->log_bits             = populate_log_message(config);
     config->port                 = qd_entity_get_string(entity, "port");              CHECK();
@@ -332,6 +332,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     config->sasl_plugin          = qd_entity_opt_string(entity, "saslPlugin", 0);   CHECK();
     config->link_capacity        = qd_entity_opt_long(entity, "linkCapacity", 0);     CHECK();
     config->multi_tenant         = qd_entity_opt_bool(entity, "multiTenant", false);  CHECK();
+    config->policy_vhost         = qd_entity_opt_string(entity, "policyVhost", 0);    CHECK();
     set_config_host(config, entity);
 
     //
diff --git a/src/policy.c b/src/policy.c
index a6e729e..12afd74 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -1103,12 +1103,16 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
     qd_policy_t *policy = qd->policy;
     bool connection_allowed = true;
 
+    const char *policy_vhost = 0;
+    if (!!qd_conn->listener)
+        policy_vhost = qd_conn->listener->config.policy_vhost;
+
     if (policy->enableVhostPolicy && (!qd_conn->role || strcmp(qd_conn->role, "inter-router"))) {
         // Open connection or not based on policy.
         pn_transport_t *pn_trans = pn_connection_transport(conn);
         const char *hostip = qd_connection_remote_ip(qd_conn);
         const char *pcrh = pn_connection_remote_hostname(conn);
-        const char *vhost = (pcrh ? pcrh : "");
+        const char *vhost = (policy_vhost ? policy_vhost : (pcrh ? pcrh : ""));
         const char *conn_name = qd_connection_name(qd_conn);
 #define SETTINGS_NAME_SIZE 256
         char settings_name[SETTINGS_NAME_SIZE];
diff --git a/tests/policy-3/test-sender-receiver-limits.json b/tests/policy-3/test-sender-receiver-limits.json
index 2d21f3e..0668800 100644
--- a/tests/policy-3/test-sender-receiver-limits.json
+++ b/tests/policy-3/test-sender-receiver-limits.json
@@ -69,5 +69,28 @@
         }
       }
     }
+  ],
+  ["vhost", {
+      "hostname": "override.host.com",
+      "maxConnections": 50,
+      "maxConnectionsPerUser": 2,
+      "maxConnectionsPerHost": 4,
+      "allowUnknownUser": true,
+      "groups": {
+        "$default" : {
+          "remoteHosts": "*",
+          "maxFrameSize":     222222,
+          "maxMessageSize":   222222,
+          "maxSessionWindow": 222222,
+          "maxSessions":           2,
+          "maxSenders":            3,
+          "maxReceivers":          5,
+          "allowDynamicSource":   true,
+          "allowAnonymousSender": true,
+          "sources": "*",
+          "targets": "*"
+        }
+      }
+    }
   ]
 ]
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index d0ffc5b..1a260e8 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -313,6 +313,60 @@ class SenderReceiverLimits(TestCase):
         bs1.close()
 
 
+class PolicyVhostOverride(TestCase):
+    """
+    Verify that specifying a policy folder from the router conf file
+    effects loading the policies in that folder.
+    This test relies on qdmanage utility.
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(PolicyVhostOverride, cls).setUpClass()
+        policy_config_path = os.path.join(DIR, 'policy-3')
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}),
+            ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'override.host.com'}),
+            ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'})
+        ])
+
+        cls.router = cls.tester.qdrouterd('PolicyVhostOverride', config, wait=True)
+
+    def address(self):
+        return self.router.addresses[0]
+
+    def test_verify_n_receivers(self):
+        n = 4
+        addr = self.address()
+        br1 = BlockingConnection(addr)
+
+        # n receivers OK
+        br1.create_receiver(address="****YES_1of5***")
+        br1.create_receiver(address="****YES_20f5****")
+        br1.create_receiver(address="****YES_3of5****")
+        br1.create_receiver(address="****YES_4of5****")
+        br1.create_receiver(address="****YES_5of5****")
+
+        # receiver n+1 should be denied
+        self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
+
+        br1.close()
+
+    def test_verify_n_senders(self):
+        n = 2
+        addr = self.address()
+        bs1 = BlockingConnection(addr)
+
+        # n senders OK
+        bs1.create_sender(address="****YES_1of3****")
+        bs1.create_sender(address="****YES_2of3****")
+        bs1.create_sender(address="****YES_3of3****")
+        # sender n+1 should be denied
+        self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
+
+        bs1.close()
+
+
 class InterrouterLinksAllowed(TestCase):
 
     inter_router_port = None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/04: DISPATCH-1314: suppress spurious errors from log

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit c354fa42cc7159e155639a16c9589bae85665799
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Apr 8 15:48:49 2019 +0100

    DISPATCH-1314: suppress spurious errors from log
---
 src/http-libwebsockets.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 06f7045..2da2ff0 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -37,6 +37,7 @@
 #include "config.h"
 
 static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */
+static const char *IGNORED = "ignore-this-log-message";
 
 /* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */
 static qd_log_source_t* http_log;
@@ -52,6 +53,7 @@ static qd_log_level_t qd_level(int lll) {
 }
 
 static void logger(int lll, const char *line)  {
+    if (strstr(line, IGNORED)) return;
     size_t  len = strlen(line);
     while (len > 1 && isspace(line[len-1])) { /* Strip trailing newline */
         --len;
@@ -318,6 +320,7 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
         metrics->mountpoint_len = strlen(metrics->mountpoint);
         metrics->origin_protocol = LWSMPRO_CALLBACK;
         metrics->protocol = "http";
+        metrics->origin = IGNORED;
     }
     if (config->healthz) {
         struct lws_http_mount *healthz = &hl->healthz;
@@ -326,6 +329,7 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
         healthz->mountpoint_len = strlen(healthz->mountpoint);
         healthz->origin_protocol = LWSMPRO_CALLBACK;
         healthz->protocol = "healthz";
+        healthz->origin = IGNORED;
     }
 
     struct lws_context_creation_info info = {0};


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 04/04: DISPATCH-1316: atomic checking for deletability

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 2f91e200a1cf5739e0163709cbb1fa14f8302d1c
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Apr 8 22:20:16 2019 +0100

    DISPATCH-1316: atomic checking for deletability
---
 src/remote_sasl.c | 53 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 35 insertions(+), 18 deletions(-)

diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index 6d79191..82b7887 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -46,7 +46,6 @@ const int8_t DOWNSTREAM_MECHANISMS_RECEIVED = 3;
 const int8_t DOWNSTREAM_CHALLENGE_RECEIVED = 4;
 const int8_t DOWNSTREAM_OUTCOME_RECEIVED = 5;
 const int8_t DOWNSTREAM_CLOSED = 6;
-const int8_t UPSTREAM_CLOSED = 7;
 
 typedef struct {
     size_t used;
@@ -110,6 +109,8 @@ typedef struct
     char* username;
     permissions_t permissions;
     pn_sasl_outcome_t outcome;
+
+    sys_mutex_t *lock;
 } qdr_sasl_relay_t;
 
 static void copy_bytes(const pn_bytes_t* from, qdr_owned_bytes_t* to)
@@ -132,6 +133,7 @@ static qdr_sasl_relay_t* new_qdr_sasl_relay_t(const char* address, const char* s
     }
     instance->proactor = proactor;
     init_permissions(&instance->permissions);
+    instance->lock = sys_mutex();
     return instance;
 }
 
@@ -147,6 +149,7 @@ static void delete_qdr_sasl_relay_t(qdr_sasl_relay_t* instance)
     if (instance->username) free(instance->username);
     free_buffer(&(instance->permissions.targets));
     free_buffer(&(instance->permissions.sources));
+    sys_mutex_free(instance->lock);
     free(instance);
 }
 
@@ -266,23 +269,40 @@ static bool notify_downstream(qdr_sasl_relay_t* impl, uint8_t state)
     }
 }
 
+static bool delete_on_downstream_freed(qdr_sasl_relay_t* impl)
+{
+    bool result;
+    sys_mutex_lock(impl->lock);
+    impl->downstream_released = true;
+    result = impl->upstream_released;
+    sys_mutex_unlock(impl->lock);
+    return result;
+}
+
+static bool delete_on_upstream_freed(qdr_sasl_relay_t* impl)
+{
+    bool result;
+    sys_mutex_lock(impl->lock);
+    impl->upstream_released = true;
+    result = impl->downstream_released || impl->downstream == 0;
+    sys_mutex_unlock(impl->lock);
+    return result;
+}
+
+static bool can_delete(pn_transport_t *transport, qdr_sasl_relay_t* impl)
+{
+    if (pnx_sasl_is_client(transport)) {
+        return delete_on_downstream_freed(impl);
+    } else {
+        return delete_on_upstream_freed(impl);
+    }
+}
+
 static void remote_sasl_free(pn_transport_t *transport)
 {
     qdr_sasl_relay_t* impl = (qdr_sasl_relay_t*) pnx_sasl_get_context(transport);
-    if (impl) {
-        if (pnx_sasl_is_client(transport)) {
-            impl->downstream_released = true;
-            if (impl->upstream_released) {
-                delete_qdr_sasl_relay_t(impl);
-            }
-        } else {
-            impl->upstream_released = true;
-            if (impl->downstream_released || impl->downstream == 0) {
-                delete_qdr_sasl_relay_t(impl);
-            } else {
-                notify_downstream(impl, UPSTREAM_CLOSED);
-            }
-        }
+    if (impl && can_delete(transport, impl)) {
+        delete_qdr_sasl_relay_t(impl);
     }
 }
 
@@ -317,9 +337,6 @@ static void remote_sasl_prepare(pn_transport_t *transport)
         } else if (impl->downstream_state == UPSTREAM_RESPONSE_RECEIVED) {
             pnx_sasl_set_bytes_out(transport, pn_bytes(impl->response.size, impl->response.start));
             pnx_sasl_set_desired_state(transport, SASL_POSTED_RESPONSE);
-        } else if (impl->downstream_state == UPSTREAM_CLOSED) {
-            impl->downstream_state = 0;
-            pn_transport_close_head(transport);
         }
         impl->downstream_state = 0;
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 03/04: DISPATCH-1315: ensure that the state shared by threads other than the http thread cannot be prematurely deleted

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit dfadc670b4b80db44054061d7d9166b2e56681aa
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Apr 8 19:39:54 2019 +0100

    DISPATCH-1315: ensure that the state shared by threads other than the http thread cannot be prematurely deleted
---
 src/http-libwebsockets.c | 76 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 60 insertions(+), 16 deletions(-)

diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 2da2ff0..db1c10b 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -98,12 +98,18 @@ typedef struct connection_t {
     struct lws *wsi;
 } connection_t;
 
-typedef struct stats_t {
-    size_t current;
-    bool headers_sent;
+typedef struct stats_request_state_t {
+    bool callback_completed;
+    bool wsi_deleted;
     qdr_global_stats_t stats;
     qd_http_server_t *server;
     struct lws *wsi;
+} stats_request_state_t;
+
+typedef struct stats_t {
+    size_t current;
+    bool headers_sent;
+    stats_request_state_t *context;
 } stats_t;
 
 /* Navigating from WSI pointer to qd objects */
@@ -409,13 +415,33 @@ static void connection_wake(qd_connection_t *qd_conn)
     }
 }
 
+/**
+ * Called on router worker thread
+ */
 static void handle_stats_results(void *context)
 {
-    stats_t* stats = (stats_t*) context;
-    qd_http_server_t *hs = stats->server;
-    if (hs) {
-        work_t w = { W_HANDLE_STATS, stats->wsi };
-        work_push(hs, w);
+    stats_request_state_t* state = (stats_request_state_t*) context;
+    if (state->wsi_deleted) {
+        free(state);
+    } else {
+        qd_http_server_t *hs = state->server;
+        if (hs) {
+            work_t w = { W_HANDLE_STATS, state };
+            work_push(hs, w);
+        }
+    }
+}
+
+/**
+ * Called on http thread
+ */
+static void handle_stats_result_HT(stats_request_state_t* state)
+{
+    if (state->wsi_deleted) {
+        free(state);
+    } else {
+        state->callback_completed = true;
+        lws_callback_on_writable(state->wsi);
     }
 }
 
@@ -499,10 +525,12 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
     switch (reason) {
 
     case LWS_CALLBACK_HTTP: {
-        stats->wsi = wsi;
-        stats->server = hs;
+        stats->context = NEW(stats_request_state_t);
+        ZERO(stats->context);
+        stats->context->wsi = wsi;
+        stats->context->server = hs;
         //request stats from core thread
-        qdr_request_global_stats(hs->core, &stats->stats, handle_stats_results, (void*) stats);
+        qdr_request_global_stats(hs->core, &stats->context->stats, handle_stats_results, (void*) stats->context);
         return 0;
     }
 
@@ -519,7 +547,7 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
         }
 
         while (stats->current < metrics_length) {
-            if (write_metric(&position, end, &metrics[stats->current], &stats->stats)) {
+            if (write_metric(&position, end, &metrics[stats->current], &stats->context->stats)) {
                 stats->current++;
                 qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %i of %i", stats->current, metrics_length);
             } else {
@@ -541,6 +569,13 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
         return 0;
     }
 
+    case LWS_CALLBACK_CLOSED: {
+        stats->context->wsi_deleted = true;
+        if (stats->context->callback_completed) {
+            free(stats->context);
+        }
+    }
+
     default:
         return 0;
     }
@@ -557,12 +592,14 @@ static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
     switch (reason) {
 
     case LWS_CALLBACK_HTTP: {
-        stats->wsi = wsi;
-        stats->server = hs;
+        stats->context = NEW(stats_request_state_t);
+        ZERO(stats->context);
+        stats->context->wsi = wsi;
+        stats->context->server = hs;
         //make dummy request for stats (pass in null ptr); this still excercises the
         //path through core thread and back through callback on io thread which is
         //a resonable initial liveness check
-        qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats);
+        qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats->context);
         return 0;
     }
 
@@ -589,6 +626,13 @@ static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
         else return 0;
     }
 
+    case LWS_CALLBACK_CLOSED: {
+        stats->context->wsi_deleted = true;
+        if (stats->context->callback_completed) {
+            free(stats->context);
+        }
+    }
+
     default:
         return 0;
     }
@@ -728,7 +772,7 @@ static void* http_thread_run(void* v) {
                 listener_close((qd_http_listener_t*)w.value, hs);
                 break;
             case W_HANDLE_STATS:
-                lws_callback_on_writable((struct lws*) w.value);
+                handle_stats_result_HT((stats_request_state_t*) w.value);
                 break;
             case W_WAKE: {
                 connection_t *c = w.value;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org