You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/16 16:16:32 UTC

qpid-dispatch git commit: DISPATCH-479 - Use atomic ops for ref_counts [Patch from Ulf Lilleengen] This closes #96

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 1ee98d495 -> 113228490


DISPATCH-479 - Use atomic ops for ref_counts [Patch from Ulf Lilleengen]
This closes #96


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/11322849
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/11322849
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/11322849

Branch: refs/heads/master
Commit: 113228490fcae32c9f1b6a898147e4f0ff0fe504
Parents: 1ee98d4
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 16 12:14:32 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 16 12:14:32 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/atomic.h        | 144 +++++++++++++++++++++++++++++
 src/connection_manager.c              |  43 ++++-----
 src/message.c                         |  10 +-
 src/message_private.h                 |   3 +-
 src/router_core/forwarder.c           |   4 +-
 src/router_core/router_core_private.h |   3 +-
 src/router_core/transfer.c            |  22 ++---
 7 files changed, 178 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/include/qpid/dispatch/atomic.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
new file mode 100644
index 0000000..567b722
--- /dev/null
+++ b/include/qpid/dispatch/atomic.h
@@ -0,0 +1,144 @@
+#ifndef __sys_atomic_h__
+#define __sys_atomic_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**@file
+ * Portable atomic operations on uint32_t.
+ */
+
+#include <stdint.h>
+
+/******************************************************************************
+ * C11 atomics                                                                *
+ ******************************************************************************/
+#if defined(__STDC__) && (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__)
+
+#include <stdatomic.h>
+typedef atomic_uint sys_atomic_t;
+
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    atomic_store(ref, value);
+}
+
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    return atomic_fetch_add(ref, value);
+}
+
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    return atomic_fetch_sub(ref, value);
+}
+
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    return atomic_load(ref);
+}
+
+static inline void sys_atomic_destroy(sys_atomic_t *ref) {}
+
+#elif defined(__GNUC__) || defined(__clang__)
+
+/******************************************************************************
+ * GCC specific atomics                                                       *
+ ******************************************************************************/
+
+typedef volatile uint32_t sys_atomic_t;
+
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    *ref = value; /* plain volatile store, not a __sync builtin */
+}
+
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    return __sync_fetch_and_add(ref, value);
+}
+
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    return __sync_fetch_and_sub(ref, value);
+}
+
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    return *ref; /* plain volatile load; unlike add/sub, no __sync builtin is used here */
+}
+
+static inline void sys_atomic_destroy(sys_atomic_t *ref) {}
+
+#else
+
+/******************************************************************************
+ * Mutex fallback atomics                                                     *
+ ******************************************************************************/
+#include <qpid/dispatch/threading.h>
+
+struct sys_atomic_t {
+    sys_mutex_t *lock;
+    uint32_t value;
+};
+typedef struct sys_atomic_t sys_atomic_t;
+
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    ref->lock = sys_mutex();
+    ref->value = value;
+}
+
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    sys_mutex_lock(ref->lock);
+    uint32_t prev = ref->value;
+    ref->value += value;
+    sys_mutex_unlock(ref->lock);
+    return prev;
+}
+
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    sys_mutex_lock(ref->lock);
+    uint32_t prev = ref->value;
+    ref->value -= value;
+    sys_mutex_unlock(ref->lock);
+    return prev;
+}
+
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    sys_mutex_lock(ref->lock);
+    uint32_t value = ref->value;
+    sys_mutex_unlock(ref->lock);
+    return value;
+}
+
+static inline void sys_atomic_destroy(sys_atomic_t *ref)
+{
+    /* Do not lock first: POSIX leaves destroying a locked mutex undefined. */
+    sys_mutex_free(ref->lock);
+}
+
+#endif
+
+#define sys_atomic_inc(ref) sys_atomic_add((ref), 1)
+#define sys_atomic_dec(ref) sys_atomic_sub((ref), 1)
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index bdea6aa..a779fd6 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -20,6 +20,7 @@
 #include <qpid/dispatch/connection_manager.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
 #include "dispatch_private.h"
 #include "connection_manager_private.h"
 #include "server_private.h"
@@ -33,16 +34,16 @@ static char* HOST_ADDR_DEFAULT = "127.0.0.1";
 
 struct qd_config_ssl_profile_t {
     DEQ_LINKS(qd_config_ssl_profile_t);
-    uint64_t   identity;
-    char      *name;
-    char      *ssl_password;
-    char      *ssl_trusted_certificate_db;
-    char      *ssl_trusted_certificates;
-    char      *ssl_uid_format;
-    char      *ssl_display_name_file;
-    char      *ssl_certificate_file;
-    char      *ssl_private_key_file;
-    int       ref_count;
+    uint64_t     identity;
+    char        *name;
+    char        *ssl_password;
+    char        *ssl_trusted_certificate_db;
+    char        *ssl_trusted_certificates;
+    char        *ssl_uid_format;
+    char        *ssl_display_name_file;
+    char        *ssl_certificate_file;
+    char        *ssl_private_key_file;
+    sys_atomic_t ref_count;
 };
 
 struct qd_config_listener_t {
@@ -224,9 +225,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
             config->ssl_uid_format = (*ssl_profile)->ssl_uid_format;
             config->ssl_display_name_file = (*ssl_profile)->ssl_display_name_file;
         }
-        sys_mutex_lock(qd->connection_manager->ssl_profile_lock);
-        (*ssl_profile)->ref_count++;
-        sys_mutex_unlock(qd->connection_manager->ssl_profile_lock);
+        sys_atomic_inc(&(*ssl_profile)->ref_count);
     }
 
     free(stripAnnotations);
@@ -254,9 +253,8 @@ qd_config_ssl_profile_t *qd_dispatch_configure_ssl_profile(qd_dispatch_t *qd, qd
     ssl_profile->ssl_trusted_certificates   = qd_entity_opt_string(entity, "trustedCerts", 0); CHECK();
     ssl_profile->ssl_uid_format             = qd_entity_opt_string(entity, "uidFormat", 0); CHECK();
     ssl_profile->ssl_display_name_file      = qd_entity_opt_string(entity, "displayNameFile", 0); CHECK();
-    sys_mutex_lock(qd->connection_manager->ssl_profile_lock);
-    ssl_profile->ref_count                  = 0;
-    sys_mutex_unlock(qd->connection_manager->ssl_profile_lock);
+
+    sys_atomic_init(&ssl_profile->ref_count, 0);
     qd_log(cm->log_source, QD_LOG_INFO, "Created SSL Profile with name %s ", ssl_profile->name);
     return ssl_profile;
 
@@ -413,9 +411,7 @@ void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t
         qd_server_connector_free(cc->connector);
 
     if (cc->ssl_profile) {
-        sys_mutex_lock(cm->ssl_profile_lock);
-        cc->ssl_profile->ref_count--;
-        sys_mutex_unlock(cm->ssl_profile_lock);
+        sys_atomic_dec(&cc->ssl_profile->ref_count);
     }
 
     free(cc);
@@ -431,9 +427,7 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *
     }
 
     if (cl->ssl_profile) {
-        sys_mutex_lock(cm->ssl_profile_lock);
-        cl->ssl_profile->ref_count--;
-        sys_mutex_unlock(cm->ssl_profile_lock);
+        sys_atomic_dec(&cl->ssl_profile->ref_count);
     }
 
     free(cl);
@@ -442,12 +436,9 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *
 
 bool qd_config_ssl_profile_free(qd_connection_manager_t *cm, qd_config_ssl_profile_t *ssl_profile)
 {
-    sys_mutex_lock(cm->ssl_profile_lock);
-    if (ssl_profile->ref_count != 0) {
-        sys_mutex_unlock(cm->ssl_profile_lock);
+    if (sys_atomic_get(&ssl_profile->ref_count) != 0) {
         return false;
     }
-    sys_mutex_unlock(cm->ssl_profile_lock);
 
     DEQ_REMOVE(cm->config_ssl_profiles, ssl_profile);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 99fcae8..ab16e9d 100644
--- a/src/message.c
+++ b/src/message.c
@@ -561,7 +561,7 @@ qd_message_t *qd_message()
 
     memset(msg->content, 0, sizeof(qd_message_content_t));
     msg->content->lock        = sys_mutex();
-    msg->content->ref_count   = 1;
+    sys_atomic_init(&msg->content->ref_count, 1);
     msg->content->parse_depth = QD_DEPTH_NONE;
     msg->content->parsed_message_annotations = 0;
 
@@ -581,9 +581,7 @@ void qd_message_free(qd_message_t *in_msg)
 
     qd_message_content_t *content = msg->content;
 
-    sys_mutex_lock(content->lock);
-    rc = --content->ref_count;
-    sys_mutex_unlock(content->lock);
+    rc = sys_atomic_dec(&content->ref_count) - 1;
 
     if (rc == 0) {
         if (content->parsed_message_annotations)
@@ -621,9 +619,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
 
     copy->content = content;
 
-    sys_mutex_lock(content->lock);
-    content->ref_count++;
-    sys_mutex_unlock(content->lock);
+    sys_atomic_inc(&content->ref_count);
 
     return (qd_message_t*) copy;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 8ede2c7..0387302 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -22,6 +22,7 @@
 #include <qpid/dispatch/message.h>
 #include "alloc.h"
 #include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
 
 /** @file
  * Message representation.
@@ -65,7 +66,7 @@ typedef struct {
 
 typedef struct {
     sys_mutex_t         *lock;
-    uint32_t             ref_count;                       // The number of messages referencing this
+    sys_atomic_t         ref_count;                       // The number of messages referencing this
     qd_buffer_list_t     buffers;                         // The buffer chain containing the message
     qd_field_location_t  section_message_header;          // The message header list
     qd_field_location_t  section_delivery_annotation;     // The delivery annotation map

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index edd540d..1f4031b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -118,7 +118,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
             dlv->peer = in_dlv;
             in_dlv->peer = dlv;
 
-            dlv->ref_count = 1;
+            sys_atomic_init(&dlv->ref_count, 1);
             qdr_delivery_incref(in_dlv);
         }
     }
@@ -162,7 +162,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
 
     DEQ_INSERT_TAIL(link->undelivered, dlv);
     dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
-    dlv->ref_count++; // We have the lock, don't use the incref function
+    sys_atomic_inc(&dlv->ref_count);
 
     //
     // If the link isn't already on the links_with_deliveries list, put it there.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 594e200..02568ce 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -22,6 +22,7 @@
 #include "dispatch_private.h"
 #include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/log.h>
 #include <memory.h>
 
@@ -201,7 +202,7 @@ typedef enum {
 struct qdr_delivery_t {
     DEQ_LINKS(qdr_delivery_t);
     void                *context;
-    int                  ref_count;
+    sys_atomic_t         ref_count;
     qdr_link_t          *link;
     qdr_delivery_t      *peer;
     qd_message_t        *msg;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b1348a1..9a30cdf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -44,7 +44,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->ref_count      = 1;    // referenced by the action
+    sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
     dlv->link           = link;
     dlv->msg            = msg;
     dlv->to_addr        = 0;
@@ -66,7 +66,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->ref_count      = 1;    // referenced by the action
+    sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
     dlv->link           = link;
     dlv->msg            = msg;
     dlv->to_addr        = addr;
@@ -90,7 +90,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->ref_count = 1;    // referenced by the action
+    sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
     dlv->link      = link;
     dlv->msg       = msg;
     dlv->settled   = settled;
@@ -248,15 +248,12 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
     return delivery->context;
 }
 
-
 void qdr_delivery_incref(qdr_delivery_t *delivery)
 {
     qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0;
 
     if (!!conn) {
-        sys_mutex_lock(conn->work_lock);
-        delivery->ref_count++;
-        sys_mutex_unlock(conn->work_lock);
+        sys_atomic_inc(&delivery->ref_count);
     }
 }
 
@@ -267,12 +264,9 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
     bool              delete = false;
     
     if (!!conn) {
-        if (!lock_held)
-            sys_mutex_lock(conn->work_lock);
-        assert(delivery->ref_count > 0);
-        delete = --delivery->ref_count == 0;
-        if (!lock_held)
-            sys_mutex_unlock(conn->work_lock);
+        uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
+        assert(ref_count > 0);
+        delete = (ref_count - 1) == 0;
     }
 
     if (delete) {
@@ -805,7 +799,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 
     sys_mutex_lock(link->conn->work_lock);
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
-        dlv->ref_count++; // We have the lock, don't use the incref function
+        sys_atomic_inc(&dlv->ref_count);
         qdr_add_delivery_ref(&link->updated_deliveries, dlv);
         qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
         activate = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org