You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:04 UTC

[8/8] celix git commit: NanoMsg

NanoMsg


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7c141424
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7c141424
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7c141424

Branch: refs/heads/nanomsg
Commit: 7c141424d925afa98a83bc693649dc3500354965
Parents: cdefb0d
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Nov 27 21:02:18 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Nov 27 21:02:18 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |  42 ++--
 .../src/pubsub_nanomsg_admin.h                  |   2 +-
 .../src/pubsub_nanomsg_topic_sender.cc          | 240 ++++++++-----------
 .../src/pubsub_nanomsg_topic_sender.h           |  66 ++++-
 4 files changed, 178 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index cf516ee..42ed632 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -99,20 +99,20 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
         std::lock_guard<std::mutex> lock(topicSenders.mutex);
         for (auto kv : topicSenders.map) {
             auto *sender = kv.second;
-            pubsub_nanoMsgTopicSender_destroy(sender);
+            delete (sender);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
-        for (auto kv: topicReceivers.map) {
+        for (auto &kv: topicReceivers.map) {
             delete kv.second;
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
-        for (auto entry : discoveredEndpoints.map) {
+        for (auto &entry : discoveredEndpoints.map) {
             auto *ep = entry.second;
             celix_properties_destroy(ep);
         }
@@ -252,10 +252,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
                 for (auto kv: topicSenders.map) {
                 auto *sender = kv.second;
-                if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+                if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) {
                     char *key = kv.first;
                     topicSenders.map.erase(kv.first);
-                    pubsub_nanoMsgTopicSender_destroy(sender);
+                    delete (sender);
                     free(key);
                 }
             }
@@ -263,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
 
         {
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
-            for (auto kv : topicReceivers.map){
+            for (auto &kv : topicReceivers.map){
                 auto *receiver = kv.second;
                 if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
                     auto key = kv.first;
@@ -322,7 +322,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    pubsub_nanomsg_topic_sender_t *sender = nullptr;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr;
     std::lock_guard<std::mutex> serializerLock(serializers.mutex);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     sender = topicSenders.map.find(key)->second;
@@ -333,7 +333,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
             serEntry = &kv->second;
         }
         if (serEntry != nullptr) {
-            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+            sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
                                                       basePort, maxPort);
         }
         if (sender != nullptr) {
@@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
                                                 nullptr);
-            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl());
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
             if (cn != nullptr) {
@@ -378,10 +378,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     auto kv = topicSenders.map.find(key);
     if (kv != topicSenders.map.end()) {
         char *mapKey = kv->first;
-        pubsub_nanomsg_topic_sender_t *sender = kv->second;
+        pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
-        pubsub_nanoMsgTopicSender_destroy(sender);
+        delete (sender);
     } else {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
     }
@@ -495,7 +495,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
-        for (auto entry: topicReceivers.map) {
+        for (auto &entry: topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
@@ -541,7 +541,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        for (auto entry : topicReceivers.map) {
+        for (auto &entry : topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
@@ -564,13 +564,13 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
         for (auto kvts: topicSenders.map) {
-            pubsub_nanomsg_topic_sender_t *sender = kvts.second;
-            long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+            pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second;
+            long serSvcId = sender->getSerializerSvcId();
             auto kvs = serializers.map.find(serSvcId);
             const char* serType = ( kvs == serializers.map.end() ? "!Error" :  kvs->second.serType);
-            const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
-            const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
-            const char *url = pubsub_nanoMsgTopicSender_url(sender);
+            const char *scope = sender->getScope();
+            const char *topic = sender->getTopic();
+            const char *url = sender->getUrl();
             fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
             fprintf(out, "   |- serializer type = %s\n", serType);
             fprintf(out, "   |- url             = %s\n", url);
@@ -582,7 +582,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
         fprintf(out, "\nTopic Receivers:\n");
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        for (auto entry : topicReceivers.map) {
+        for (auto &entry : topicReceivers.map) {
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
@@ -596,10 +596,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
 
             fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
             fprintf(out, "   |- serializer type = %s\n", serType);
-            for (auto url : connected) {
+            for (auto &url : connected) {
                 fprintf(out, "   |- connected url   = %s\n", url.c_str());
             }
-            for (auto url : unconnected) {
+            for (auto &url : unconnected) {
                 fprintf(out, "   |- unconnected url = %s\n", url.c_str());
             }
         }

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 689ae20..7c2e9a0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,7 +119,7 @@ private:
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
-    ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+    ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index d5ed28f..1c75e71 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -28,11 +28,9 @@
 #include <nanomsg/bus.h>
 
 
-#include <pubsub_serializer.h>
 #include <pubsub_constants.h>
 #include <pubsub/publisher.h>
 #include <pubsub_common.h>
-#include <log_helper.h>
 #include "pubsub_nanomsg_topic_sender.h"
 #include "pubsub_psa_nanomsg_constants.h"
 #include "pubsub_nanomsg_common.h"
@@ -41,69 +39,47 @@
 #define NANOMSG_BIND_MAX_RETRY                      10
 
 #define L_DEBUG(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
 #define L_WARN(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+    logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 #define L_ERROR(...) \
-    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-
-struct pubsub_nanomsg_topic_sender {
-    celix_bundle_context_t *ctx;
-    log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
-
-    char *scope;
-    char *topic;
-    char scopeAndTopicFilter[5];
-    char *url;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        int socket;
-    } nanomsg;
-
-    struct {
-        long svcId;
-        celix_service_factory_t factory;
-    } publisher;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map;  //key = bndId, value = psa_nanomsg_bounded_service_entry_t
-    } boundedServices;
-};
+    logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
 
 typedef struct psa_nanomsg_bounded_service_entry {
-    pubsub_nanomsg_topic_sender_t *parent;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent;
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes;
     int getCount;
 } psa_nanomsg_bounded_service_entry_t;
 
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                             const celix_properties_t *svcProperties);
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                              const celix_properties_t *svcProperties);
+//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+//                                             const celix_properties_t *svcProperties);
+//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+//                                              const celix_properties_t *svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender);
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender);
 
 static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
 
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
-                                                                const char *scope, const char *topic,
-                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
-                                                                const char *bindIP, unsigned int basePort,
-                                                                unsigned int maxPort) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender)));
-    sender->ctx = ctx;
-    sender->logHelper = logHelper;
-    sender->serializerSvcId = serializerSvcId;
-    sender->serializer = ser;
-    psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
+                                                         log_helper_t *_logHelper,
+                                                         const char *_scope,
+                                                         const char *_topic,
+                                                         long _serializerSvcId,
+                                                         pubsub_serializer_service_t *_ser,
+                                                         const char *_bindIp,
+                                                         unsigned int _basePort,
+                                                         unsigned int _maxPort) :
+        ctx{_ctx},
+        logHelper{_logHelper},
+        serializerSvcId {_serializerSvcId},
+        serializer{_ser}{
+
+    psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter);
 
     //setting up nanomsg socket for nanomsg TopicSender
     {
@@ -116,10 +92,10 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
         int rv = -1, retry=0;
         while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
             /* Randomized part due to same bundle publishing on different topics */
-            unsigned int port = rand_range(basePort,maxPort);
-            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
-            char *url = static_cast<char*>(calloc(len, sizeof(char*)));
-            snprintf(url, len, "tcp://%s:%u", bindIP, port);
+            unsigned int port = rand_range(_basePort,_maxPort);
+            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1;
+            char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
+            snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
 
             len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
             char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
@@ -127,165 +103,155 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
             rv = nn_bind (socket, bindUrl);
             if (rv == -1) {
                 perror("Error for nn_bind");
-                free(url);
+                free(_url);
             } else {
-                sender->url = url;
-                sender->nanomsg.socket = socket;
+                this->url = _url;
+                nanomsg.socket = socket;
             }
             retry++;
             free(bindUrl);
         }
     }
 
-    if (sender->url != NULL) {
-        sender->scope = strndup(scope, 1024 * 1024);
-        sender->topic = strndup(topic, 1024 * 1024);
+    if (url != NULL) {
+        scope = strndup(_scope, 1024 * 1024);
+        topic = strndup(_topic, 1024 * 1024);
 
-        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
-        celixThreadMutex_create(&sender->nanomsg.mutex, NULL);
-        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+        celixThreadMutex_create(&boundedServices.mutex, NULL);
+        celixThreadMutex_create(&nanomsg.mutex, NULL);
+        boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
     }
 
     //register publisher services using a service factory
-    if (sender->url != NULL) {
-        sender->publisher.factory.handle = sender;
-        sender->publisher.factory.getService = psa_nanomsg_getPublisherService;
-        sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService;
+    if (url != NULL) {
+        publisher.factory.handle = this;
+        publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
+                    requestingBundle,
+                    svcProperties);
+        };
+        publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
+                    requestingBundle,
+                    svcProperties);
+        };
 
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
-        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope);
 
         celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
-        opts.factory = &sender->publisher.factory;
+        opts.factory = &publisher.factory;
         opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
         opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
         opts.properties = props;
 
-        sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
-    }
-
-    if (sender->url == NULL) {
-        free(sender);
-        sender = NULL;
+        publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
     }
 
-    return sender;
 }
 
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) {
-    if (sender != NULL) {
-        celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
+    celix_bundleContext_unregisterService(ctx, publisher.svcId);
 
-        nn_close(sender->nanomsg.socket);
+    nn_close(nanomsg.socket);
 
-        celixThreadMutex_lock(&sender->boundedServices.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
-            if (entry != NULL) {
-                sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
-                free(entry);
-            }
+    celixThreadMutex_lock(&boundedServices.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
+        if (entry != NULL) {
+            serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
+            free(entry);
         }
-        hashMap_destroy(sender->boundedServices.map, false, false);
-        celixThreadMutex_unlock(&sender->boundedServices.mutex);
-
-        celixThreadMutex_destroy(&sender->boundedServices.mutex);
-        celixThreadMutex_destroy(&sender->nanomsg.mutex);
-
-        free(sender->scope);
-        free(sender->topic);
-        free(sender->url);
-        free(sender);
     }
-}
+    hashMap_destroy(boundedServices.map, false, false);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->serializerSvcId;
-}
+    celixThreadMutex_destroy(&boundedServices.mutex);
+    celixThreadMutex_destroy(&nanomsg.mutex);
 
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->scope;
+    free(scope);
+    free(topic);
+    free(url);
 }
 
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->topic;
+long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
+    return serializerSvcId;
 }
 
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) {
-    return sender->url;
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
+    return scope;
 }
 
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
-    //TODO subscriber count -> topic info
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
+    return topic;
 }
 
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
-    //TODO
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
+    return url;
 }
 
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+
+void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
                                              const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
     long bndId = celix_bundle_getId(requestingBundle);
 
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+    celixThreadMutex_lock(&boundedServices.mutex);
+    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
     if (entry != NULL) {
         entry->getCount += 1;
     } else {
         entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry)));
         entry->getCount = 1;
-        entry->parent = sender;
+        entry->parent = this;
         entry->bndId = bndId;
 
-        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType;
             entry->service.send = psa_nanomsg_topicPublicationSend;
             entry->service.sendMultipart = NULL; //not supported TODO remove
-            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+            hashMap_put(boundedServices.map, (void*)bndId, entry);
         } else {
-            L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic);
+            L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic);
         }
 
 
 
     }
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 
     return &entry->service;
 }
 
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
                                               const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
     long bndId = celix_bundle_getId(requestingBundle);
 
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+    celixThreadMutex_lock(&boundedServices.mutex);
+    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
     if (entry != NULL) {
         entry->getCount -= 1;
     }
     if (entry != NULL && entry->getCount == 0) {
         //free entry
-        hashMap_remove(sender->boundedServices.map, (void*)bndId);
-        int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+        hashMap_remove(boundedServices.map, (void*)bndId);
+        int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
         if (rc != 0) {
             L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
         }
 
         free(entry);
     }
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&boundedServices.mutex);
 }
 
 static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
     int status = CELIX_SUCCESS;
     psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
-    pubsub_nanomsg_topic_sender_t *sender = bound->parent;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent;
 
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId));
 
@@ -329,27 +295,27 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
             int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
             free(serializedOutput);
             if (rc < 0) {
-                L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
+                //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
             } else {
-                L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
-                L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+                //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
+                //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
             }
         } else {
-            L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+            //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
         }
     } else {
         status = CELIX_SERVICE_EXCEPTION;
-        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+        //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
     }
     return status;
 }
 
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) {
 
     static bool firstSend = true;
 
     if(firstSend){
-        L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+        //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
         sleep(FIRST_SEND_DELAY_IN_SECONDS);
         firstSend = false;
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index ec85c37..90ab6ce 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -20,23 +20,63 @@
 #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
 
 #include "celix_bundle_context.h"
+#include <log_helper.h>
+#include <pubsub_serializer.h>
 
-typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t;
+namespace pubsub {
+    namespace nanomsg {
+        class pubsub_nanomsg_topic_sender {
+        public:
+            pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope,
+                                        const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser,
+                                        const char *_bindIp, unsigned int _basePort, unsigned int _maxPort);
 
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
-                                                                const char *scope, const char *topic,
-                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
-                                                                const char *bindIP, unsigned int basePort,
-                                                                unsigned int maxPort);
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender);
+            ~pubsub_nanomsg_topic_sender();
 
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender);
+            pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete;
 
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender);
+            const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete;
 
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+            long getSerializerSvcId() const ;
+            const char *getScope() const ;
+            const char *getTopic() const ;
+            const char *getUrl() const;
+
+            void* getPublisherService(const celix_bundle_t *requestingBundle,
+                    const celix_properties_t *svcProperties __attribute__((unused)));
+            void ungetPublisherService(const celix_bundle_t *requestingBundle,
+                        const celix_properties_t *svcProperties __attribute__((unused)));
+            int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
+            void delay_first_send_for_late_joiners() ;
+
+
+                //private:
+            celix_bundle_context_t *ctx;
+            log_helper_t *logHelper;
+            long serializerSvcId;
+            pubsub_serializer_service_t *serializer;
+
+            char *scope{};
+            char *topic{};
+            char scopeAndTopicFilter[5];
+            char *url{};
+
+            struct {
+                celix_thread_mutex_t mutex;
+                int socket;
+            } nanomsg{};
+
+            struct {
+                long svcId;
+                celix_service_factory_t factory;
+            } publisher{};
+
+            struct {
+                celix_thread_mutex_t mutex{};
+                hash_map_t *map{};  //key = bndId, value = psa_nanomsg_bounded_service_entry_t
+            } boundedServices{};
+        };
+    }
+}
 
 #endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H