You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/16 16:16:32 UTC
qpid-dispatch git commit: DISPATCH-479 - Use atomic ops for
ref_counts [Patch from Ulf Lilleengen] This closes #96
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 1ee98d495 -> 113228490
DISPATCH-479 - Use atomic ops for ref_counts [Patch from Ulf Lilleengen]
This closes #96
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/11322849
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/11322849
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/11322849
Branch: refs/heads/master
Commit: 113228490fcae32c9f1b6a898147e4f0ff0fe504
Parents: 1ee98d4
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 16 12:14:32 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 16 12:14:32 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/atomic.h | 144 +++++++++++++++++++++++++++++
src/connection_manager.c | 43 ++++-----
src/message.c | 10 +-
src/message_private.h | 3 +-
src/router_core/forwarder.c | 4 +-
src/router_core/router_core_private.h | 3 +-
src/router_core/transfer.c | 22 ++---
7 files changed, 178 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/include/qpid/dispatch/atomic.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
new file mode 100644
index 0000000..567b722
--- /dev/null
+++ b/include/qpid/dispatch/atomic.h
@@ -0,0 +1,144 @@
+#ifndef __sys_atomic_h__
+#define __sys_atomic_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**@file
+ * Portable atomic operations on uint32_t.
+ */
+
+#include <stdint.h>
+
+/******************************************************************************
+ * C11 atomics *
+ ******************************************************************************/
+#if defined(__STDC__) && (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__)
+
+#include <stdatomic.h>
+typedef atomic_uint sys_atomic_t; /* C11 build: the counter is the standard atomic unsigned int */
+
+/**
+ * Set the counter to an initial value.
+ *
+ * @param ref   counter to initialise
+ * @param value value stored into the counter
+ */
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    /* Sequentially consistent store, i.e. exactly what atomic_store() does. */
+    atomic_store_explicit(ref, value, memory_order_seq_cst);
+}
+
+/**
+ * Atomically add to the counter.
+ *
+ * @param ref   counter to modify
+ * @param value amount to add
+ * @return the counter's value immediately before the addition
+ */
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    const uint32_t before = atomic_fetch_add_explicit(ref, value, memory_order_seq_cst);
+    return before;
+}
+
+/**
+ * Atomically subtract from the counter.
+ *
+ * @param ref   counter to modify
+ * @param value amount to subtract
+ * @return the counter's value immediately before the subtraction
+ */
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    const uint32_t before = atomic_fetch_sub_explicit(ref, value, memory_order_seq_cst);
+    return before;
+}
+
+/**
+ * Atomically read the counter.
+ *
+ * @param ref counter to read
+ * @return the counter's current value
+ */
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    /* Sequentially consistent load, i.e. exactly what atomic_load() does. */
+    const uint32_t current = atomic_load_explicit(ref, memory_order_seq_cst);
+    return current;
+}
+
+/**
+ * Release a counter. This variant performs no work.
+ *
+ * @param ref counter being released (unused)
+ */
+static inline void sys_atomic_destroy(sys_atomic_t *ref)
+{
+    (void) ref;
+}
+
+#elif defined(__GNUC__) || defined(__clang__)
+
+/******************************************************************************
+ * GCC specific atomics *
+ ******************************************************************************/
+
+typedef volatile uint32_t sys_atomic_t; /* GCC/clang build: a volatile 32-bit word, added to and subtracted from via the __sync builtins */
+
+/**
+ * Set the counter to an initial value using an ordinary volatile store.
+ *
+ * @param ref   counter to initialise
+ * @param value value stored into the counter
+ */
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    sys_atomic_t *const counter = ref;
+    *counter = value;
+}
+
+/**
+ * Atomically add to the counter.
+ *
+ * @param ref   counter to modify
+ * @param value amount to add
+ * @return the counter's value immediately before the addition
+ */
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    const uint32_t before = __sync_fetch_and_add(ref, value);
+    return before;
+}
+
+/**
+ * Atomically subtract from the counter.
+ *
+ * @param ref   counter to modify
+ * @param value amount to subtract
+ * @return the counter's value immediately before the subtraction
+ */
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    const uint32_t before = __sync_fetch_and_sub(ref, value);
+    return before;
+}
+
+/**
+ * Read the current value of the counter.
+ *
+ * The read is performed by atomically adding zero. The __sync builtin is a
+ * full memory barrier, so this variant orders the read the same way as the
+ * sequentially consistent atomic_load() of the C11 variant and the
+ * mutex-protected read of the fallback variant. A plain volatile read gives
+ * no ordering with respect to surrounding memory accesses.
+ *
+ * @param ref counter to read
+ * @return the counter's current value
+ */
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    return __sync_fetch_and_add(ref, 0);
+}
+
+/**
+ * Release a counter. This variant performs no work.
+ *
+ * @param ref counter being released (unused)
+ */
+static inline void sys_atomic_destroy(sys_atomic_t *ref)
+{
+    (void) ref;
+}
+
+#else
+
+/******************************************************************************
+ * Mutex fallback atomics *
+ ******************************************************************************/
+#include <qpid/dispatch/threading.h>
+
+struct sys_atomic_t { /* fallback counter: a plain integer paired with a mutex */
+ sys_mutex_t *lock; /* held by add/sub/get while value is read or updated */
+ uint32_t value; /* the counter itself */
+};
+typedef struct sys_atomic_t sys_atomic_t;
+
+/**
+ * Prepare a counter for use: store the initial value and create its mutex.
+ *
+ * @param ref   counter to initialise
+ * @param value value stored into the counter
+ */
+static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value)
+{
+    ref->value = value;
+    ref->lock  = sys_mutex();
+}
+
+/**
+ * Add to the counter while holding its mutex.
+ *
+ * @param ref   counter to modify
+ * @param value amount to add
+ * @return the counter's value immediately before the addition
+ */
+static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value)
+{
+    uint32_t before;
+
+    sys_mutex_lock(ref->lock);
+    before     = ref->value;
+    ref->value = before + value;
+    sys_mutex_unlock(ref->lock);
+
+    return before;
+}
+
+/**
+ * Subtract from the counter while holding its mutex.
+ *
+ * @param ref   counter to modify
+ * @param value amount to subtract
+ * @return the counter's value immediately before the subtraction
+ */
+static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value)
+{
+    uint32_t before;
+
+    sys_mutex_lock(ref->lock);
+    before     = ref->value;
+    ref->value = before - value;
+    sys_mutex_unlock(ref->lock);
+
+    return before;
+}
+
+/**
+ * Read the counter while holding its mutex.
+ *
+ * @param ref counter to read
+ * @return the counter's current value
+ */
+static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
+{
+    uint32_t snapshot;
+
+    sys_mutex_lock(ref->lock);
+    snapshot = ref->value;
+    sys_mutex_unlock(ref->lock);
+
+    return snapshot;
+}
+
+/**
+ * Release the mutex owned by a counter.
+ *
+ * The caller must guarantee that no other thread is still using the counter.
+ * The mutex is deliberately NOT locked before it is freed: POSIX leaves the
+ * destruction of a locked mutex undefined, and a mutex freed while held is
+ * never unlocked.
+ * NOTE(review): assumes sys_mutex_free() wraps pthread_mutex_destroy() or an
+ * equivalent - confirm against the threading implementation.
+ *
+ * @param ref counter whose mutex is released; must have been set up with
+ *            sys_atomic_init()
+ */
+static inline void sys_atomic_destroy(sys_atomic_t *ref)
+{
+    sys_mutex_free(ref->lock);
+}
+
+#endif
+
+#define sys_atomic_inc(ref) sys_atomic_add((ref), 1) /* add one; evaluates to the value held BEFORE the increment */
+#define sys_atomic_dec(ref) sys_atomic_sub((ref), 1) /* subtract one; evaluates to the value held BEFORE the decrement */
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index bdea6aa..a779fd6 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -20,6 +20,7 @@
#include <qpid/dispatch/connection_manager.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
#include "dispatch_private.h"
#include "connection_manager_private.h"
#include "server_private.h"
@@ -33,16 +34,16 @@ static char* HOST_ADDR_DEFAULT = "127.0.0.1";
struct qd_config_ssl_profile_t {
DEQ_LINKS(qd_config_ssl_profile_t);
- uint64_t identity;
- char *name;
- char *ssl_password;
- char *ssl_trusted_certificate_db;
- char *ssl_trusted_certificates;
- char *ssl_uid_format;
- char *ssl_display_name_file;
- char *ssl_certificate_file;
- char *ssl_private_key_file;
- int ref_count;
+ uint64_t identity;
+ char *name;
+ char *ssl_password;
+ char *ssl_trusted_certificate_db;
+ char *ssl_trusted_certificates;
+ char *ssl_uid_format;
+ char *ssl_display_name_file;
+ char *ssl_certificate_file;
+ char *ssl_private_key_file;
+ sys_atomic_t ref_count;
};
struct qd_config_listener_t {
@@ -224,9 +225,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->ssl_uid_format = (*ssl_profile)->ssl_uid_format;
config->ssl_display_name_file = (*ssl_profile)->ssl_display_name_file;
}
- sys_mutex_lock(qd->connection_manager->ssl_profile_lock);
- (*ssl_profile)->ref_count++;
- sys_mutex_unlock(qd->connection_manager->ssl_profile_lock);
+ sys_atomic_inc(&(*ssl_profile)->ref_count);
}
free(stripAnnotations);
@@ -254,9 +253,8 @@ qd_config_ssl_profile_t *qd_dispatch_configure_ssl_profile(qd_dispatch_t *qd, qd
ssl_profile->ssl_trusted_certificates = qd_entity_opt_string(entity, "trustedCerts", 0); CHECK();
ssl_profile->ssl_uid_format = qd_entity_opt_string(entity, "uidFormat", 0); CHECK();
ssl_profile->ssl_display_name_file = qd_entity_opt_string(entity, "displayNameFile", 0); CHECK();
- sys_mutex_lock(qd->connection_manager->ssl_profile_lock);
- ssl_profile->ref_count = 0;
- sys_mutex_unlock(qd->connection_manager->ssl_profile_lock);
+
+ sys_atomic_init(&ssl_profile->ref_count, 0);
qd_log(cm->log_source, QD_LOG_INFO, "Created SSL Profile with name %s ", ssl_profile->name);
return ssl_profile;
@@ -413,9 +411,7 @@ void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t
qd_server_connector_free(cc->connector);
if (cc->ssl_profile) {
- sys_mutex_lock(cm->ssl_profile_lock);
- cc->ssl_profile->ref_count--;
- sys_mutex_unlock(cm->ssl_profile_lock);
+ sys_atomic_dec(&cc->ssl_profile->ref_count);
}
free(cc);
@@ -431,9 +427,7 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *
}
if (cl->ssl_profile) {
- sys_mutex_lock(cm->ssl_profile_lock);
- cl->ssl_profile->ref_count--;
- sys_mutex_unlock(cm->ssl_profile_lock);
+ sys_atomic_dec(&cl->ssl_profile->ref_count);
}
free(cl);
@@ -442,12 +436,9 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *
bool qd_config_ssl_profile_free(qd_connection_manager_t *cm, qd_config_ssl_profile_t *ssl_profile)
{
- sys_mutex_lock(cm->ssl_profile_lock);
- if (ssl_profile->ref_count != 0) {
- sys_mutex_unlock(cm->ssl_profile_lock);
+ if (sys_atomic_get(&ssl_profile->ref_count) != 0) {
return false;
}
- sys_mutex_unlock(cm->ssl_profile_lock);
DEQ_REMOVE(cm->config_ssl_profiles, ssl_profile);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 99fcae8..ab16e9d 100644
--- a/src/message.c
+++ b/src/message.c
@@ -561,7 +561,7 @@ qd_message_t *qd_message()
memset(msg->content, 0, sizeof(qd_message_content_t));
msg->content->lock = sys_mutex();
- msg->content->ref_count = 1;
+ sys_atomic_init(&msg->content->ref_count, 1);
msg->content->parse_depth = QD_DEPTH_NONE;
msg->content->parsed_message_annotations = 0;
@@ -581,9 +581,7 @@ void qd_message_free(qd_message_t *in_msg)
qd_message_content_t *content = msg->content;
- sys_mutex_lock(content->lock);
- rc = --content->ref_count;
- sys_mutex_unlock(content->lock);
+ rc = sys_atomic_dec(&content->ref_count) - 1;
if (rc == 0) {
if (content->parsed_message_annotations)
@@ -621,9 +619,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
copy->content = content;
- sys_mutex_lock(content->lock);
- content->ref_count++;
- sys_mutex_unlock(content->lock);
+ sys_atomic_inc(&content->ref_count);
return (qd_message_t*) copy;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 8ede2c7..0387302 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -22,6 +22,7 @@
#include <qpid/dispatch/message.h>
#include "alloc.h"
#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
/** @file
* Message representation.
@@ -65,7 +66,7 @@ typedef struct {
typedef struct {
sys_mutex_t *lock;
- uint32_t ref_count; // The number of messages referencing this
+ sys_atomic_t ref_count; // The number of messages referencing this
qd_buffer_list_t buffers; // The buffer chain containing the message
qd_field_location_t section_message_header; // The message header list
qd_field_location_t section_delivery_annotation; // The delivery annotation map
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index edd540d..1f4031b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -118,7 +118,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
dlv->peer = in_dlv;
in_dlv->peer = dlv;
- dlv->ref_count = 1;
+ sys_atomic_init(&dlv->ref_count, 1);
qdr_delivery_incref(in_dlv);
}
}
@@ -162,7 +162,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
DEQ_INSERT_TAIL(link->undelivered, dlv);
dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
- dlv->ref_count++; // We have the lock, don't use the incref function
+ sys_atomic_inc(&dlv->ref_count);
//
// If the link isn't already on the links_with_deliveries list, put it there.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 594e200..02568ce 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -22,6 +22,7 @@
#include "dispatch_private.h"
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/log.h>
#include <memory.h>
@@ -201,7 +202,7 @@ typedef enum {
struct qdr_delivery_t {
DEQ_LINKS(qdr_delivery_t);
void *context;
- int ref_count;
+ sys_atomic_t ref_count;
qdr_link_t *link;
qdr_delivery_t *peer;
qd_message_t *msg;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b1348a1..9a30cdf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -44,7 +44,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->ref_count = 1; // referenced by the action
+ sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
dlv->link = link;
dlv->msg = msg;
dlv->to_addr = 0;
@@ -66,7 +66,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->ref_count = 1; // referenced by the action
+ sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
dlv->link = link;
dlv->msg = msg;
dlv->to_addr = addr;
@@ -90,7 +90,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->ref_count = 1; // referenced by the action
+ sys_atomic_init(&dlv->ref_count, 1); // referenced by the action
dlv->link = link;
dlv->msg = msg;
dlv->settled = settled;
@@ -248,15 +248,12 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
return delivery->context;
}
-
void qdr_delivery_incref(qdr_delivery_t *delivery)
{
qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0;
if (!!conn) {
- sys_mutex_lock(conn->work_lock);
- delivery->ref_count++;
- sys_mutex_unlock(conn->work_lock);
+ sys_atomic_inc(&delivery->ref_count);
}
}
@@ -267,12 +264,9 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
bool delete = false;
if (!!conn) {
- if (!lock_held)
- sys_mutex_lock(conn->work_lock);
- assert(delivery->ref_count > 0);
- delete = --delivery->ref_count == 0;
- if (!lock_held)
- sys_mutex_unlock(conn->work_lock);
+ uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
+ assert(ref_count > 0);
+ delete = (ref_count - 1) == 0;
}
if (delete) {
@@ -805,7 +799,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
sys_mutex_lock(link->conn->work_lock);
if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- dlv->ref_count++; // We have the lock, don't use the incref function
+ sys_atomic_inc(&dlv->ref_count);
qdr_add_delivery_ref(&link->updated_deliveries, dlv);
qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
activate = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org