You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/01 20:40:43 UTC
[1/2] celix git commit: admin mutexes replaced by std::mutex and
lock_guard
Repository: celix
Updated Branches:
refs/heads/nanomsg cb740b0d4 -> 3009e6470
admin mutexes replaced by std::mutex and lock_guard
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95892a85
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95892a85
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95892a85
Branch: refs/heads/nanomsg
Commit: 95892a8577107eb4fa93410ae08913d51407763b
Parents: cb740b0
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 21:17:02 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 21:17:02 2018 +0100
----------------------------------------------------------------------
.../src/psa_nanomsg_activator.cc | 4 -
.../src/pubsub_nanomsg_admin.cc | 329 +++++++++----------
.../src/pubsub_nanomsg_admin.h | 10 +-
3 files changed, 166 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index ec3ee7d..e599f01 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -87,10 +87,6 @@ private:
LogHelper logHelper;
pubsub_nanomsg_admin admin;
-
-// command_service_t cmdSvc{};
-
-// long cmdSvcId = -1L;
};
celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) {
http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index bd1d0a5..c10431f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -105,64 +105,59 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadMutex_create(&serializers.mutex, nullptr);
serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
- celixThreadMutex_create(&topicSenders.mutex, nullptr);
topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
- celixThreadMutex_create(&topicReceivers.mutex, nullptr);
topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
- celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr);
discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
}
pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
//note assuming al psa register services and service tracker are removed.
-
- celixThreadMutex_lock(&topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
- pubsub_nanoMsgTopicSender_destroy(sender);
+ {
+ std::lock_guard<std::mutex> lock(topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter));
+ pubsub_nanoMsgTopicSender_destroy(sender);
+ }
}
- celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_lock(&topicReceivers.mutex);
- iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
- pubsub_nanoMsgTopicReceiver_destroy(recv);
+ {
+ std::lock_guard<std::mutex> lock(topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+ pubsub_nanoMsgTopicReceiver_destroy(recv);
+ }
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
- celixThreadMutex_lock(&discoveredEndpoints.mutex);
- iter = hashMapIterator_construct(discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
- celix_properties_destroy(ep);
+ {
+ std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ celix_properties_destroy(ep);
+ }
}
- celixThreadMutex_unlock(&discoveredEndpoints.mutex);
- celixThreadMutex_lock(&serializers.mutex);
- iter = hashMapIterator_construct(serializers.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
- free(entry);
+ {
+ std::lock_guard<std::mutex> lock(serializers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(serializers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+ free(entry);
+ }
}
- celixThreadMutex_unlock(&serializers.mutex);
- celixThreadMutex_destroy(&topicSenders.mutex);
hashMap_destroy(topicSenders.map, true, false);
- celixThreadMutex_destroy(&topicReceivers.mutex);
hashMap_destroy(topicReceivers.map, true, false);
- celixThreadMutex_destroy(&discoveredEndpoints.mutex);
hashMap_destroy(discoveredEndpoints.map, false, false);
- celixThreadMutex_destroy(&serializers.mutex);
hashMap_destroy(serializers.map, false, false);
free(ipAddress);
@@ -259,16 +254,17 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
return;
}
- celixThreadMutex_lock(&serializers.mutex);
- auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
- if (entry == nullptr) {
- entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
- entry->serType = serType;
- entry->svcId = svcId;
- entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
- hashMap_put(serializers.map, (void*)svcId, entry);
+ {
+ std::lock_guard<std::mutex> lock(serializers.mutex);
+ auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
+ if (entry == nullptr) {
+ entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+ entry->serType = serType;
+ entry->svcId = svcId;
+ entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+ hashMap_put(serializers.map, (void*)svcId, entry);
+ }
}
- celixThreadMutex_unlock(&serializers.mutex);
}
@@ -281,40 +277,41 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
// 3) loop and destroy all topic receivers using the serializer
// Note that it is the responsibility of the topology manager to create new topic senders/receivers
- celixThreadMutex_lock(&serializers.mutex);
+ std::lock_guard<std::mutex> lock(serializers.mutex);
auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
if (entry != nullptr) {
- celixThreadMutex_lock(&topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
- if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
- char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
- hashMapIterator_remove(&iter);
- pubsub_nanoMsgTopicSender_destroy(sender);
- free(key);
+ {
+ std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry));
+ if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry));
+ hashMapIterator_remove(&iter);
+ pubsub_nanoMsgTopicSender_destroy(sender);
+ free(key);
+ }
}
}
- celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_lock(&topicReceivers.mutex);
- iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
- if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
- char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
- hashMapIterator_remove(&iter);
- pubsub_nanoMsgTopicReceiver_destroy(receiver);
- free(key);
+ {
+ std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+ if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+ char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
+ hashMapIterator_remove(&iter);
+ pubsub_nanoMsgTopicReceiver_destroy(receiver);
+ free(key);
+ }
}
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
free(entry);
}
- celixThreadMutex_unlock(&serializers.mutex);
}
celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
@@ -353,7 +350,7 @@ celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *end
celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
//1) Create TopicSender
//2) Store TopicSender
@@ -363,21 +360,22 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- celixThreadMutex_lock(&serializers.mutex);
- celixThreadMutex_lock(&topicSenders.mutex);
- auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map, key));
+ pubsub_nanomsg_topic_sender_t *sender = nullptr;
+ std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+ std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+ sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
if (sender == nullptr) {
- auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+ auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+ (void *) serializerSvcId));
if (serEntry != nullptr) {
- sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc,
- ipAddress, basePort, maxPort);
+ sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+ basePort, maxPort);
}
if (sender != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, nullptr);
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
+ nullptr);
celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
@@ -393,9 +391,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
free(key);
L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
- celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_unlock(&serializers.mutex);
-
if (sender != nullptr && newEndpoint != nullptr) {
//TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
}
@@ -414,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
//2) destroy topic sender
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&topicSenders.mutex);
+ std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
if (entry != nullptr) {
char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
@@ -425,7 +420,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
} else {
L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
}
- celixThreadMutex_unlock(&topicSenders.mutex);
free(key);
return status;
@@ -437,41 +431,41 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&serializers.mutex);
- celixThreadMutex_lock(&topicReceivers.mutex);
- auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map, key));
- if (receiver == nullptr) {
- auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
- if (serEntry != nullptr) {
- receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
- serEntry->svc);
- } else {
- L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
- }
- if (receiver != nullptr) {
- const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr);
- //if available also set container name
- const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
- if (cn != nullptr) {
- celix_properties_set(newEndpoint, "container_name", cn);
+ pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+ {
+ std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+ std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+ receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
+ if (receiver == nullptr) {
+ auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+ (void *) serializerSvcId));
+ if (serEntry != nullptr) {
+ receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+ } else {
+ L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ }
+ if (receiver != nullptr) {
+ const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ serType, nullptr);
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+ if (cn != nullptr) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
+ hashMap_put(topicReceivers.map, key, receiver);
+ } else {
+ L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+ free(key);
}
- hashMap_put(topicReceivers.map, key, receiver);
} else {
- L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
free(key);
+ L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
- } else {
- free(key);
- L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
- celixThreadMutex_unlock(&serializers.mutex);
-
if (receiver != nullptr && newEndpoint != nullptr) {
- celixThreadMutex_lock(&discoveredEndpoints.mutex);
+ std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
@@ -480,7 +474,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
connectEndpointToReceiver(receiver, endpoint);
}
}
- celixThreadMutex_unlock(&discoveredEndpoints.mutex);
}
if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
@@ -493,7 +486,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&topicReceivers.mutex);
+ std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
free(key);
if (entry != nullptr) {
@@ -504,7 +497,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
free(receiverKey);
pubsub_nanoMsgTopicReceiver_destroy(receiver);
}
- celixThreadMutex_lock(&topicReceivers.mutex);
celix_status_t status = CELIX_SUCCESS;
return status;
@@ -542,20 +534,18 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- celixThreadMutex_lock(&topicReceivers.mutex);
+ std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
connectEndpointToReceiver(receiver, endpoint);
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
}
- celixThreadMutex_lock(&discoveredEndpoints.mutex);
+ std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
celix_properties_t *cpy = celix_properties_copy(endpoint);
const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
- celixThreadMutex_unlock(&discoveredEndpoints.mutex);
celix_status_t status = CELIX_SUCCESS;
return status;
@@ -592,20 +582,19 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- celixThreadMutex_lock(&topicReceivers.mutex);
+ std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
disconnectEndpointFromReceiver(receiver, endpoint);
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
}
-
- celixThreadMutex_lock(&discoveredEndpoints.mutex);
- const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
- celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
- celixThreadMutex_unlock(&discoveredEndpoints.mutex);
-
+ celix_properties_t *found = nullptr;
+ {
+ std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
+ found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
+ }
if (found != nullptr) {
celix_properties_destroy(found);
}
@@ -620,52 +609,56 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
- celixThreadMutex_lock(&serializers.mutex);
- celixThreadMutex_lock(&topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
- long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
- const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
- const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
- const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
- const char *url = pubsub_nanoMsgTopicSender_url(sender);
- fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
- fprintf(out, " |- serializer type = %s\n", serType);
- fprintf(out, " |- url = %s\n", url);
- }
- celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_unlock(&serializers.mutex);
-
- fprintf(out, "\n");
- fprintf(out, "\nTopic Receivers:\n");
- celixThreadMutex_lock(&serializers.mutex);
- celixThreadMutex_lock(&topicReceivers.mutex);
- iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
- long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
- const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
-
- std::vector<std::string> connected{};
- std::vector<std::string> unconnected{};
- pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
-
- fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
- fprintf(out, " |- serializer type = %s\n", serType);
- for (auto url : connected) {
- fprintf(out, " |- connected url = %s\n", url.c_str());
+ {
+ std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+ std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
+ &iter));
+ long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+ psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+ serializers.map, (void *) serSvcId));
+ const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+ const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
+ const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
+ const char *url = pubsub_nanoMsgTopicSender_url(sender);
+ fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- url = %s\n", url);
}
- for (auto url : unconnected) {
- fprintf(out, " |- unconnected url = %s\n", url.c_str());
+ }
+
+ {
+ fprintf(out, "\n");
+ fprintf(out, "\nTopic Receivers:\n");
+ std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+ std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
+ &iter));
+ long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+ psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+ serializers.map, (void *) serSvcId));
+ const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+ const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+ const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+ std::vector<std::string> connected{};
+ std::vector<std::string> unconnected{};
+ pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+
+ fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ for (auto url : connected) {
+ fprintf(out, " |- connected url = %s\n", url.c_str());
+ }
+ for (auto url : unconnected) {
+ fprintf(out, " |- unconnected url = %s\n", url.c_str());
+ }
}
}
- celixThreadMutex_unlock(&topicReceivers.mutex);
- celixThreadMutex_unlock(&serializers.mutex);
fprintf(out, "\n");
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 385b400..b06c887 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -20,6 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
#define CELIX_PUBSUB_ZMQ_ADMIN_H
+#include <mutex>
#include <pubsub_admin.h>
#include "celix_api.h"
#include "log_helper.h"
@@ -98,22 +99,22 @@ private:
bool verbose{};
struct {
- celix_thread_mutex_t mutex;
+ std::mutex mutex;
hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
} serializers{};
struct {
- celix_thread_mutex_t mutex;
+ std::mutex mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
} topicSenders{};
struct {
- celix_thread_mutex_t mutex;
+ std::mutex mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
} topicReceivers{};
struct {
- celix_thread_mutex_t mutex;
+ std::mutex mutex;
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints{};
@@ -127,7 +128,6 @@ extern "C" {
}
#endif
-celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
[2/2] celix git commit: Replaced celix-map with std::map
Posted by er...@apache.org.
Replaced celix-map with std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/3009e647
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/3009e647
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/3009e647
Branch: refs/heads/nanomsg
Commit: 3009e64705e1be606ca308ffc7ec7c0633dfde77
Parents: 95892a8
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 21:40:24 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 21:40:24 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 57 +++++++++++---------
.../src/pubsub_nanomsg_admin.h | 11 +++-
.../src/pubsub_nanomsg_topic_receiver.h | 3 +-
3 files changed, 44 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index c10431f..9fe91d9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -28,7 +28,6 @@
#include <netdb.h>
#include <ifaddrs.h>
#include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
#include "pubsub_utils.h"
#include "pubsub_nanomsg_admin.h"
@@ -105,7 +104,7 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
- serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
+ //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
@@ -145,10 +144,9 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
{
std::lock_guard<std::mutex> lock(serializers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(serializers.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
- free(entry);
+ // todo: do not use pointer but type in map
+ for(auto kv: serializers.map) {
+ free(kv.second);
}
}
@@ -158,8 +156,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
hashMap_destroy(discoveredEndpoints.map, false, false);
- hashMap_destroy(serializers.map, false, false);
-
free(ipAddress);
}
@@ -256,13 +252,13 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
{
std::lock_guard<std::mutex> lock(serializers.mutex);
- auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
- if (entry == nullptr) {
- entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+ auto it = serializers.map.find(svcId);
+ if (it == serializers.map.end()) {
+ auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
entry->serType = serType;
entry->svcId = svcId;
entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
- hashMap_put(serializers.map, (void*)svcId, entry);
+ serializers.map[svcId] = entry;
}
}
}
@@ -278,7 +274,13 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
// Note that it is the responsibility of the topology manager to create new topic senders/receivers
std::lock_guard<std::mutex> lock(serializers.mutex);
- auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
+
+ psa_nanomsg_serializer_entry_t* entry = nullptr;
+ auto kv = serializers.map.find(svcId);
+ if (kv != serializers.map.end()) {
+ entry = kv->second;
+ }
+ serializers.map.erase(svcId);
if (entry != nullptr) {
{
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
@@ -365,8 +367,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
if (sender == nullptr) {
- auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
- (void *) serializerSvcId));
+ //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+ // (void *) serializerSvcId));
+ psa_nanomsg_serializer_entry_t *serEntry = nullptr;
+ auto kv = serializers.map.find(serializerSvcId);
+ if (kv != serializers.map.end()) {
+ serEntry = kv->second;
+ }
if (serEntry != nullptr) {
sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
basePort, maxPort);
@@ -437,16 +444,16 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
if (receiver == nullptr) {
- auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
- (void *) serializerSvcId));
- if (serEntry != nullptr) {
+ auto kv = serializers.map.find(serializerSvcId);
+ if (kv != serializers.map.end()) {
+ auto serEntry = kv->second;
receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = serEntry->serType;
+ const char *serType = kv->second->serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
@@ -617,9 +624,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
&iter));
long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
- serializers.map, (void *) serSvcId));
- const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+ auto kv = serializers.map.find(serSvcId);
+ //psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+ // serializers.map, (void *) serSvcId));
+ const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -639,9 +647,8 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
&iter));
long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
- serializers.map, (void *) serSvcId));
- const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+ auto kv = serializers.map.find(serSvcId);
+ const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b06c887..98314b3 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -21,10 +21,13 @@
#define CELIX_PUBSUB_ZMQ_ADMIN_H
#include <mutex>
+#include <map>
#include <pubsub_admin.h>
#include "celix_api.h"
#include "log_helper.h"
#include "pubsub_nanomsg_topic_receiver.h"
+#include <pubsub_serializer.h>
+
#include "../../../shell/shell/include/command.h"
#define PUBSUB_NANOMSG_ADMIN_TYPE "zmq"
@@ -98,9 +101,15 @@ private:
bool verbose{};
+ typedef struct psa_nanomsg_serializer_entry {
+ const char *serType;
+ long svcId;
+ pubsub_serializer_service_t *svc;
+ } psa_nanomsg_serializer_entry_t;
struct {
std::mutex mutex;
- hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+ std::map<long, psa_nanomsg_serializer_entry_t*> map;
+ //hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
} serializers{};
struct {
http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 786fb90..d584db8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -20,7 +20,8 @@
#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
#include <string>
#include <vector>
-
+#include "pubsub_serializer.h"
+#include "log_helper.h"
#include "celix_bundle_context.h"
typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;