You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/01 20:40:43 UTC

[1/2] celix git commit: admin mutexes replaced by std::mutex and lock_guard

Repository: celix
Updated Branches:
  refs/heads/nanomsg cb740b0d4 -> 3009e6470


admin mutexes replaced by std::mutex and lock_guard


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95892a85
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95892a85
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95892a85

Branch: refs/heads/nanomsg
Commit: 95892a8577107eb4fa93410ae08913d51407763b
Parents: cb740b0
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 21:17:02 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 21:17:02 2018 +0100

----------------------------------------------------------------------
 .../src/psa_nanomsg_activator.cc                |   4 -
 .../src/pubsub_nanomsg_admin.cc                 | 329 +++++++++----------
 .../src/pubsub_nanomsg_admin.h                  |  10 +-
 3 files changed, 166 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index ec3ee7d..e599f01 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -87,10 +87,6 @@ private:
     LogHelper logHelper;
 	pubsub_nanomsg_admin admin;
 
-
-//    command_service_t cmdSvc{};
-
-//	long cmdSvcId = -1L;
 };
 
 celix_status_t  celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) {

http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index bd1d0a5..c10431f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -105,64 +105,59 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
     qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
     qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadMutex_create(&serializers.mutex, nullptr);
     serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
 
-    celixThreadMutex_create(&topicSenders.mutex, nullptr);
     topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&topicReceivers.mutex, nullptr);
     topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr);
     discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 }
 
 pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
-
-    celixThreadMutex_lock(&topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
-        pubsub_nanoMsgTopicSender_destroy(sender);
+    {
+        std::lock_guard<std::mutex> lock(topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter));
+            pubsub_nanoMsgTopicSender_destroy(sender);
+        }
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
 
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    iter = hashMapIterator_construct(topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-        pubsub_nanoMsgTopicReceiver_destroy(recv);
+    {
+        std::lock_guard<std::mutex> lock(topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+            pubsub_nanoMsgTopicReceiver_destroy(recv);
+        }
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
 
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
-    iter = hashMapIterator_construct(discoveredEndpoints.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
-        celix_properties_destroy(ep);
+    {
+        std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+            celix_properties_destroy(ep);
+        }
     }
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
-    celixThreadMutex_lock(&serializers.mutex);
-    iter = hashMapIterator_construct(serializers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
-        free(entry);
+    {
+        std::lock_guard<std::mutex> lock(serializers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(serializers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+            free(entry);
+        }
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 
-    celixThreadMutex_destroy(&topicSenders.mutex);
     hashMap_destroy(topicSenders.map, true, false);
 
-    celixThreadMutex_destroy(&topicReceivers.mutex);
     hashMap_destroy(topicReceivers.map, true, false);
 
-    celixThreadMutex_destroy(&discoveredEndpoints.mutex);
     hashMap_destroy(discoveredEndpoints.map, false, false);
 
-    celixThreadMutex_destroy(&serializers.mutex);
     hashMap_destroy(serializers.map, false, false);
 
     free(ipAddress);
@@ -259,16 +254,17 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
         return;
     }
 
-    celixThreadMutex_lock(&serializers.mutex);
-    auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
-    if (entry == nullptr) {
-        entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
-        entry->serType = serType;
-        entry->svcId = svcId;
-        entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-        hashMap_put(serializers.map, (void*)svcId, entry);
+    {
+        std::lock_guard<std::mutex> lock(serializers.mutex);
+        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
+        if (entry == nullptr) {
+            entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+            entry->serType = serType;
+            entry->svcId = svcId;
+            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+            hashMap_put(serializers.map, (void*)svcId, entry);
+        }
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 }
 
 
@@ -281,40 +277,41 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
     // 3) loop and destroy all topic receivers using the serializer
     // Note that it is the responsibility of the topology manager to create new topic senders/receivers
 
-    celixThreadMutex_lock(&serializers.mutex);
+    std::lock_guard<std::mutex> lock(serializers.mutex);
     auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
     if (entry != nullptr) {
-        celixThreadMutex_lock(&topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
-            if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
-                char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                hashMapIterator_remove(&iter);
-                pubsub_nanoMsgTopicSender_destroy(sender);
-                free(key);
+        {
+            std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
+            hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+                auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry));
+                if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+                    char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry));
+                    hashMapIterator_remove(&iter);
+                    pubsub_nanoMsgTopicSender_destroy(sender);
+                    free(key);
+                }
             }
         }
-        celixThreadMutex_unlock(&topicSenders.mutex);
 
-        celixThreadMutex_lock(&topicReceivers.mutex);
-        iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
-            if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
-                char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                hashMapIterator_remove(&iter);
-                pubsub_nanoMsgTopicReceiver_destroy(receiver);
-                free(key);
+        {
+            std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
+            hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+                auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+                if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+                    char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
+                    hashMapIterator_remove(&iter);
+                    pubsub_nanoMsgTopicReceiver_destroy(receiver);
+                    free(key);
+                }
             }
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
 
         free(entry);
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 }
 
 celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
@@ -353,7 +350,7 @@ celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *end
 
 celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
                                                     long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
-    celix_status_t  status = CELIX_SUCCESS;
+    celix_status_t status = CELIX_SUCCESS;
 
     //1) Create TopicSender
     //2) Store TopicSender
@@ -363,21 +360,22 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicSenders.mutex);
-    auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map, key));
+    pubsub_nanomsg_topic_sender_t *sender = nullptr;
+    std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+    sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
     if (sender == nullptr) {
-        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+                                                                                   (void *) serializerSvcId));
         if (serEntry != nullptr) {
-            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc,
-                                                      ipAddress, basePort, maxPort);
+            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+                                                      basePort, maxPort);
         }
         if (sender != nullptr) {
             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, nullptr);
+            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
+                                                nullptr);
             celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
@@ -393,9 +391,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
         free(key);
         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
     if (sender != nullptr && newEndpoint != nullptr) {
         //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
     }
@@ -414,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     //2) destroy topic sender
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&topicSenders.mutex);
+    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
     if (entry != nullptr) {
         char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
@@ -425,7 +420,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     } else {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
     free(key);
 
     return status;
@@ -437,41 +431,41 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map, key));
-    if (receiver == nullptr) {
-        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
-        if (serEntry != nullptr) {
-            receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
-                                                          serEntry->svc);
-        } else {
-            L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
-        }
-        if (receiver != nullptr) {
-            const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-            const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr);
-            //if available also set container name
-            const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
-            if (cn != nullptr) {
-                celix_properties_set(newEndpoint, "container_name", cn);
+    pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+    {
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+        receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
+        if (receiver == nullptr) {
+            auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+                                                                                       (void *) serializerSvcId));
+            if (serEntry != nullptr) {
+                receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+            } else {
+                L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+            }
+            if (receiver != nullptr) {
+                const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+                const char *serType = serEntry->serType;
+                newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+                                                    serType, nullptr);
+                //if available also set container name
+                const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+                if (cn != nullptr) {
+                    celix_properties_set(newEndpoint, "container_name", cn);
+                }
+                hashMap_put(topicReceivers.map, key, receiver);
+            } else {
+                L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+                free(key);
             }
-            hashMap_put(topicReceivers.map, key, receiver);
         } else {
-            L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
             free(key);
+            L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
         }
-    } else {
-        free(key);
-        L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
     if (receiver != nullptr && newEndpoint != nullptr) {
-        celixThreadMutex_lock(&discoveredEndpoints.mutex);
+        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
@@ -480,7 +474,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
                 connectEndpointToReceiver(receiver, endpoint);
             }
         }
-        celixThreadMutex_unlock(&discoveredEndpoints.mutex);
     }
 
     if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
@@ -493,7 +486,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
 
 celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&topicReceivers.mutex);
+    std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
     hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
     free(key);
     if (entry != nullptr) {
@@ -504,7 +497,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
         free(receiverKey);
         pubsub_nanoMsgTopicReceiver_destroy(receiver);
     }
-    celixThreadMutex_lock(&topicReceivers.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -542,20 +534,18 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-        celixThreadMutex_lock(&topicReceivers.mutex);
+        std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
             connectEndpointToReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
 
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
+    std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
     const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
     hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -592,20 +582,19 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-        celixThreadMutex_lock(&topicReceivers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
-
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
-    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
-    celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
-
+    celix_properties_t *found = nullptr;
+    {
+        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
+        const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
+        found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
+    }
     if (found != nullptr) {
         celix_properties_destroy(found);
     }
@@ -620,52 +609,56 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
-        long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
-        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
-        const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
-        const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
-        const char *url = pubsub_nanoMsgTopicSender_url(sender);
-        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
-        fprintf(out, "   |- serializer type = %s\n", serType);
-        fprintf(out, "   |- url             = %s\n", url);
-    }
-    celixThreadMutex_unlock(&topicSenders.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
-    fprintf(out, "\n");
-    fprintf(out, "\nTopic Receivers:\n");
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    iter = hashMapIterator_construct(topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-        long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
-        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
-        const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-        const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
-
-        std::vector<std::string> connected{};
-        std::vector<std::string> unconnected{};
-        pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
-
-        fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
-        fprintf(out, "   |- serializer type = %s\n", serType);
-        for (auto url : connected) {
-            fprintf(out, "   |- connected url   = %s\n", url.c_str());
+    {
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
+                    &iter));
+            long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+                    serializers.map, (void *) serSvcId));
+            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
+            const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
+            const char *url = pubsub_nanoMsgTopicSender_url(sender);
+            fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+            fprintf(out, "   |- serializer type = %s\n", serType);
+            fprintf(out, "   |- url             = %s\n", url);
         }
-        for (auto url : unconnected) {
-            fprintf(out, "   |- unconnected url = %s\n", url.c_str());
+    }
+
+    {
+        fprintf(out, "\n");
+        fprintf(out, "\nTopic Receivers:\n");
+        std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
+                    &iter));
+            long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+                    serializers.map, (void *) serSvcId));
+            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+            const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+            std::vector<std::string> connected{};
+            std::vector<std::string> unconnected{};
+            pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+
+            fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+            fprintf(out, "   |- serializer type = %s\n", serType);
+            for (auto url : connected) {
+                fprintf(out, "   |- connected url   = %s\n", url.c_str());
+            }
+            for (auto url : unconnected) {
+                fprintf(out, "   |- unconnected url = %s\n", url.c_str());
+            }
         }
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
     fprintf(out, "\n");
 
     return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 385b400..b06c887 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -20,6 +20,7 @@
 #ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
 #define CELIX_PUBSUB_ZMQ_ADMIN_H
 
+#include <mutex>
 #include <pubsub_admin.h>
 #include "celix_api.h"
 #include "log_helper.h"
@@ -98,22 +99,22 @@ private:
     bool verbose{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
     } serializers{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
     } topicSenders{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
     } topicReceivers{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints{};
 
@@ -127,7 +128,6 @@ extern "C" {
 }
 #endif
 
-celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
 


[2/2] celix git commit: Replaced celix-map with std::map

Posted by er...@apache.org.
Replaced celix-map with std::map


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/3009e647
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/3009e647
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/3009e647

Branch: refs/heads/nanomsg
Commit: 3009e64705e1be606ca308ffc7ec7c0633dfde77
Parents: 95892a8
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 21:40:24 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 21:40:24 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 | 57 +++++++++++---------
 .../src/pubsub_nanomsg_admin.h                  | 11 +++-
 .../src/pubsub_nanomsg_topic_receiver.h         |  3 +-
 3 files changed, 44 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index c10431f..9fe91d9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -28,7 +28,6 @@
 #include <netdb.h>
 #include <ifaddrs.h>
 #include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
 
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
@@ -105,7 +104,7 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
     qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
     qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 
-    serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
+    //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
 
     topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
@@ -145,10 +144,9 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 
     {
         std::lock_guard<std::mutex> lock(serializers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(serializers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
-            free(entry);
+        // todo: do not use pointer but type in map
+        for(auto kv: serializers.map) {
+            free(kv.second);
         }
     }
 
@@ -158,8 +156,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 
     hashMap_destroy(discoveredEndpoints.map, false, false);
 
-    hashMap_destroy(serializers.map, false, false);
-
     free(ipAddress);
 
 }
@@ -256,13 +252,13 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
 
     {
         std::lock_guard<std::mutex> lock(serializers.mutex);
-        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
-        if (entry == nullptr) {
-            entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+        auto it = serializers.map.find(svcId);
+        if (it == serializers.map.end()) {
+            auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
             entry->serType = serType;
             entry->svcId = svcId;
             entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-            hashMap_put(serializers.map, (void*)svcId, entry);
+            serializers.map[svcId] = entry;
         }
     }
 }
@@ -278,7 +274,13 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
     // Note that it is the responsibility of the topology manager to create new topic senders/receivers
 
     std::lock_guard<std::mutex> lock(serializers.mutex);
-    auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
+
+    psa_nanomsg_serializer_entry_t* entry = nullptr;
+    auto kv = serializers.map.find(svcId);
+    if (kv != serializers.map.end()) {
+        entry = kv->second;
+    }
+    serializers.map.erase(svcId);
     if (entry != nullptr) {
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
@@ -365,8 +367,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
     if (sender == nullptr) {
-        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
-                                                                                   (void *) serializerSvcId));
+        //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+        //                                                                           (void *) serializerSvcId));
+        psa_nanomsg_serializer_entry_t *serEntry = nullptr;
+        auto kv = serializers.map.find(serializerSvcId);
+        if (kv != serializers.map.end()) {
+            serEntry = kv->second;
+        }
         if (serEntry != nullptr) {
             sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
                                                       basePort, maxPort);
@@ -437,16 +444,16 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
         if (receiver == nullptr) {
-            auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
-                                                                                       (void *) serializerSvcId));
-            if (serEntry != nullptr) {
+            auto kv = serializers.map.find(serializerSvcId);
+            if (kv != serializers.map.end()) {
+                auto serEntry = kv->second;
                 receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-                const char *serType = serEntry->serType;
+                const char *serType = kv->second->serType;
                 newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
@@ -617,9 +624,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
                     &iter));
             long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
-                    serializers.map, (void *) serSvcId));
-            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            auto kv = serializers.map.find(serSvcId);
+            //psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
+            //        serializers.map, (void *) serSvcId));
+            const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
             const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
             const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
             const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -639,9 +647,8 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
                     &iter));
             long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
-            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
-                    serializers.map, (void *) serSvcId));
-            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            auto kv =  serializers.map.find(serSvcId);
+            const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
             const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
             const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b06c887..98314b3 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -21,10 +21,13 @@
 #define CELIX_PUBSUB_ZMQ_ADMIN_H
 
 #include <mutex>
+#include <map>
 #include <pubsub_admin.h>
 #include "celix_api.h"
 #include "log_helper.h"
 #include "pubsub_nanomsg_topic_receiver.h"
+#include <pubsub_serializer.h>
+
 #include "../../../shell/shell/include/command.h"
 
 #define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
@@ -98,9 +101,15 @@ private:
 
     bool verbose{};
 
+    typedef struct psa_nanomsg_serializer_entry {
+        const char *serType;
+        long svcId;
+        pubsub_serializer_service_t *svc;
+    } psa_nanomsg_serializer_entry_t;
     struct {
         std::mutex mutex;
-        hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+        std::map<long, psa_nanomsg_serializer_entry_t*> map;
+        //hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
     } serializers{};
 
     struct {

http://git-wip-us.apache.org/repos/asf/celix/blob/3009e647/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 786fb90..d584db8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -20,7 +20,8 @@
 #define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
 #include <string>
 #include <vector>
-
+#include "pubsub_serializer.h"
+#include "log_helper.h"
 #include "celix_bundle_context.h"
 
 typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;