You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:57 UTC

[1/8] celix git commit: Removed celix-maps from nanomsg admin

Repository: celix
Updated Branches:
  refs/heads/nanomsg 3009e6470 -> 7c141424d


Removed celix-maps from nanomsg admin


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95633eb9
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95633eb9
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95633eb9

Branch: refs/heads/nanomsg
Commit: 95633eb954a09232867277d7cd0d71c30d49a012
Parents: 3009e64
Author: Erjan Altena <er...@gmail.com>
Authored: Sat Nov 3 19:40:27 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Sat Nov 3 19:40:27 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 | 148 +++++++------------
 .../src/pubsub_nanomsg_admin.h                  |  34 ++---
 2 files changed, 67 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 9fe91d9..6c15ec8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,8 +32,6 @@
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
 #include "pubsub_psa_nanomsg_constants.h"
-#include "pubsub_nanomsg_topic_sender.h"
-#include "pubsub_nanomsg_topic_receiver.h"
 /*
 //#define L_DEBUG(...) \
 //    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -50,11 +48,6 @@
 #define L_ERROR printf
 
 
-typedef struct psa_nanomsg_serializer_entry {
-    const char *serType;
-    long svcId;
-    pubsub_serializer_service_t *svc;
-} psa_nanomsg_serializer_entry_t;
 
 static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
 
@@ -103,41 +96,29 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
     defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
     qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
     qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
-
-    //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
-
-    topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
-
-    topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
-
-    discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 }
 
 pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
     {
         std::lock_guard<std::mutex> lock(topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter));
+        for (auto kv : topicSenders.map) {
+            auto *sender = kv.second;
             pubsub_nanoMsgTopicSender_destroy(sender);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-            pubsub_nanoMsgTopicReceiver_destroy(recv);
+        for (auto kv: topicReceivers.map) {
+            pubsub_nanoMsgTopicReceiver_destroy(kv.second);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : discoveredEndpoints.map) {
+            auto *ep = entry.second;
             celix_properties_destroy(ep);
         }
     }
@@ -150,12 +131,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
         }
     }
 
-    hashMap_destroy(topicSenders.map, true, false);
-
-    hashMap_destroy(topicReceivers.map, true, false);
-
-    hashMap_destroy(discoveredEndpoints.map, false, false);
-
     free(ipAddress);
 
 }
@@ -276,21 +251,19 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
     std::lock_guard<std::mutex> lock(serializers.mutex);
 
     psa_nanomsg_serializer_entry_t* entry = nullptr;
-    auto kv = serializers.map.find(svcId);
-    if (kv != serializers.map.end()) {
-        entry = kv->second;
+    auto kvsm = serializers.map.find(svcId);
+    if (kvsm != serializers.map.end()) {
+        entry = kvsm->second;
     }
     serializers.map.erase(svcId);
     if (entry != nullptr) {
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry));
+                for (auto kv: topicSenders.map) {
+                auto *sender = kv.second;
                 if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
-                    char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry));
-                    hashMapIterator_remove(&iter);
+                    char *key = kv.first;
+                    topicSenders.map.erase(kv.first);
                     pubsub_nanoMsgTopicSender_destroy(sender);
                     free(key);
                 }
@@ -299,13 +272,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
 
         {
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+            for (auto kv : topicReceivers.map){
+                auto *receiver = kv.second;
                 if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
-                    char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                    hashMapIterator_remove(&iter);
+                    char *key = kv.first;
+                    topicReceivers.map.erase(key);
                     pubsub_nanoMsgTopicReceiver_destroy(receiver);
                     free(key);
                 }
@@ -365,7 +336,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     pubsub_nanomsg_topic_sender_t *sender = nullptr;
     std::lock_guard<std::mutex> serializerLock(serializers.mutex);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
+    sender = topicSenders.map.find(key)->second;
     if (sender == nullptr) {
         //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
         //                                                                           (void *) serializerSvcId));
@@ -389,7 +360,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
             if (cn != nullptr) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-            hashMap_put(topicSenders.map, key, sender);
+            topicSenders.map[key] = sender;
         } else {
             L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
             free(key);
@@ -417,10 +388,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
-    if (entry != nullptr) {
-        char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
-        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, key));
+    auto kv = topicSenders.map.find(key);
+    if (kv != topicSenders.map.end()) {
+        char *mapKey = kv->first;
+        pubsub_nanomsg_topic_sender_t *sender = kv->second;
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
         pubsub_nanoMsgTopicSender_destroy(sender);
@@ -442,18 +413,21 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
+         auto trkv = topicReceivers.map.find(key);
+         if (trkv != topicReceivers.map.end()) {
+             receiver = trkv->second;
+         }
         if (receiver == nullptr) {
-            auto kv = serializers.map.find(serializerSvcId);
-            if (kv != serializers.map.end()) {
-                auto serEntry = kv->second;
+            auto kvs = serializers.map.find(serializerSvcId);
+            if (kvs != serializers.map.end()) {
+                auto serEntry = kvs->second;
                 receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-                const char *serType = kv->second->serType;
+                const char *serType = kvs->second->serType;
                 newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
@@ -461,7 +435,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
                 if (cn != nullptr) {
                     celix_properties_set(newEndpoint, "container_name", cn);
                 }
-                hashMap_put(topicReceivers.map, key, receiver);
+                topicReceivers.map[key] = receiver;
             } else {
                 L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
                 free(key);
@@ -473,9 +447,8 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     }
     if (receiver != nullptr && newEndpoint != nullptr) {
         std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : discoveredEndpoints.map) {
+            auto *endpoint = entry.second;
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
             if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
                 connectEndpointToReceiver(receiver, endpoint);
@@ -494,12 +467,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
 celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
+    auto entry = topicReceivers.map.find(key);
     free(key);
-    if (entry != nullptr) {
-        char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
-        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
-        hashMap_remove(topicReceivers.map, receiverKey);
+    if (entry != topicReceivers.map.end()) {
+        char *receiverKey = entry->first;
+        pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+        topicReceivers.map.erase(receiverKey);
 
         free(receiverKey);
         pubsub_nanoMsgTopicReceiver_destroy(receiver);
@@ -542,17 +515,17 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry: topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
     }
 
     std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
+    //TODO : check if properties are never deleted before map.
     const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
-    hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
+    discoveredEndpoints.map[uuid] = cpy;
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -590,24 +563,17 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
     }
-    celix_properties_t *found = nullptr;
     {
         std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
         const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
-        found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
-    }
-    if (found != nullptr) {
-        celix_properties_destroy(found);
+        discoveredEndpoints.map.erase(uuid);
     }
-
-    celix_status_t  status = CELIX_SUCCESS;
-    return status;
+    return CELIX_SUCCESS;;
 }
 
 celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out,
@@ -619,15 +585,11 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
-                    &iter));
+        for (auto kvts: topicSenders.map) {
+            pubsub_nanomsg_topic_sender_t *sender = kvts.second;
             long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-            auto kv = serializers.map.find(serSvcId);
-            //psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
-            //        serializers.map, (void *) serSvcId));
-            const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+            auto kvs = serializers.map.find(serSvcId);
+            const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
             const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
             const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
             const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -640,12 +602,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
     {
         fprintf(out, "\n");
         fprintf(out, "\nTopic Receivers:\n");
-        std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
-                    &iter));
+        for (auto entry : topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
             auto kv =  serializers.map.find(serSvcId);
             const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;

http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 98314b3..c34a310 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -29,6 +29,8 @@
 #include <pubsub_serializer.h>
 
 #include "../../../shell/shell/include/command.h"
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_nanomsg_topic_receiver.h"
 
 #define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
 #define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
@@ -42,6 +44,13 @@
 #define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 
 //typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+
+template <typename key, typename value>
+struct ProtectedMap {
+    std::mutex mutex{};
+    std::map<key, value> map{};
+};
+
 class pubsub_nanomsg_admin {
 public:
     pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
@@ -106,27 +115,10 @@ private:
         long svcId;
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
-    struct {
-        std::mutex mutex;
-        std::map<long, psa_nanomsg_serializer_entry_t*> map;
-        //hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
-    } serializers{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
-    } topicSenders{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
-    } topicReceivers{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
-    } discoveredEndpoints{};
-
+    ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+    ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+    ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+    ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };
 
 #ifdef __cplusplus


[5/8] celix git commit: subscriber.map now std::map

Posted by er...@apache.org.
subscriber.map now std::map


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/120895dd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/120895dd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/120895dd

Branch: refs/heads/nanomsg
Commit: 120895dd93c2c995d98327925fe88f939d345d12
Parents: 8658738
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 21:53:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 21:53:50 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_topic_receiver.cc        | 100 +++++++------------
 .../src/pubsub_nanomsg_topic_receiver.h         |  12 ++-
 2 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 8acf6b1..2205ed2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,13 +63,6 @@
 #define L_ERROR printf
 
 
-
-
-
-//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-//                                                         const celix_bundle_t *owner);
-
-
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         log_helper_t *_logHelper,
         const char *_scope,
@@ -103,8 +96,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         m_scope = strndup(m_scope, 1024 * 1024);
         m_topic = strndup(m_topic, 1024 * 1024);
 
-        subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
+        //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
         //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -139,38 +132,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
 
         celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
-        hash_map_iterator_t iter=hash_map_iterator_t();
         {
             std::lock_guard<std::mutex> _lock(subscribers.mutex);
-            iter = hashMapIterator_construct(subscribers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
-                if (entry != NULL)  {
-                    serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
-                    free(entry);
-                }
+            for(auto elem : subscribers.map) {
+                serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
             }
-            hashMap_destroy(subscribers.map, false, false);
+            subscribers.map.clear();
         }
-
-
-//        {
-//            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-//            iter = hashMapIterator_construct(requestedConnections.map);
-//            while (hashMapIterator_hasNext(&iter)) {
-//                psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-//                if (entry != NULL) {
-//                    free(entry->url);
-//                    free(entry);
-//                }
-//            }
-//            hashMap_destroy(requestedConnections.map, false, false);
-//        }
-
-        //celixThreadMutex_destroy(&receiver->subscribers.mutex);
-        //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
-        //celixThreadMutex_destroy(&receiver->recvThread.mutex);
-
         nn_close(m_nanoMsgSocket);
 
         free((void*)m_scope);
@@ -211,6 +179,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
                 std::piecewise_construct,
                 std::forward_as_tuple(std::string(url)),
                 std::forward_as_tuple(url, -1));
+        entry = requestedConnections.map.find(url);
     }
     if (!entry->second.isConnected()) {
         int connection_id = nn_connect(m_nanoMsgSocket, url);
@@ -254,21 +223,26 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
     }
 
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
-    if (entry != NULL) {
-        entry->usageCount += 1;
+    auto entry = subscribers.map.find(bndId);
+    if (entry != subscribers.map.end()) {
+        entry->second.usageCount += 1;
     } else {
         //new create entry
-        entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
-        entry->usageCount = 1;
-        entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+        subscribers.map.emplace(std::piecewise_construct,
+                std::forward_as_tuple(bndId),
+                std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
+        entry = subscribers.map.find(bndId);
+        if (entry == subscribers.map.end()) {
+            std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
+        }
 
-        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
         if (rc == 0) {
-            hashMap_put(subscribers.map, (void*)bndId, entry);
+            //hashMap_put(subscribers.map, (void*)bndId, entry);
+            //subscribers.map[bndId] = *entry;
         } else {
             L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
-            free(entry);
+            subscribers.map.erase(bndId);
         }
     }
 }
@@ -278,18 +252,17 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
     long bndId = celix_bundle_getId(bnd);
 
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
-    if (entry != NULL) {
-        entry->usageCount -= 1;
-    }
-    if (entry != NULL && entry->usageCount <= 0) {
-        //remove entry
-        hashMap_remove(subscribers.map, (void*)bndId);
-        int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
-        if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+    auto entry = subscribers.map.find(bndId);
+    if (entry != subscribers.map.end()) {
+        entry->second.usageCount -= 1;
+        if (entry->second.usageCount <= 0) {
+            //remove entry
+            int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
+            if (rc != 0) {
+                L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+            }
+            subscribers.map.erase(bndId);
         }
-        free(entry);
     }
 }
 
@@ -319,12 +292,13 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
 
 void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
-        }
+    //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
+    //while (hashMapIterator_hasNext(&iter)) {
+    for (auto entry : subscribers.map) {
+        //psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
+        //if (entry != NULL) {
+            processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
+        //}
     }
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 3398fb1..09d62a9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,10 +28,13 @@
 #include "pubsub_nanomsg_common.h"
 #include "pubsub/subscriber.h"
 
-typedef struct psa_zmq_subscriber_entry {
+typedef struct psa_nanomsg_subscriber_entry {
+    psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
+    svc{_svc}, usageCount{_usageCount} {
+    }
+    pubsub_subscriber_t *svc{};
     int usageCount;
-    hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
+    hash_map_t *msgTypes{nullptr}; //map from serializer svc
 } psa_nanomsg_subscriber_entry_t;
 
 typedef struct psa_zmq_requested_connection_entry {
@@ -116,7 +119,8 @@ namespace pubsub {
             long subscriberTrackerId{0};
             struct {
                 std::mutex mutex;
-                hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+                std::map<long, psa_nanomsg_subscriber_entry_t> map;
+                //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
             } subscribers{};
         };
     }


[6/8] celix git commit: Nanomsg: moved charptr to std::string

Posted by er...@apache.org.
Nanomsg: moved charptr to std::string


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/883abeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/883abeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/883abeed

Branch: refs/heads/nanomsg
Commit: 883abeed00cfa3c1509b026fea888550b863defc
Parents: 120895d
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 22:35:30 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 22:35:30 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 | 56 +++++++-----------
 .../src/pubsub_nanomsg_admin.h                  |  4 +-
 .../src/pubsub_nanomsg_common.cc                |  8 +--
 .../src/pubsub_nanomsg_common.h                 |  3 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        | 60 +++++++-------------
 .../src/pubsub_nanomsg_topic_receiver.h         | 26 ++++-----
 6 files changed, 63 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 3e788ae..030441d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -159,7 +159,7 @@ void pubsub_nanomsg_admin::start() {
     };
     adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
         auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-        return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint);
+        return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint);
     };
 
     adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
@@ -205,7 +205,7 @@ void pubsub_nanomsg_admin::start() {
     celix_properties_t* shellProps = celix_properties_create();
     celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
     celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
-    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA");
     cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
 
 }
@@ -275,10 +275,9 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             for (auto kv : topicReceivers.map){
                 auto *receiver = kv.second;
                 if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
-                    char *key = kv.first;
+                    auto key = kv.first;
                     topicReceivers.map.erase(key);
                     delete receiver;
-                    free(key);
                 }
             }
         }
@@ -338,8 +337,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     sender = topicSenders.map.find(key)->second;
     if (sender == nullptr) {
-        //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
-        //                                                                           (void *) serializerSvcId));
         psa_nanomsg_serializer_entry_t *serEntry = nullptr;
         auto kv = serializers.map.find(serializerSvcId);
         if (kv != serializers.map.end()) {
@@ -403,12 +400,12 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     return status;
 }
 
-celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic,
                                                       long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
 
     celix_properties_t *newEndpoint = nullptr;
 
-    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    std::string key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str());
     pubsub::nanomsg::topic_receiver * receiver = nullptr;
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
@@ -423,12 +420,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
                 auto serEntry = kvs->second;
                 receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
             } else {
-                L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+                L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
                 const char *serType = kvs->second->serType;
-                newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+                newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
                 const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
@@ -438,11 +435,9 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
                 topicReceivers.map[key] = receiver;
             } else {
                 L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
-                free(key);
             }
         } else {
-            free(key);
-            L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+            L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope.c_str(), topic.c_str());
         }
     }
     if (receiver != nullptr && newEndpoint != nullptr) {
@@ -470,11 +465,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
     auto entry = topicReceivers.map.find(key);
     free(key);
     if (entry != topicReceivers.map.end()) {
-        char *receiverKey = entry->first;
+        auto receiverKey = entry->first;
         pubsub::nanomsg::topic_receiver *receiver = entry->second;
         topicReceivers.map.erase(receiverKey);
 
-        free(receiverKey);
         delete receiver;
     }
 
@@ -487,22 +481,18 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = receiver->scope();
-    const char *topic = receiver->topic();
+    auto scope = receiver->scope();
+    auto topic = receiver->topic();
 
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+    std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+    std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
     const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 
     if (url == nullptr) {
-//        const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr);
-//        const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 //        L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != nullptr && eTopic != nullptr &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+        if ((eScope == scope) && (eTopic == topic)) {
             receiver->connectTo(url);
         }
     }
@@ -537,20 +527,18 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nano
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = receiver->scope();
-    const char *topic = receiver->topic();
+    auto scope = receiver->scope();
+    auto topic = receiver->topic();
 
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+    auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+    auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
     const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 
     if (url == nullptr) {
         L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != nullptr && eTopic != nullptr &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+        if ((eScope == scope) && (eTopic == topic)) {
             receiver->disconnectFrom(url);
         }
     }
@@ -609,14 +597,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
             const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
-            const char *scope = receiver->scope();
-            const char *topic = receiver->topic();
+            auto scope = receiver->scope();
+            auto topic = receiver->topic();
 
             std::vector<std::string> connected{};
             std::vector<std::string> unconnected{};
             receiver->listConnections(connected, unconnected);
 
-            fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+            fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
             fprintf(out, "   |- serializer type = %s\n", serType);
             for (auto url : connected) {
                 fprintf(out, "   |- connected url   = %s\n", url.c_str());

http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 3e680b6..b33a3c0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -74,7 +74,7 @@ private:
                                                         long serializerSvcId, celix_properties_t **publisherEndpoint);
     celix_status_t teardownTopicSender(const char *scope, const char *topic);
 
-    celix_status_t setupTopicReceiver(const char *scope, const char *topic,
+    celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic,
                                                           long serializerSvcId, celix_properties_t **subscriberEndpoint);
     celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
 
@@ -117,7 +117,7 @@ private:
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
     ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
-    ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
+    ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
index 2a2bcfe..3ecd19c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -41,15 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_he
     return check;
 }
 
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) {
-    for (int i = 0; i < 5; ++i) {
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter) {
+    for (int i = 0; i < 5; ++i) { // 5 ??
         filter[i] = '\0';
     }
-    if (scope != NULL && strnlen(scope, 3) >= 2)  {
+    if (scope.size() >= 2)  { //3 ??
         filter[0] = scope[0];
         filter[1] = scope[1];
     }
-    if (topic != NULL && strnlen(topic, 3) >= 2)  {
+    if (topic.size() >= 2)  { //3 ??
         filter[2] = topic[0];
         filter[3] = topic[1];
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 3d5d48d..276169f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -20,6 +20,7 @@
 #ifndef CELIX_PUBSUB_ZMQ_COMMON_H
 #define CELIX_PUBSUB_ZMQ_COMMON_H
 
+#include <string>
 #include <utils.h>
 
 #include "version.h"
@@ -48,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
 
 
 int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter);
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter);
 
 bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 2205ed2..db8469b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -65,8 +65,8 @@
 
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         log_helper_t *_logHelper,
-        const char *_scope,
-        const char *_topic,
+        const std::string &_scope,
+        const std::string &_topic,
         long _serializerSvcId,
         pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
     ctx = _ctx;
@@ -76,33 +76,22 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 
     m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
     if (m_nanoMsgSocket < 0) {
-        // TODO throw error or something
-        //free(receiver);
-        //receiver = NULL;
-        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope.c_str(), m_topic.c_str());
+        std::bad_alloc{};
     } else {
         int timeout = PSA_NANOMSG_RECV_TIMEOUT;
         if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
                           sizeof (timeout)) < 0) {
-            // TODO throw error or something
-            //free(receiver);
-            //receiver = NULL;
-            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
+            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope.c_str(), m_topic.c_str());
+            std::bad_alloc{};
         }
 
-        char subscribeFilter[5];
-        psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
+        char subscriberFilter[5]; // 5 ??
+        psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter);
 
-        m_scope = strndup(m_scope, 1024 * 1024);
-        m_topic = strndup(m_topic, 1024 * 1024);
-
-        //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
-        //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
-        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
         char buf[size + 1];
-        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
         celix_service_tracking_options_t opts{};
         opts.filter.ignoreServiceLanguage = true;
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
@@ -141,14 +130,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
         }
         nn_close(m_nanoMsgSocket);
 
-        free((void*)m_scope);
-        free((void*)m_topic);
 }
 
-const char* pubsub::nanomsg::topic_receiver::scope() const {
+std::string pubsub::nanomsg::topic_receiver::scope() const {
     return m_scope;
 }
-const char* pubsub::nanomsg::topic_receiver::topic() const {
+
+std::string pubsub::nanomsg::topic_receiver::topic() const {
     return m_topic;
 }
 
@@ -170,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
 
 
 void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
-    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
+    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry  = requestedConnections.map.find(url);
@@ -193,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
 }
 
 void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
-    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
+    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry = requestedConnections.map.find(url);
@@ -216,8 +204,8 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
 void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
                                                       const celix_bundle_t *bnd) {
     long bndId = celix_bundle_getId(bnd);
-    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
+    std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    if (subScope != m_scope) {
         //not the same scope. ignore
         return;
     }
@@ -232,16 +220,10 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
                 std::forward_as_tuple(bndId),
                 std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
         entry = subscribers.map.find(bndId);
-        if (entry == subscribers.map.end()) {
-            std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
-        }
 
         int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
-        if (rc == 0) {
-            //hashMap_put(subscribers.map, (void*)bndId, entry);
-            //subscribers.map[bndId] = *entry;
-        } else {
-            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
+        if (rc != 0) {
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
             subscribers.map.erase(bndId);
         }
     }
@@ -259,14 +241,14 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
             //remove entry
             int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
             if (rc != 0) {
-                L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+                L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
             }
             subscribers.map.erase(bndId);
         }
     }
 }
 
-void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
     pubsub_subscriber_t *svc = entry->svc;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 09d62a9..2519e4a 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,18 +28,18 @@
 #include "pubsub_nanomsg_common.h"
 #include "pubsub/subscriber.h"
 
-typedef struct psa_nanomsg_subscriber_entry {
+struct psa_nanomsg_subscriber_entry {
     psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
     svc{_svc}, usageCount{_usageCount} {
     }
     pubsub_subscriber_t *svc{};
     int usageCount;
     hash_map_t *msgTypes{nullptr}; //map from serializer svc
-} psa_nanomsg_subscriber_entry_t;
+};
 
-typedef struct psa_zmq_requested_connection_entry {
+typedef struct psa_nanomsg_requested_connection_entry {
 public:
-    psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false):
+    psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false):
     url{_url}, id{_id}, connected{_connected} {
     }
     bool isConnected() const {
@@ -73,23 +73,23 @@ namespace pubsub {
             topic_receiver(celix_bundle_context_t
                            *ctx,
                            log_helper_t *logHelper,
-                           const char *scope,
-                           const char *topic,
+                           const std::string &scope,
+                           const std::string &topic,
                            long serializerSvcId, pubsub_serializer_service_t
                            *serializer);
             topic_receiver(const topic_receiver &) = delete;
             topic_receiver & operator=(const topic_receiver &) = delete;
             ~topic_receiver();
 
-            const char* scope() const;
-            const char* topic() const;
+            std::string scope() const;
+            std::string topic() const;
             long serializerSvcId() const;
             void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
             void connectTo(const char *url);
             void disconnectFrom(const char *url);
             void recvThread_exec();
             void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
-            void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+            void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
             void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
             void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
 
@@ -98,8 +98,8 @@ namespace pubsub {
             log_helper_t *logHelper{nullptr};
             long m_serializerSvcId{0};
             pubsub_serializer_service_t *serializer{nullptr};
-            const char *m_scope{nullptr};
-            const char *m_topic{nullptr};
+            const std::string m_scope{};
+            const std::string m_topic{};
             char m_scopeAndTopicFilter[5];
 
             int m_nanoMsgSocket{0};
@@ -113,14 +113,12 @@ namespace pubsub {
             struct {
                 std::mutex mutex;
                 std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
-                //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
             } requestedConnections{};
 
             long subscriberTrackerId{0};
             struct {
                 std::mutex mutex;
-                std::map<long, psa_nanomsg_subscriber_entry_t> map;
-                //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+                std::map<long, psa_nanomsg_subscriber_entry> map;
             } subscribers{};
         };
     }


[3/8] celix git commit: nanomsg Topic receiver to class

Posted by er...@apache.org.
nanomsg Topic receiver to class


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0abbf432
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0abbf432
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0abbf432

Branch: refs/heads/nanomsg
Commit: 0abbf4323838b5823b6f275ab418438727dfe289
Parents: c19a5bd
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 20:34:12 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:10:41 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_common.h                 |  4 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        | 90 ++++++--------------
 .../src/pubsub_nanomsg_topic_receiver.h         | 42 +++++----
 3 files changed, 47 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 28293a8..3d5d48d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -37,14 +37,14 @@
  */
 
 
-struct pubsub_zmq_msg_header {
+struct pubsub_nanomsg_msg_header {
     //header
     unsigned int type;
     unsigned char major;
     unsigned char minor;
 };
 
-typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
 
 
 int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);

http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 889d79d..88886c6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -17,6 +17,7 @@
  *under the License.
  */
 
+#include <iostream>
 #include <mutex>
 #include <memory.h>
 #include <vector>
@@ -61,34 +62,6 @@
 #define L_WARN printf
 #define L_ERROR printf
 
-struct pubsub_nanomsg_topic_receiver {
-    celix_bundle_context_t *ctx;
-    log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
-    char *scope;
-    char *topic;
-    char scopeAndTopicFilter[5];
-
-    int nanoMsgSocket;
-
-    struct {
-        celix_thread_t thread;
-        std::mutex mutex;
-        bool running;
-    } recvThread;
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
-    } requestedConnections;
-
-    long subscriberTrackerId;
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
-    } subscribers;
-};
 
 typedef struct psa_zmq_requested_connection_entry {
     char *url;
@@ -96,23 +69,14 @@ typedef struct psa_zmq_requested_connection_entry {
     int id;
 } psa_nanomsg_requested_connection_entry_t;
 
-typedef struct psa_zmq_subscriber_entry {
-    int usageCount;
-    hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
-} psa_nanomsg_subscriber_entry_t;
 
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                      const celix_bundle_t *owner);
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
                                                          const celix_bundle_t *owner);
-static void* psa_nanomsg_recvThread(void *data);
 
 
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-//                                                                    log_helper_t *logHelper, const char *scope,
-//                                                                    const char *topic, long serializerSvcId,
-//                                                                    pubsub_serializer_service_t *serializer) {
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         log_helper_t *_logHelper,
         const char *_scope,
@@ -149,6 +113,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         m_topic = strndup(m_topic, 1024 * 1024);
 
         subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
         requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -159,15 +124,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = this;
-        opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+        opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
         opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
 
         subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
         recvThread.running = true;
-        celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
-        std::stringstream namestream;
-        namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
-        celixThread_setName(&recvThread.thread, namestream.str().c_str());
+
+        recvThread.thread = std::thread([this]() {this->recvThread_exec();});
     }
 }
 
@@ -177,7 +140,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
             std::lock_guard<std::mutex> _lock(recvThread.mutex);
             recvThread.running = false;
         }
-        celixThread_join(recvThread.thread, NULL);
+        recvThread.thread.join();
 
         celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
@@ -285,12 +248,13 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
     }
 }
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                      const celix_bundle_t *bnd) {
+    auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+    if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
         //not the same scope. ignore
         return;
     }
@@ -309,7 +273,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         if (rc == 0) {
             hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
         } else {
-            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
             free(entry);
         }
     }
@@ -317,7 +281,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
 
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
                                                          const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+    auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -331,13 +295,13 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
         }
         free(entry);
     }
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
     pubsub_subscriber_t *svc = entry->svc;
 
@@ -353,7 +317,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
             } else {
-                L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+                //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, scope, topic);
             }
         }
     } else {
@@ -361,13 +325,13 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
     }
 }
 
-static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
         if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize);
+            processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
         }
     }
 }
@@ -377,12 +341,11 @@ struct Message {
     char payload[];
 };
 
-static void* psa_nanomsg_recvThread(void *data) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
+void pubsub::nanomsg::topic_receiver::recvThread_exec() {
     bool running{};
     {
-        std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
-        running = receiver->recvThread.running;
+        std::lock_guard<std::mutex> _lock(recvThread.mutex);
+        running = recvThread.running;
     }
     while (running) {
         Message *msg = nullptr;
@@ -400,9 +363,9 @@ static void* psa_nanomsg_recvThread(void *data) {
         msgHdr.msg_controllen = 0;
 
         errno = 0;
-        int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+        int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
         if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
-            processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+            processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
             nn_freemsg(msg);
         } else if (recvBytes >= 0) {
             L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
@@ -415,5 +378,4 @@ static void* psa_nanomsg_recvThread(void *data) {
         }
     } // while
 
-    return NULL;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 6cd216b..f977917 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -16,15 +16,24 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
-#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#pragma once
 #include <string>
 #include <vector>
+#include <thread>
+#include <mutex>
 #include "pubsub_serializer.h"
 #include "log_helper.h"
 #include "celix_bundle_context.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub/subscriber.h"
+
+typedef struct psa_zmq_subscriber_entry {
+    int usageCount;
+    hash_map_t *msgTypes; //map from serializer svc
+    pubsub_subscriber_t *svc;
+} psa_nanomsg_subscriber_entry_t;
+
 
-//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
 namespace pubsub {
     namespace nanomsg {
         class topic_receiver {
@@ -46,7 +55,10 @@ namespace pubsub {
             void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
             void connectTo(const char *url);
             void disconnectFrom(const char *url);
-        private:
+            void recvThread_exec();
+            void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
+            void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+        //private:
             celix_bundle_context_t *ctx{nullptr};
             log_helper_t *logHelper{nullptr};
             long m_serializerSvcId{0};
@@ -58,7 +70,7 @@ namespace pubsub {
             int m_nanoMsgSocket{0};
 
             struct {
-                celix_thread_t thread;
+                std::thread thread;
                 std::mutex mutex;
                 bool running;
             } recvThread{};
@@ -74,22 +86,6 @@ namespace pubsub {
                 hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
             } subscribers{};
         };
-    }}
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-//                                                                    log_helper_t *logHelper, const char *scope,
-//                                                                    const char *topic, long serializerSvcId,
-//                                                                    pubsub_serializer_service_t *serializer);
-//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-//                                                 std::vector<std::string> &connectedUrls,
-//                                                 std::vector<std::string> &unconnectedUrls);
-
-//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+    }
+}
 
-#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H


[2/8] celix git commit: nanomsg topicreceiver changed to class

Posted by er...@apache.org.
nanomsg topicreceiver changed to class


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c19a5bd8
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c19a5bd8
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c19a5bd8

Branch: refs/heads/nanomsg
Commit: c19a5bd85b5c514794c6b38a1080b5382b8ca1ad
Parents: 95633eb
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 19 20:10:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 19 20:10:50 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |  44 ++---
 .../src/pubsub_nanomsg_admin.h                  |   6 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        | 162 +++++++++----------
 .../src/pubsub_nanomsg_topic_receiver.h         |  77 +++++++--
 4 files changed, 169 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 6c15ec8..3e788ae 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -111,7 +111,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
         for (auto kv: topicReceivers.map) {
-            pubsub_nanoMsgTopicReceiver_destroy(kv.second);
+            delete kv.second;
         }
     }
 
@@ -274,10 +274,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
             for (auto kv : topicReceivers.map){
                 auto *receiver = kv.second;
-                if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+                if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
                     char *key = kv.first;
                     topicReceivers.map.erase(key);
-                    pubsub_nanoMsgTopicReceiver_destroy(receiver);
+                    delete receiver;
                     free(key);
                 }
             }
@@ -409,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+    pubsub::nanomsg::topic_receiver * receiver = nullptr;
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
@@ -421,7 +421,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
             auto kvs = serializers.map.find(serializerSvcId);
             if (kvs != serializers.map.end()) {
                 auto serEntry = kvs->second;
-                receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
             }
@@ -471,24 +471,24 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
     free(key);
     if (entry != topicReceivers.map.end()) {
         char *receiverKey = entry->first;
-        pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+        pubsub::nanomsg::topic_receiver *receiver = entry->second;
         topicReceivers.map.erase(receiverKey);
 
         free(receiverKey);
-        pubsub_nanoMsgTopicReceiver_destroy(receiver);
+        delete receiver;
     }
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
-celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                     const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-    const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+    const char *scope = receiver->scope();
+    const char *topic = receiver->topic();
 
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -503,7 +503,7 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_to
         if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
+            receiver->connectTo(url);
         }
     }
 
@@ -516,7 +516,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
         for (auto entry: topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
     }
@@ -532,13 +532,13 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
 }
 
 
-celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                             const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-    const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+    const char *scope = receiver->scope();
+    const char *topic = receiver->topic();
 
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -551,7 +551,7 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanom
         if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
+            receiver->disconnectFrom(url);
         }
     }
 
@@ -564,7 +564,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         for (auto entry : topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
     }
@@ -605,16 +605,16 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         for (auto entry : topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
-            long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
+            long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
             const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
-            const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-            const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+            const char *scope = receiver->scope();
+            const char *topic = receiver->topic();
 
             std::vector<std::string> connected{};
             std::vector<std::string> unconnected{};
-            pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+            receiver->listConnections(connected, unconnected);
 
             fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
             fprintf(out, "   |- serializer type = %s\n", serType);

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index c34a310..3e680b6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -84,10 +84,10 @@ private:
     celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
                                                         FILE *errStream __attribute__((unused)));
 
-    celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+    celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                    const celix_properties_t *endpoint);
 
-    celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+    celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                         const celix_properties_t *endpoint);
     celix_bundle_context_t *ctx;
     log_helper_t *log;
@@ -117,7 +117,7 @@ private:
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
     ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
-    ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+    ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 42f6423..889d79d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -109,92 +109,96 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc
 static void* psa_nanomsg_recvThread(void *data);
 
 
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-                                                                    log_helper_t *logHelper, const char *scope,
-                                                                    const char *topic, long serializerSvcId,
-                                                                    pubsub_serializer_service_t *serializer) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
-    receiver->ctx = ctx;
-    receiver->logHelper = logHelper;
-    receiver->serializerSvcId = serializerSvcId;
-    receiver->serializer = serializer;
-    psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
-
-
-    receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
-    if (receiver->nanoMsgSocket < 0) {
-        free(receiver);
-        receiver = NULL;
-        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+//                                                                    log_helper_t *logHelper, const char *scope,
+//                                                                    const char *topic, long serializerSvcId,
+//                                                                    pubsub_serializer_service_t *serializer) {
+pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
+        log_helper_t *_logHelper,
+        const char *_scope,
+        const char *_topic,
+        long _serializerSvcId,
+        pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
+    //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+    ctx = _ctx;
+    logHelper = _logHelper;
+    serializer = _serializer;
+    psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter);
+
+    m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+    if (m_nanoMsgSocket < 0) {
+        // TODO throw error or something
+        //free(receiver);
+        //receiver = NULL;
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
     } else {
         int timeout = PSA_NANOMSG_RECV_TIMEOUT;
-        if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
+        if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
                           sizeof (timeout)) < 0) {
-            free(receiver);
-            receiver = NULL;
-            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+            // TODO throw error or something
+            //free(receiver);
+            //receiver = NULL;
+            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
         }
 
         char subscribeFilter[5];
-        psa_nanomsg_setScopeAndTopicFilter(scope, topic, subscribeFilter);
+        psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
         //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
 
-        receiver->scope = strndup(scope, 1024 * 1024);
-        receiver->topic = strndup(topic, 1024 * 1024);
+        m_scope = strndup(m_scope, 1024 * 1024);
+        m_topic = strndup(m_topic, 1024 * 1024);
 
-        receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+        subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
         char buf[size + 1];
-        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
         celix_service_tracking_options_t opts{};
         opts.filter.ignoreServiceLanguage = true;
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
-        opts.callbackHandle = receiver;
+        opts.callbackHandle = this;
         opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
         opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
 
-        receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-        receiver->recvThread.running = true;
-        celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+        subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+        recvThread.running = true;
+        celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
         std::stringstream namestream;
-        namestream << "NANOMSG TR " << scope << "/" << topic;
-        celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+        namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
+        celixThread_setName(&recvThread.thread, namestream.str().c_str());
     }
-    return receiver;
 }
 
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
-    if (receiver != NULL) {
+pubsub::nanomsg::topic_receiver::~topic_receiver() {
 
         {
-            std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
-            receiver->recvThread.running = false;
+            std::lock_guard<std::mutex> _lock(recvThread.mutex);
+            recvThread.running = false;
         }
-        celixThread_join(receiver->recvThread.thread, NULL);
+        celixThread_join(recvThread.thread, NULL);
 
-        celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+        celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
         hash_map_iterator_t iter=hash_map_iterator_t();
         {
-            std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-            iter = hashMapIterator_construct(receiver->subscribers.map);
+            std::lock_guard<std::mutex> _lock(subscribers.mutex);
+            iter = hashMapIterator_construct(subscribers.map);
             while (hashMapIterator_hasNext(&iter)) {
                 psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
                 if (entry != NULL)  {
-                    receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                    serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
                     free(entry);
                 }
             }
-            hashMap_destroy(receiver->subscribers.map, false, false);
+            hashMap_destroy(subscribers.map, false, false);
         }
 
 
         {
-            std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-            iter = hashMapIterator_construct(receiver->requestedConnections.map);
+            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+            iter = hashMapIterator_construct(requestedConnections.map);
             while (hashMapIterator_hasNext(&iter)) {
                 psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
                 if (entry != NULL) {
@@ -202,37 +206,34 @@ void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiv
                     free(entry);
                 }
             }
-            hashMap_destroy(receiver->requestedConnections.map, false, false);
+            hashMap_destroy(requestedConnections.map, false, false);
         }
 
         //celixThreadMutex_destroy(&receiver->subscribers.mutex);
         //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         //celixThreadMutex_destroy(&receiver->recvThread.mutex);
 
-        nn_close(receiver->nanoMsgSocket);
+        nn_close(m_nanoMsgSocket);
 
-        free(receiver->scope);
-        free(receiver->topic);
-    }
-    free(receiver);
+        free((void*)m_scope);
+        free((void*)m_topic);
 }
 
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->scope;
+const char* pubsub::nanomsg::topic_receiver::scope() const {
+    return m_scope;
 }
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->topic;
+const char* pubsub::nanomsg::topic_receiver::topic() const {
+    return m_topic;
 }
 
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->serializerSvcId;
+long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
+    return m_serializerSvcId;
 }
 
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-                                                 std::vector<std::string> &connectedUrls,
+void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
                                                  std::vector<std::string> &unconnectedUrls) {
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
         if (entry->connected) {
@@ -244,21 +245,19 @@ void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t
 }
 
 
-void pubsub_nanoMsgTopicReceiver_connectTo(
-        pubsub_nanomsg_topic_receiver_t *receiver,
-        const char *url) {
-    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
+    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
 
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url));
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
     if (entry == NULL) {
         entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
         entry->url = strndup(url, 1024*1024);
         entry->connected = false;
-        hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+        hashMap_put(requestedConnections.map, (void*)entry->url, entry);
     }
     if (!entry->connected) {
-        int connection_id = nn_connect(receiver->nanoMsgSocket, url);
+        int connection_id = nn_connect(m_nanoMsgSocket, url);
         if (connection_id >= 0) {
             entry->connected = true;
             entry->id = connection_id;
@@ -268,13 +267,13 @@ void pubsub_nanoMsgTopicReceiver_connectTo(
     }
 }
 
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
-    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
+    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
 
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url));
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
     if (entry != NULL && entry->connected) {
-        if (nn_shutdown(receiver->nanoMsgSocket, entry->id) == 0) {
+        if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
             entry->connected = false;
         } else {
             L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
@@ -287,7 +286,7 @@ void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t
 }
 
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -318,7 +317,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
 
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
                                                          const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -338,7 +337,7 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
     }
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
     pubsub_subscriber_t *svc = entry->svc;
 
@@ -362,7 +361,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t
     }
 }
 
-static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
     std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -377,8 +376,9 @@ struct Message {
     pubsub_nanmosg_msg_header_t header;
     char payload[];
 };
+
 static void* psa_nanomsg_recvThread(void *data) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
     bool running{};
     {
         std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index d584db8..6cd216b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -24,23 +24,72 @@
 #include "log_helper.h"
 #include "celix_bundle_context.h"
 
-typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+namespace pubsub {
+    namespace nanomsg {
+        class topic_receiver {
+        public:
+            topic_receiver(celix_bundle_context_t
+                           *ctx,
+                           log_helper_t *logHelper,
+                           const char *scope,
+                           const char *topic,
+                           long serializerSvcId, pubsub_serializer_service_t
+                           *serializer);
+            topic_receiver(const topic_receiver &) = delete;
+            topic_receiver & operator=(const topic_receiver &) = delete;
+            ~topic_receiver();
 
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-                                                                    log_helper_t *logHelper, const char *scope,
-                                                                    const char *topic, long serializerSvcId,
-                                                                    pubsub_serializer_service_t *serializer);
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+            const char* scope() const;
+            const char* topic() const;
+            long serializerSvcId() const;
+            void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
+            void connectTo(const char *url);
+            void disconnectFrom(const char *url);
+        private:
+            celix_bundle_context_t *ctx{nullptr};
+            log_helper_t *logHelper{nullptr};
+            long m_serializerSvcId{0};
+            pubsub_serializer_service_t *serializer{nullptr};
+            const char *m_scope{nullptr};
+            const char *m_topic{nullptr};
+            char m_scopeAndTopicFilter[5];
 
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+            int m_nanoMsgSocket{0};
 
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-                                                 std::vector<std::string> &connectedUrls,
-                                                 std::vector<std::string> &unconnectedUrls);
+            struct {
+                celix_thread_t thread;
+                std::mutex mutex;
+                bool running;
+            } recvThread{};
 
-void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+            struct {
+                std::mutex mutex;
+                hash_map_t *map; //key = nanomsg url, value = psa_nanomsg_requested_connection_entry_t*
+            } requestedConnections{};
+
+            long subscriberTrackerId{0};
+            struct {
+                std::mutex mutex;
+                hash_map_t *map; //key = bnd id, value = psa_nanomsg_subscriber_entry_t*
+            } subscribers{};
+        };
+    }}
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+//                                                                    log_helper_t *logHelper, const char *scope,
+//                                                                    const char *topic, long serializerSvcId,
+//                                                                    pubsub_serializer_service_t *serializer);
+//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+//                                                 std::vector<std::string> &connectedUrls,
+//                                                 std::vector<std::string> &unconnectedUrls);
+
+//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
 
 #endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H


[8/8] celix git commit: NanoMsg

Posted by er...@apache.org.
NanoMsg


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7c141424
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7c141424
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7c141424

Branch: refs/heads/nanomsg
Commit: 7c141424d925afa98a83bc693649dc3500354965
Parents: cdefb0d
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Nov 27 21:02:18 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Nov 27 21:02:18 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |  42 ++--
 .../src/pubsub_nanomsg_admin.h                  |   2 +-
 .../src/pubsub_nanomsg_topic_sender.cc          | 240 ++++++++-----------
 .../src/pubsub_nanomsg_topic_sender.h           |  66 ++++-
 4 files changed, 178 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index cf516ee..42ed632 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -99,20 +99,20 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
         std::lock_guard<std::mutex> lock(topicSenders.mutex);
         for (auto kv : topicSenders.map) {
             auto *sender = kv.second;
-            pubsub_nanoMsgTopicSender_destroy(sender);
+            delete (sender);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
-        for (auto kv: topicReceivers.map) {
+        for (auto &kv: topicReceivers.map) {
             delete kv.second;
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
-        for (auto entry : discoveredEndpoints.map) {
+        for (auto &entry : discoveredEndpoints.map) {
             auto *ep = entry.second;
             celix_properties_destroy(ep);
         }
@@ -252,10 +252,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
                 for (auto kv: topicSenders.map) {
                 auto *sender = kv.second;
-                if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+                if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) {
                     char *key = kv.first;
                     topicSenders.map.erase(kv.first);
-                    pubsub_nanoMsgTopicSender_destroy(sender);
+                    delete (sender);
                     free(key);
                 }
             }
@@ -263,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
 
         {
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
-            for (auto kv : topicReceivers.map){
+            for (auto &kv : topicReceivers.map){
                 auto *receiver = kv.second;
                 if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
                     auto key = kv.first;
@@ -322,7 +322,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    pubsub_nanomsg_topic_sender_t *sender = nullptr;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr;
     std::lock_guard<std::mutex> serializerLock(serializers.mutex);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     sender = topicSenders.map.find(key)->second;
@@ -333,7 +333,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
             serEntry = &kv->second;
         }
         if (serEntry != nullptr) {
-            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+            sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
                                                       basePort, maxPort);
         }
         if (sender != nullptr) {
@@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
                                                 nullptr);
-            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl());
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
             if (cn != nullptr) {
@@ -378,10 +378,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     auto kv = topicSenders.map.find(key);
     if (kv != topicSenders.map.end()) {
         char *mapKey = kv->first;
-        pubsub_nanomsg_topic_sender_t *sender = kv->second;
+        pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
-        pubsub_nanoMsgTopicSender_destroy(sender);
+        delete (sender);
     } else {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
     }
@@ -495,7 +495,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
-        for (auto entry: topicReceivers.map) {
+        for (auto &entry: topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
@@ -541,7 +541,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        for (auto entry : topicReceivers.map) {
+        for (auto &entry : topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
@@ -564,13 +564,13 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
         for (auto kvts: topicSenders.map) {
-            pubsub_nanomsg_topic_sender_t *sender = kvts.second;
-            long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+            pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second;
+            long serSvcId = sender->getSerializerSvcId();
             auto kvs = serializers.map.find(serSvcId);
             const char* serType = ( kvs == serializers.map.end() ? "!Error" :  kvs->second.serType);
-            const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
-            const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
-            const char *url = pubsub_nanoMsgTopicSender_url(sender);
+            const char *scope = sender->getScope();
+            const char *topic = sender->getTopic();
+            const char *url = sender->getUrl();
             fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
             fprintf(out, "   |- serializer type = %s\n", serType);
             fprintf(out, "   |- url             = %s\n", url);
@@ -582,7 +582,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         fprintf(out, "\nTopic Receivers:\n");
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        for (auto entry : topicReceivers.map) {
+        for (auto &entry : topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
@@ -596,10 +596,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
 
             fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
             fprintf(out, "   |- serializer type = %s\n", serType);
-            for (auto url : connected) {
+            for (auto &url : connected) {
                 fprintf(out, "   |- connected url   = %s\n", url.c_str());
             }
-            for (auto url : unconnected) {
+            for (auto &url : unconnected) {
                 fprintf(out, "   |- unconnected url = %s\n", url.c_str());
             }
         }

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 689ae20..7c2e9a0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,7 +119,7 @@ private:
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
-    ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+    ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index d5ed28f..1c75e71 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -28,11 +28,9 @@
 #include <nanomsg/bus.h>
 
 
-#include <pubsub_serializer.h>
 #include <pubsub_constants.h>
 #include <pubsub/publisher.h>
 #include <pubsub_common.h>
-#include <log_helper.h>
 #include "pubsub_nanomsg_topic_sender.h"
 #include "pubsub_psa_nanomsg_constants.h"
 #include "pubsub_nanomsg_common.h"
@@ -41,69 +39,47 @@
 #define NANOMSG_BIND_MAX_RETRY                      10
 
 #define L_DEBUG(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
 #define L_WARN(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 #define L_ERROR(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-
-struct pubsub_nanomsg_topic_sender {
-    celix_bundle_context_t *ctx;
-    log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
-
-    char *scope;
-    char *topic;
-    char scopeAndTopicFilter[5];
-    char *url;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        int socket;
-    } nanomsg;
-
-    struct {
-        long svcId;
-        celix_service_factory_t factory;
-    } publisher;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map;  //key = bndId, value = psa_nanomsg_bounded_service_entry_t
-    } boundedServices;
-};
+    logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
 
 typedef struct psa_nanomsg_bounded_service_entry {
-    pubsub_nanomsg_topic_sender_t *parent;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent;
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes;
     int getCount;
 } psa_nanomsg_bounded_service_entry_t;
 
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                             const celix_properties_t *svcProperties);
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                              const celix_properties_t *svcProperties);
+//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+//                                             const celix_properties_t *svcProperties);
+//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+//                                              const celix_properties_t *svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender);
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender);
 
 static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
 
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
-                                                                const char *scope, const char *topic,
-                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
-                                                                const char *bindIP, unsigned int basePort,
-                                                                unsigned int maxPort) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender)));
-    sender->ctx = ctx;
-    sender->logHelper = logHelper;
-    sender->serializerSvcId = serializerSvcId;
-    sender->serializer = ser;
-    psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
+                                                         log_helper_t *_logHelper,
+                                                         const char *_scope,
+                                                         const char *_topic,
+                                                         long _serializerSvcId,
+                                                         pubsub_serializer_service_t *_ser,
+                                                         const char *_bindIp,
+                                                         unsigned int _basePort,
+                                                         unsigned int _maxPort) :
+        ctx{_ctx},
+        logHelper{_logHelper},
+        serializerSvcId {_serializerSvcId},
+        serializer{_ser}{
+
+    psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter);
 
     //setting up nanomsg socket for nanomsg TopicSender
     {
@@ -116,10 +92,10 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
         int rv = -1, retry=0;
         while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
             /* Randomized part due to same bundle publishing on different topics */
-            unsigned int port = rand_range(basePort,maxPort);
-            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
-            char *url = static_cast<char*>(calloc(len, sizeof(char*)));
-            snprintf(url, len, "tcp://%s:%u", bindIP, port);
+            unsigned int port = rand_range(_basePort,_maxPort);
+            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1;
+            char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
+            snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
 
             len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
             char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
@@ -127,165 +103,155 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
             rv = nn_bind (socket, bindUrl);
             if (rv == -1) {
                 perror("Error for nn_bind");
-                free(url);
+                free(_url);
             } else {
-                sender->url = url;
-                sender->nanomsg.socket = socket;
+                this->url = _url;
+                nanomsg.socket = socket;
             }
             retry++;
             free(bindUrl);
         }
     }
 
-    if (sender->url != NULL) {
-        sender->scope = strndup(scope, 1024 * 1024);
-        sender->topic = strndup(topic, 1024 * 1024);
+    if (url != NULL) {
+        scope = strndup(_scope, 1024 * 1024);
+        topic = strndup(_topic, 1024 * 1024);
 
-        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
-        celixThreadMutex_create(&sender->nanomsg.mutex, NULL);
-        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+        celixThreadMutex_create(&boundedServices.mutex, NULL);
+        celixThreadMutex_create(&nanomsg.mutex, NULL);
+        boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
     }
 
     //register publisher services using a service factory
-    if (sender->url != NULL) {
-        sender->publisher.factory.handle = sender;
-        sender->publisher.factory.getService = psa_nanomsg_getPublisherService;
-        sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService;
+    if (url != NULL) {
+        publisher.factory.handle = this;
+        publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
+                    requestingBundle,
+                    svcProperties);
+        };
+        publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
+                    requestingBundle,
+                    svcProperties);
+        };
 
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
-        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope);
 
         celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
-        opts.factory = &sender->publisher.factory;
+        opts.factory = &publisher.factory;
         opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
         opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
         opts.properties = props;
 
-        sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
-    }
-
-    if (sender->url == NULL) {
-        free(sender);
-        sender = NULL;
+        publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
     }
 
-    return sender;
 }
 
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) {
-    if (sender != NULL) {
-        celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
+    celix_bundleContext_unregisterService(ctx, publisher.svcId);
 
-        nn_close(sender->nanomsg.socket);
+    nn_close(nanomsg.socket);
 
-        celixThreadMutex_lock(&sender->boundedServices.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
-            if (entry != NULL) {
-                sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
-                free(entry);
-            }
+    celixThreadMutex_lock(&boundedServices.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
+        if (entry != NULL) {
+            serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
+            free(entry);
         }
-        hashMap_destroy(sender->boundedServices.map, false, false);
-        celixThreadMutex_unlock(&sender->boundedServices.mutex);
-
-        celixThreadMutex_destroy(&sender->boundedServices.mutex);
-        celixThreadMutex_destroy(&sender->nanomsg.mutex);
-
-        free(sender->scope);
-        free(sender->topic);
-        free(sender->url);
-        free(sender);
     }
-}
+    hashMap_destroy(boundedServices.map, false, false);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->serializerSvcId;
-}
+    celixThreadMutex_destroy(&boundedServices.mutex);
+    celixThreadMutex_destroy(&nanomsg.mutex);
 
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->scope;
+    free(scope);
+    free(topic);
+    free(url);
 }
 
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->topic;
+long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
+    return serializerSvcId;
 }
 
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->url;
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
+    return scope;
 }
 
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
-    //TODO subscriber count -> topic info
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
+    return topic;
 }
 
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
-    //TODO
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
+    return url;
 }
 
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+
+void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
                                              const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
     long bndId = celix_bundle_getId(requestingBundle);
 
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+    celixThreadMutex_lock(&boundedServices.mutex);
+    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
     if (entry != NULL) {
         entry->getCount += 1;
     } else {
         entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry)));
         entry->getCount = 1;
-        entry->parent = sender;
+        entry->parent = this;
         entry->bndId = bndId;
 
-        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType;
             entry->service.send = psa_nanomsg_topicPublicationSend;
             entry->service.sendMultipart = NULL; //not supported TODO remove
-            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+            hashMap_put(boundedServices.map, (void*)bndId, entry);
         } else {
-            L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic);
+            L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic);
         }
 
 
 
     }
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 
     return &entry->service;
 }
 
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
                                               const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
     long bndId = celix_bundle_getId(requestingBundle);
 
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+    celixThreadMutex_lock(&boundedServices.mutex);
+    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
     if (entry != NULL) {
         entry->getCount -= 1;
     }
     if (entry != NULL && entry->getCount == 0) {
         //free entry
-        hashMap_remove(sender->boundedServices.map, (void*)bndId);
-        int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+        hashMap_remove(boundedServices.map, (void*)bndId);
+        int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
         if (rc != 0) {
             L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
         }
 
         free(entry);
     }
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 }
 
 static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
     int status = CELIX_SUCCESS;
     psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
-    pubsub_nanomsg_topic_sender_t *sender = bound->parent;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent;
 
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId));
 
@@ -329,27 +295,27 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
             int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
             free(serializedOutput);
             if (rc < 0) {
-                L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
+                //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
             } else {
-                L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
-                L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+                //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
+                //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
             }
         } else {
-            L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+            //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
         }
     } else {
         status = CELIX_SERVICE_EXCEPTION;
-        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+        //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
     }
     return status;
 }
 
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) {
 
     static bool firstSend = true;
 
     if(firstSend){
-        L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+        //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
         sleep(FIRST_SEND_DELAY_IN_SECONDS);
         firstSend = false;
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index ec85c37..90ab6ce 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -20,23 +20,63 @@
 #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
 
 #include "celix_bundle_context.h"
+#include <log_helper.h>
+#include <pubsub_serializer.h>
 
-typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t;
+namespace pubsub {
+    namespace nanomsg {
+        class pubsub_nanomsg_topic_sender {
+        public:
+            pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope,
+                                        const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser,
+                                        const char *_bindIp, unsigned int _basePort, unsigned int _maxPort);
 
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
-                                                                const char *scope, const char *topic,
-                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
-                                                                const char *bindIP, unsigned int basePort,
-                                                                unsigned int maxPort);
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender);
+            ~pubsub_nanomsg_topic_sender();
 
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender);
+            pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete;
 
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender);
+            const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete;
 
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+            long getSerializerSvcId() const ;
+            const char *getScope() const ;
+            const char *getTopic() const ;
+            const char *getUrl() const;
+
+            void* getPublisherService(const celix_bundle_t *requestingBundle,
+                    const celix_properties_t *svcProperties __attribute__((unused)));
+            void ungetPublisherService(const celix_bundle_t *requestingBundle,
+                        const celix_properties_t *svcProperties __attribute__((unused)));
+            int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
+            void delay_first_send_for_late_joiners() ;
+
+
+                //private:
+            celix_bundle_context_t *ctx;
+            log_helper_t *logHelper;
+            long serializerSvcId;
+            pubsub_serializer_service_t *serializer;
+
+            char *scope{};
+            char *topic{};
+            char scopeAndTopicFilter[5];
+            char *url{};
+
+            struct {
+                celix_thread_mutex_t mutex;
+                int socket;
+            } nanomsg{};
+
+            struct {
+                long svcId;
+                celix_service_factory_t factory;
+            } publisher{};
+
+            struct {
+                celix_thread_mutex_t mutex{};
+                hash_map_t *map{};  //key = bndId, value = psa_nanomsg_bounded_service_entry_t
+            } boundedServices{};
+        };
+    }
+}
 
 #endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H


[4/8] celix git commit: nanomsg celix-map replaced by std::map

Posted by er...@apache.org.
nanomsg celix-map replaced by std::map


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8658738d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8658738d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8658738d

Branch: refs/heads/nanomsg
Commit: 8658738d5e9a905eb0642ea605d36e50dc24730a
Parents: 0abbf43
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 21:28:32 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:28:32 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_topic_receiver.cc        | 128 +++++++++----------
 .../src/pubsub_nanomsg_topic_receiver.h         |  37 +++++-
 2 files changed, 95 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 88886c6..8acf6b1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,18 +63,11 @@
 #define L_ERROR printf
 
 
-typedef struct psa_zmq_requested_connection_entry {
-    char *url;
-    bool connected;
-    int id;
-} psa_nanomsg_requested_connection_entry_t;
 
 
 
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                      const celix_bundle_t *owner);
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                         const celix_bundle_t *owner);
+//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+//                                                         const celix_bundle_t *owner);
 
 
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
@@ -83,7 +76,6 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         const char *_topic,
         long _serializerSvcId,
         pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
-    //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
     ctx = _ctx;
     logHelper = _logHelper;
     serializer = _serializer;
@@ -107,14 +99,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 
         char subscribeFilter[5];
         psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
-        //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
 
         m_scope = strndup(m_scope, 1024 * 1024);
         m_topic = strndup(m_topic, 1024 * 1024);
 
         subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
         std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
-        requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+        //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
         char buf[size + 1];
@@ -124,8 +115,12 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = this;
-        opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
+        opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
+            static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner);
+        };
+        opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
+            static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner);
+        };
 
         subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
         recvThread.running = true;
@@ -159,18 +154,18 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
         }
 
 
-        {
-            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-            iter = hashMapIterator_construct(requestedConnections.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-                if (entry != NULL) {
-                    free(entry->url);
-                    free(entry);
-                }
-            }
-            hashMap_destroy(requestedConnections.map, false, false);
-        }
+//        {
+//            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+//            iter = hashMapIterator_construct(requestedConnections.map);
+//            while (hashMapIterator_hasNext(&iter)) {
+//                psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
+//                if (entry != NULL) {
+//                    free(entry->url);
+//                    free(entry);
+//                }
+//            }
+//            hashMap_destroy(requestedConnections.map, false, false);
+//        }
 
         //celixThreadMutex_destroy(&receiver->subscribers.mutex);
         //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
@@ -196,13 +191,11 @@ long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
 void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
                                                  std::vector<std::string> &unconnectedUrls) {
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
-        if (entry->connected) {
-            connectedUrls.push_back(std::string(entry->url));
+    for (auto entry : requestedConnections.map) {
+        if (entry.second.isConnected()) {
+            connectedUrls.push_back(std::string(entry.second.getUrl()));
         } else {
-            unconnectedUrls.push_back(std::string(entry->url));
+            unconnectedUrls.push_back(std::string(entry.second.getUrl()));
         }
     }
 }
@@ -212,18 +205,18 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
     L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
-    if (entry == NULL) {
-        entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
-        entry->url = strndup(url, 1024*1024);
-        entry->connected = false;
-        hashMap_put(requestedConnections.map, (void*)entry->url, entry);
+    auto entry  = requestedConnections.map.find(url);
+    if (entry == requestedConnections.map.end()) {
+        requestedConnections.map.emplace(
+                std::piecewise_construct,
+                std::forward_as_tuple(std::string(url)),
+                std::forward_as_tuple(url, -1));
     }
-    if (!entry->connected) {
+    if (!entry->second.isConnected()) {
         int connection_id = nn_connect(m_nanoMsgSocket, url);
         if (connection_id >= 0) {
-            entry->connected = true;
-            entry->id = connection_id;
+            entry->second.setConnected(true);
+            entry->second.setId(connection_id);
         } else {
             L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", url, strerror(errno));
         }
@@ -234,33 +227,34 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
     L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
-    if (entry != NULL && entry->connected) {
-        if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
-            entry->connected = false;
-        } else {
-            L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
+    auto entry = requestedConnections.map.find(url);
+    if (entry != requestedConnections.map.end()) {
+        if (entry->second.isConnected()) {
+            if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
+                entry->second.setConnected(false);
+            } else {
+                L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->second.getId(),
+                       strerror(errno));
+            }
         }
-    }
-    if (entry != NULL) {
-        free(entry->url);
-        free(entry);
+        requestedConnections.map.erase(url);
+        std::cerr << "REMOVING connection " << url << std::endl;
+    } else {
+        std::cerr << "Disconnecting from unknown URL " << url << std::endl;
     }
 }
 
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
                                                       const celix_bundle_t *bnd) {
-    auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
+    if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
         //not the same scope. ignore
         return;
     }
 
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
     if (entry != NULL) {
         entry->usageCount += 1;
     } else {
@@ -269,33 +263,31 @@ static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, c
         entry->usageCount = 1;
         entry->svc = static_cast<pubsub_subscriber_t*>(svc);
 
-        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
-            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+            hashMap_put(subscribers.map, (void*)bndId, entry);
         } else {
-            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
             free(entry);
         }
     }
 }
 
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
+void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
                                                          const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
     long bndId = celix_bundle_getId(bnd);
 
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
     if (entry != NULL) {
         entry->usageCount -= 1;
     }
     if (entry != NULL && entry->usageCount <= 0) {
         //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+        hashMap_remove(subscribers.map, (void*)bndId);
+        int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
         if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
         }
         free(entry);
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index f977917..3398fb1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -21,6 +21,7 @@
 #include <vector>
 #include <thread>
 #include <mutex>
+#include <map>
 #include "pubsub_serializer.h"
 #include "log_helper.h"
 #include "celix_bundle_context.h"
@@ -33,6 +34,34 @@ typedef struct psa_zmq_subscriber_entry {
     pubsub_subscriber_t *svc;
 } psa_nanomsg_subscriber_entry_t;
 
+typedef struct psa_zmq_requested_connection_entry {
+public:
+    psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false):
+    url{_url}, id{_id}, connected{_connected} {
+    }
+    bool isConnected() const {
+        return connected;
+    }
+
+    int getId() const {
+        return id;
+    }
+
+    void setId(int _id) {
+        id = _id;
+    }
+    void setConnected(bool c) {
+        connected = c;
+    }
+
+    const std::string &getUrl() const {
+        return url;
+    }
+private:
+    std::string url;
+    int id;
+    bool connected;
+} psa_nanomsg_requested_connection_entry_t;
 
 namespace pubsub {
     namespace nanomsg {
@@ -58,7 +87,10 @@ namespace pubsub {
             void recvThread_exec();
             void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
             void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
-        //private:
+            void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+            void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
+
+        private:
             celix_bundle_context_t *ctx{nullptr};
             log_helper_t *logHelper{nullptr};
             long m_serializerSvcId{0};
@@ -77,7 +109,8 @@ namespace pubsub {
 
             struct {
                 std::mutex mutex;
-                hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+                std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
+                //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
             } requestedConnections{};
 
             long subscriberTrackerId{0};


[7/8] celix git commit: Nanomsg

Posted by er...@apache.org.
Nanomsg


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cdefb0d6
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cdefb0d6
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cdefb0d6

Branch: refs/heads/nanomsg
Commit: cdefb0d665b27a41f360599598ff489b322b4405
Parents: 883abee
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 26 20:23:11 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 26 20:23:11 2018 +0100

----------------------------------------------------------------------
 .../log_service/loghelper_include/log_helper.h  |  2 +-
 bundles/log_service/src/log_helper.c            |  2 +-
 .../src/pubsub_nanomsg_admin.cc                 | 62 ++++++++------------
 .../src/pubsub_nanomsg_admin.h                  | 10 ++--
 .../src/pubsub_nanomsg_topic_receiver.cc        | 10 ++--
 .../src/pubsub_nanomsg_topic_sender.cc          | 11 +---
 6 files changed, 42 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 28e6877..af058eb 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -33,7 +33,7 @@ celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* log_he
 celix_status_t logHelper_start(log_helper_pt loghelper);
 celix_status_t logHelper_stop(log_helper_pt loghelper);
 celix_status_t logHelper_destroy(log_helper_pt* loghelper);
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... );
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/src/log_helper.c
----------------------------------------------------------------------
diff --git a/bundles/log_service/src/log_helper.c b/bundles/log_service/src/log_helper.c
index 6570357..e9939ed 100644
--- a/bundles/log_service/src/log_helper.c
+++ b/bundles/log_service/src/log_helper.c
@@ -156,7 +156,7 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
 
 
 
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... )
 {
     celix_status_t status = CELIX_SUCCESS;
 	va_list listPointer;

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 030441d..cf516ee 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,20 +32,15 @@
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
 #include "pubsub_psa_nanomsg_constants.h"
-/*
-//#define L_DEBUG(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-//#define L_INFO(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-//#define L_WARN(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-//#define L_ERROR(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
+
+#define L_DEBUG(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 
 
@@ -125,10 +120,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 
     {
         std::lock_guard<std::mutex> lock(serializers.mutex);
-        // todo: do not use pointer but type in map
-        for(auto kv: serializers.map) {
-            free(kv.second);
-        }
+        serializers.map.clear();
     }
 
     free(ipAddress);
@@ -229,11 +221,14 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
         std::lock_guard<std::mutex> lock(serializers.mutex);
         auto it = serializers.map.find(svcId);
         if (it == serializers.map.end()) {
-            auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
-            entry->serType = serType;
-            entry->svcId = svcId;
-            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-            serializers.map[svcId] = entry;
+            serializers.map.emplace(std::piecewise_construct,
+                    std::forward_as_tuple(svcId),
+                    std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
+//            auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
+//            entry->serType = serType;
+//            entry->svcId = svcId;
+//            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+//            serializers.map[svcId] = entry;
         }
     }
 }
@@ -250,18 +245,14 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
 
     std::lock_guard<std::mutex> lock(serializers.mutex);
 
-    psa_nanomsg_serializer_entry_t* entry = nullptr;
     auto kvsm = serializers.map.find(svcId);
     if (kvsm != serializers.map.end()) {
-        entry = kvsm->second;
-    }
-    serializers.map.erase(svcId);
-    if (entry != nullptr) {
+        auto &entry = kvsm->second;
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
                 for (auto kv: topicSenders.map) {
                 auto *sender = kv.second;
-                if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+                if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
                     char *key = kv.first;
                     topicSenders.map.erase(kv.first);
                     pubsub_nanoMsgTopicSender_destroy(sender);
@@ -274,7 +265,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
             for (auto kv : topicReceivers.map){
                 auto *receiver = kv.second;
-                if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
+                if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
                     auto key = kv.first;
                     topicReceivers.map.erase(key);
                     delete receiver;
@@ -282,7 +273,6 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             }
         }
 
-        free(entry);
     }
 }
 
@@ -340,7 +330,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
         psa_nanomsg_serializer_entry_t *serEntry = nullptr;
         auto kv = serializers.map.find(serializerSvcId);
         if (kv != serializers.map.end()) {
-            serEntry = kv->second;
+            serEntry = &kv->second;
         }
         if (serEntry != nullptr) {
             sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
@@ -418,13 +408,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
             auto kvs = serializers.map.find(serializerSvcId);
             if (kvs != serializers.map.end()) {
                 auto serEntry = kvs->second;
-                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry.svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-                const char *serType = kvs->second->serType;
+                const char *serType = kvs->second.serType;
                 newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
@@ -577,7 +567,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub_nanomsg_topic_sender_t *sender = kvts.second;
             long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
             auto kvs = serializers.map.find(serSvcId);
-            const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
+            const char* serType = ( kvs == serializers.map.end() ? "!Error" :  kvs->second.serType);
             const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
             const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
             const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -596,7 +586,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
-            const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+            const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType);
             auto scope = receiver->scope();
             auto topic = receiver->topic();
 

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b33a3c0..689ae20 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -43,8 +43,6 @@
 
 #define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 
-//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
 template <typename key, typename value>
 struct ProtectedMap {
     std::mutex mutex{};
@@ -111,11 +109,16 @@ private:
     bool verbose{};
 
     typedef struct psa_nanomsg_serializer_entry {
+        psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
+            serType{_serType}, svcId{_svcId}, svc{_svc} {
+
+        }
+
         const char *serType;
         long svcId;
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
-    ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+    ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
     ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
@@ -131,4 +134,3 @@ extern "C" {
 
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
-

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index db8469b..9f77a4c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -158,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
 
 
 void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
-    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+    L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry  = requestedConnections.map.find(url);
@@ -181,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
 }
 
 void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
-    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+    L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry = requestedConnections.map.find(url);
@@ -316,13 +316,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() {
             processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
             nn_freemsg(msg);
         } else if (recvBytes >= 0) {
-            L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
+            L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
         } else if (errno == EAGAIN || errno == ETIMEDOUT) {
             //nop
         } else if (errno == EINTR) {
-            L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted");
+            L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
         } else {
-            L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno));
+            L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d: %s\n", errno, strerror(errno));
         }
     } // while
 

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index ff0d4f7..d5ed28f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -37,9 +37,9 @@
 #include "pubsub_psa_nanomsg_constants.h"
 #include "pubsub_nanomsg_common.h"
 
-#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define FIRST_SEND_DELAY_IN_SECONDS                 2
 #define NANOMSG_BIND_MAX_RETRY                      10
-/*
+
 #define L_DEBUG(...) \
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -48,11 +48,6 @@
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 #define L_ERROR(...) \
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
 
 struct pubsub_nanomsg_topic_sender {
     celix_bundle_context_t *ctx;
@@ -349,7 +344,7 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
     return status;
 }
 
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) {
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
 
     static bool firstSend = true;