You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:58 UTC

[2/8] celix git commit: nanomsg topicreceiver changed to class

nanomsg topicreceiver changed to class


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c19a5bd8
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c19a5bd8
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c19a5bd8

Branch: refs/heads/nanomsg
Commit: c19a5bd85b5c514794c6b38a1080b5382b8ca1ad
Parents: 95633eb
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 19 20:10:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 19 20:10:50 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |  44 ++---
 .../src/pubsub_nanomsg_admin.h                  |   6 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        | 162 +++++++++----------
 .../src/pubsub_nanomsg_topic_receiver.h         |  77 +++++++--
 4 files changed, 169 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 6c15ec8..3e788ae 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -111,7 +111,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
         for (auto kv: topicReceivers.map) {
-            pubsub_nanoMsgTopicReceiver_destroy(kv.second);
+            delete kv.second;
         }
     }
 
@@ -274,10 +274,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
             for (auto kv : topicReceivers.map){
                 auto *receiver = kv.second;
-                if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+                if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
                     char *key = kv.first;
                     topicReceivers.map.erase(key);
-                    pubsub_nanoMsgTopicReceiver_destroy(receiver);
+                    delete receiver;
                     free(key);
                 }
             }
@@ -409,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+    pubsub::nanomsg::topic_receiver * receiver = nullptr;
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
@@ -421,7 +421,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
             auto kvs = serializers.map.find(serializerSvcId);
             if (kvs != serializers.map.end()) {
                 auto serEntry = kvs->second;
-                receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
             }
@@ -471,24 +471,24 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
     free(key);
     if (entry != topicReceivers.map.end()) {
         char *receiverKey = entry->first;
-        pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+        pubsub::nanomsg::topic_receiver *receiver = entry->second;
         topicReceivers.map.erase(receiverKey);
 
         free(receiverKey);
-        pubsub_nanoMsgTopicReceiver_destroy(receiver);
+        delete receiver;
     }
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
-celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                     const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-    const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+    const char *scope = receiver->scope();
+    const char *topic = receiver->topic();
 
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -503,7 +503,7 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_to
         if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
+            receiver->connectTo(url);
         }
     }
 
@@ -516,7 +516,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
         for (auto entry: topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
     }
@@ -532,13 +532,13 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
 }
 
 
-celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                             const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-    const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+    const char *scope = receiver->scope();
+    const char *topic = receiver->topic();
 
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -551,7 +551,7 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanom
         if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
+            receiver->disconnectFrom(url);
         }
     }
 
@@ -564,7 +564,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         for (auto entry : topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
     }
@@ -605,16 +605,16 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         for (auto entry : topicReceivers.map) {
-            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
-            long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+            pubsub::nanomsg::topic_receiver *receiver = entry.second;
+            long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
             const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
-            const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-            const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+            const char *scope = receiver->scope();
+            const char *topic = receiver->topic();
 
             std::vector<std::string> connected{};
             std::vector<std::string> unconnected{};
-            pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+            receiver->listConnections(connected, unconnected);
 
             fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
             fprintf(out, "   |- serializer type = %s\n", serType);

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index c34a310..3e680b6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -84,10 +84,10 @@ private:
     celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
                                                         FILE *errStream __attribute__((unused)));
 
-    celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+    celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                    const celix_properties_t *endpoint);
 
-    celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+    celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
                                                                         const celix_properties_t *endpoint);
     celix_bundle_context_t *ctx;
     log_helper_t *log;
@@ -117,7 +117,7 @@ private:
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
     ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
-    ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+    ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 42f6423..889d79d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -109,92 +109,96 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc
 static void* psa_nanomsg_recvThread(void *data);
 
 
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-                                                                    log_helper_t *logHelper, const char *scope,
-                                                                    const char *topic, long serializerSvcId,
-                                                                    pubsub_serializer_service_t *serializer) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
-    receiver->ctx = ctx;
-    receiver->logHelper = logHelper;
-    receiver->serializerSvcId = serializerSvcId;
-    receiver->serializer = serializer;
-    psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
-
-
-    receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
-    if (receiver->nanoMsgSocket < 0) {
-        free(receiver);
-        receiver = NULL;
-        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+//                                                                    log_helper_t *logHelper, const char *scope,
+//                                                                    const char *topic, long serializerSvcId,
+//                                                                    pubsub_serializer_service_t *serializer) {
+pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
+        log_helper_t *_logHelper,
+        const char *_scope,
+        const char *_topic,
+        long _serializerSvcId,
+        pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
+    //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+    ctx = _ctx;
+    logHelper = _logHelper;
+    serializer = _serializer;
+    psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter);
+
+    m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+    if (m_nanoMsgSocket < 0) {
+        // TODO throw error or something
+        //free(receiver);
+        //receiver = NULL;
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
     } else {
         int timeout = PSA_NANOMSG_RECV_TIMEOUT;
-        if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
+        if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
                           sizeof (timeout)) < 0) {
-            free(receiver);
-            receiver = NULL;
-            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+            // TODO throw error or something
+            //free(receiver);
+            //receiver = NULL;
+            L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
         }
 
         char subscribeFilter[5];
-        psa_nanomsg_setScopeAndTopicFilter(scope, topic, subscribeFilter);
+        psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
         //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
 
-        receiver->scope = strndup(scope, 1024 * 1024);
-        receiver->topic = strndup(topic, 1024 * 1024);
+        m_scope = strndup(m_scope, 1024 * 1024);
+        m_topic = strndup(m_topic, 1024 * 1024);
 
-        receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+        subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
         char buf[size + 1];
-        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
         celix_service_tracking_options_t opts{};
         opts.filter.ignoreServiceLanguage = true;
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
-        opts.callbackHandle = receiver;
+        opts.callbackHandle = this;
         opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
         opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
 
-        receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-        receiver->recvThread.running = true;
-        celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+        subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+        recvThread.running = true;
+        celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
         std::stringstream namestream;
-        namestream << "NANOMSG TR " << scope << "/" << topic;
-        celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+        namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
+        celixThread_setName(&recvThread.thread, namestream.str().c_str());
     }
-    return receiver;
 }
 
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
-    if (receiver != NULL) {
+pubsub::nanomsg::topic_receiver::~topic_receiver() {
 
         {
-            std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
-            receiver->recvThread.running = false;
+            std::lock_guard<std::mutex> _lock(recvThread.mutex);
+            recvThread.running = false;
         }
-        celixThread_join(receiver->recvThread.thread, NULL);
+        celixThread_join(recvThread.thread, NULL);
 
-        celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+        celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
         hash_map_iterator_t iter=hash_map_iterator_t();
         {
-            std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-            iter = hashMapIterator_construct(receiver->subscribers.map);
+            std::lock_guard<std::mutex> _lock(subscribers.mutex);
+            iter = hashMapIterator_construct(subscribers.map);
             while (hashMapIterator_hasNext(&iter)) {
                 psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
                 if (entry != NULL)  {
-                    receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                    serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
                     free(entry);
                 }
             }
-            hashMap_destroy(receiver->subscribers.map, false, false);
+            hashMap_destroy(subscribers.map, false, false);
         }
 
 
         {
-            std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-            iter = hashMapIterator_construct(receiver->requestedConnections.map);
+            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+            iter = hashMapIterator_construct(requestedConnections.map);
             while (hashMapIterator_hasNext(&iter)) {
                 psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
                 if (entry != NULL) {
@@ -202,37 +206,34 @@ void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiv
                     free(entry);
                 }
             }
-            hashMap_destroy(receiver->requestedConnections.map, false, false);
+            hashMap_destroy(requestedConnections.map, false, false);
         }
 
         //celixThreadMutex_destroy(&receiver->subscribers.mutex);
         //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         //celixThreadMutex_destroy(&receiver->recvThread.mutex);
 
-        nn_close(receiver->nanoMsgSocket);
+        nn_close(m_nanoMsgSocket);
 
-        free(receiver->scope);
-        free(receiver->topic);
-    }
-    free(receiver);
+        free((void*)m_scope);
+        free((void*)m_topic);
 }
 
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->scope;
+const char* pubsub::nanomsg::topic_receiver::scope() const {
+    return m_scope;
 }
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->topic;
+const char* pubsub::nanomsg::topic_receiver::topic() const {
+    return m_topic;
 }
 
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
-    return receiver->serializerSvcId;
+long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
+    return m_serializerSvcId;
 }
 
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-                                                 std::vector<std::string> &connectedUrls,
+void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
                                                  std::vector<std::string> &unconnectedUrls) {
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
         if (entry->connected) {
@@ -244,21 +245,19 @@ void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t
 }
 
 
-void pubsub_nanoMsgTopicReceiver_connectTo(
-        pubsub_nanomsg_topic_receiver_t *receiver,
-        const char *url) {
-    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
+    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
 
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url));
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
     if (entry == NULL) {
         entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
         entry->url = strndup(url, 1024*1024);
         entry->connected = false;
-        hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+        hashMap_put(requestedConnections.map, (void*)entry->url, entry);
     }
     if (!entry->connected) {
-        int connection_id = nn_connect(receiver->nanoMsgSocket, url);
+        int connection_id = nn_connect(m_nanoMsgSocket, url);
         if (connection_id >= 0) {
             entry->connected = true;
             entry->id = connection_id;
@@ -268,13 +267,13 @@ void pubsub_nanoMsgTopicReceiver_connectTo(
     }
 }
 
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
-    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
+    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
 
-    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url));
+    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
     if (entry != NULL && entry->connected) {
-        if (nn_shutdown(receiver->nanoMsgSocket, entry->id) == 0) {
+        if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
             entry->connected = false;
         } else {
             L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
@@ -287,7 +286,7 @@ void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t
 }
 
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -318,7 +317,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
 
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
                                                          const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -338,7 +337,7 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
     }
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
     pubsub_subscriber_t *svc = entry->svc;
 
@@ -362,7 +361,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t
     }
 }
 
-static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
     std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -377,8 +376,9 @@ struct Message {
     pubsub_nanmosg_msg_header_t header;
     char payload[];
 };
+
 static void* psa_nanomsg_recvThread(void *data) {
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
     bool running{};
     {
         std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);

http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index d584db8..6cd216b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -24,23 +24,72 @@
 #include "log_helper.h"
 #include "celix_bundle_context.h"
 
-typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+namespace pubsub {
+    namespace nanomsg {
+        class topic_receiver {
+        public:
+            topic_receiver(celix_bundle_context_t
+                           *ctx,
+                           log_helper_t *logHelper,
+                           const char *scope,
+                           const char *topic,
+                           long serializerSvcId, pubsub_serializer_service_t
+                           *serializer);
+            topic_receiver(const topic_receiver &) = delete;
+            topic_receiver & operator=(const topic_receiver &) = delete;
+            ~topic_receiver();
 
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-                                                                    log_helper_t *logHelper, const char *scope,
-                                                                    const char *topic, long serializerSvcId,
-                                                                    pubsub_serializer_service_t *serializer);
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+            const char* scope() const;
+            const char* topic() const;
+            long serializerSvcId() const;
+            void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
+            void connectTo(const char *url);
+            void disconnectFrom(const char *url);
+        private:
+            celix_bundle_context_t *ctx{nullptr};
+            log_helper_t *logHelper{nullptr};
+            long m_serializerSvcId{0};
+            pubsub_serializer_service_t *serializer{nullptr};
+            const char *m_scope{nullptr};
+            const char *m_topic{nullptr};
+            char m_scopeAndTopicFilter[5];
 
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+            int m_nanoMsgSocket{0};
 
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-                                                 std::vector<std::string> &connectedUrls,
-                                                 std::vector<std::string> &unconnectedUrls);
+            struct {
+                celix_thread_t thread;
+                std::mutex mutex;
+                bool running;
+            } recvThread{};
 
-void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+            struct {
+                std::mutex mutex;
+                hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+            } requestedConnections{};
+
+            long subscriberTrackerId{0};
+            struct {
+                std::mutex mutex;
+                hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+            } subscribers{};
+        };
+    }}
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+//                                                                    log_helper_t *logHelper, const char *scope,
+//                                                                    const char *topic, long serializerSvcId,
+//                                                                    pubsub_serializer_service_t *serializer);
+//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+//                                                 std::vector<std::string> &connectedUrls,
+//                                                 std::vector<std::string> &unconnectedUrls);
+
+//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
 
 #endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H