You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/10/24 14:29:03 UTC

[1/2] celix git commit: NanoMsgAdmin: first version

Repository: celix
Updated Branches:
  refs/heads/nanomsg [created] 4fc1f3d3f


http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
new file mode 100644
index 0000000..feead76
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -0,0 +1,368 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <memory.h>
+
+#include <stdlib.h>
+#include <utils.h>
+#include <arpa/inet.h>
+#include <zconf.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/bus.h>
+
+
+#include <pubsub_serializer.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <pubsub_common.h>
+#include <log_helper.h>
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_psa_nanomsg_constants.h"
+#include "pubsub_nanomsg_common.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define NANOMSG_BIND_MAX_RETRY                      10
+/*
+#define L_DEBUG(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+*/
+#define L_DEBUG printf
+#define L_INFO printf
+#define L_WARN printf
+#define L_ERROR printf
+
+struct pubsub_nanomsg_topic_sender {
+    celix_bundle_context_t *ctx; //bundle context used to (un)register the publisher service factory
+    log_helper_t *logHelper; //stored at creation; the L_* macros in this file currently map to printf
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer; //creates/destroys the per-bundle serializer maps
+
+    char *scope; //strndup copy made at creation, freed in destroy
+    char *topic; //strndup copy made at creation, freed in destroy
+    char scopeAndTopicFilter[5]; //filled by psa_nanomsg_setScopeAndTopicFilter at creation
+    char *url; //"tcp://<bindIP>:<port>" of the bound socket; NULL while nothing is bound
+
+    struct {
+        celix_thread_mutex_t mutex;
+        int socket; //NN_BUS socket returned by nn_socket
+    } nanomsg;
+
+    struct {
+        long svcId; //id returned when registering the publisher service factory
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex; //held while map is read or modified
+        hash_map_t *map;  //key = bndId, value = psa_nanomsg_bounded_service_entry_t
+    } boundedServices;
+};
+
+typedef struct psa_nanomsg_bounded_service_entry {
+    pubsub_nanomsg_topic_sender_t *parent; //sender that created this entry
+    pubsub_publisher_t service; //publisher service handed to the requesting bundle
+    long bndId; //id of the requesting bundle, also the key in boundedServices.map
+    hash_map_t *msgTypes; //serializer map created by the serializer service for that bundle
+    int getCount; //outstanding getService calls; the entry is freed when this drops to 0
+} psa_nanomsg_bounded_service_entry_t;
+
+static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                             const celix_properties_t *svcProperties);
+static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                              const celix_properties_t *svcProperties);
+static unsigned int rand_range(unsigned int min, unsigned int max);
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender);
+
+static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
+
+/**
+ * Creates a topic sender: opens a NN_BUS socket, binds it to a random port in [basePort, maxPort]
+ * (at most NANOMSG_BIND_MAX_RETRY attempts) and registers the publisher service factory.
+ * Returns NULL, with everything acquired so far released, if allocation, socket creation or binding fails.
+ */
+pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
+                                                                const char *scope, const char *topic,
+                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
+                                                                const char *bindIP, unsigned int basePort,
+                                                                unsigned int maxPort) {
+    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender)));
+    if (sender == NULL) {
+        return NULL;
+    }
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = ser;
+    psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+
+    //setting up nanomsg socket for nanomsg TopicSender
+    int socket = nn_socket(AF_SP, NN_BUS);
+    if (socket == -1) {
+        perror("Error for nanomsg_socket");
+        free(sender); //binding an invalid socket can never succeed
+        return NULL;
+    }
+
+    for (int retry = 0; sender->url == NULL && retry < NANOMSG_BIND_MAX_RETRY; ++retry) {
+        /* Randomized part due to same bundle publishing on different topics */
+        unsigned int port = rand_range(basePort,maxPort);
+        size_t urlLen = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
+        size_t bindUrlLen = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
+        char *url = static_cast<char*>(calloc(urlLen, sizeof(char))); //was sizeof(char*), which over-allocated
+        char *bindUrl = static_cast<char*>(calloc(bindUrlLen, sizeof(char)));
+        if (url == NULL || bindUrl == NULL) {
+            free(url);
+            free(bindUrl);
+            break; //out of memory, retrying will not help
+        }
+        snprintf(url, urlLen, "tcp://%s:%u", bindIP, port);
+        snprintf(bindUrl, bindUrlLen, "tcp://0.0.0.0:%u", port);
+
+        if (nn_bind(socket, bindUrl) == -1) {
+            perror("Error for nn_bind");
+            free(url);
+        } else {
+            sender->url = url;
+            sender->nanomsg.socket = socket;
+        }
+        free(bindUrl);
+    }
+
+    if (sender->url == NULL) {
+        nn_close(socket); //the socket used to leak here when no port could be bound
+        free(sender);
+        return NULL;
+    }
+
+    sender->scope = strndup(scope, 1024 * 1024);
+    sender->topic = strndup(topic, 1024 * 1024);
+    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+    celixThreadMutex_create(&sender->nanomsg.mutex, NULL);
+    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    //register publisher services using a service factory
+    sender->publisher.factory.handle = sender;
+    sender->publisher.factory.getService = psa_nanomsg_getPublisherService;
+    sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService;
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+    celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+    celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    opts.factory = &sender->publisher.factory;
+    opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+    opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+    opts.properties = props;
+    sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+    return sender;
+}
+
+/* Unregisters the publisher service, closes the socket and frees all bounded service entries and the sender; NULL is ignored. */
+void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) {
+    if (sender == NULL) {
+        return;
+    }
+    celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+    nn_close(sender->nanomsg.socket);
+    //release the per-bundle entries that are still present in the map
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    hash_map_iterator_t it = hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&it)) {
+        psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&it));
+        if (bound == NULL) {
+            continue;
+        }
+        sender->serializer->destroySerializerMap(sender->serializer->handle, bound->msgTypes);
+        free(bound);
+    }
+    hashMap_destroy(sender->boundedServices.map, false, false);
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_destroy(&sender->boundedServices.mutex);
+    celixThreadMutex_destroy(&sender->nanomsg.mutex);
+    free(sender->scope);
+    free(sender->topic);
+    free(sender->url);
+    free(sender);
+}
+
+long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) { //accessor; sender is dereferenced unchecked
+    return sender->serializerSvcId;
+}
+
+const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) { //returns the sender's own string, not a copy
+    return sender->scope;
+}
+
+const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) { //returns the sender's own string, not a copy
+    return sender->topic;
+}
+
+const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) { //returns the sender's own string, not a copy
+    return sender->url;
+}
+
+void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) { //placeholder: both arguments are ignored
+    //TODO subscriber count -> topic info
+}
+
+void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) { //placeholder: both arguments are ignored
+    //TODO
+}
+
+/**
+ * Service factory callback: returns the publisher service of the requesting bundle; the per-bundle entry and
+ * its serializer map are created on the first get. Returns NULL if that entry cannot be set up.
+ */
+static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                             const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else if ((entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry)))) != NULL) {
+        entry->getCount = 1;
+        entry->parent = sender;
+        entry->bndId = bndId;
+        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        if (rc == 0) {
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType;
+            entry->service.send = psa_nanomsg_topicPublicationSend;
+            entry->service.sendMultipart = NULL; //not supported TODO remove
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic);
+            free(entry); //never stored in the map: hand out NULL instead of a service without callbacks
+            entry = NULL;
+        }
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    return entry != NULL ? &entry->service : NULL;
+}
+
+/* Service factory callback: drops one reference of the requesting bundle and frees its entry after the last unget. */
+static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                              const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
+    const long bundleId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    hash_map_t *bounded = sender->boundedServices.map;
+    psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(bounded, (void*)bundleId));
+    if (bound != NULL) {
+        bound->getCount -= 1;
+        if (bound->getCount == 0) {
+            //last user is gone: drop the entry together with its serializer map
+            hashMap_remove(bounded, (void*)bundleId);
+            if (sender->serializer->destroySerializerMap(sender->serializer->handle, bound->msgTypes) != 0) {
+                L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+            }
+            free(bound);
+        }
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+/**
+ * Publisher send callback: serializes inMsg with the serializer registered for msgTypeId and sends
+ * header + payload as one nanomsg message on the sender's socket.
+ * Returns CELIX_SUCCESS, the serializer's status if serialization fails, or CELIX_SERVICE_EXCEPTION
+ * if no serializer is known for msgTypeId or nn_sendmsg fails.
+ */
+static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
+    int status = CELIX_SUCCESS;
+    psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
+    pubsub_nanomsg_topic_sender_t *sender = bound->parent;
+
+    pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId));
+    if (msgSer != NULL) {
+        delay_first_send_for_late_joiners(sender);
+        int major = 0, minor = 0;
+        pubsub_nanmosg_msg_header_t msg_hdr;
+        memset(&msg_hdr, 0, sizeof(msg_hdr)); //major/minor were sent uninitialized when msgVersion == NULL
+        msg_hdr.type = msgTypeId;
+        if (msgSer->msgVersion != NULL) {
+            version_getMajor(msgSer->msgVersion, &major);
+            version_getMinor(msgSer->msgVersion, &minor);
+            msg_hdr.major = (unsigned char) major;
+            msg_hdr.minor = (unsigned char) minor;
+        }
+
+        void *serializedOutput = NULL;
+        size_t serializedOutputLen = 0;
+        status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
+        if (status == CELIX_SUCCESS) {
+            nn_iovec data[2];
+            nn_msghdr msg;
+            msg.msg_iov = data;
+            msg.msg_iovlen = 2;
+            msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr);
+            msg.msg_iov[0].iov_len = sizeof(msg_hdr);
+            msg.msg_iov[1].iov_base = serializedOutput;
+            msg.msg_iov[1].iov_len = serializedOutputLen;
+            msg.msg_control = nullptr;
+            msg.msg_controllen = 0;
+            //TODO revert to use zmq_msg_init_data (or something like that) for zero copy for the payload
+            //TODO remove socket mutex .. not needed (initialized during creation)
+            errno = 0;
+            int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
+            int sendErrno = errno; //capture before free() gets a chance to change errno
+            free(serializedOutput);
+            if (rc < 0) {
+                status = CELIX_SERVICE_EXCEPTION; //a failed send used to be reported as success
+                L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(sendErrno));
+            } else {
+                L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
+                L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+            }
+        } else {
+            L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+        }
+    } else {
+        status = CELIX_SERVICE_EXCEPTION;
+        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+    }
+    return status;
+}
+
+/* On the first call in this process (static flag, shared by all senders) sleeps FIRST_SEND_DELAY_IN_SECONDS; later calls return at once. */
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) {
+    static bool delayPending = true;
+    if (!delayPending) {
+        return;
+    }
+    L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY_IN_SECONDS);
+    delayPending = false;
+}
+
+static unsigned int rand_range(unsigned int min, unsigned int max){ //pseudo-random value in [min, max]; requires min <= max
+    double scaled = ((double)random())/((double)RAND_MAX + 1.0); //in [0,1): dividing by RAND_MAX allowed 1.0 and thus max+1
+    return (unsigned int)((max-min+1)*scaled + min);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
new file mode 100644
index 0000000..ec85c37
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -0,0 +1,42 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t;
+
+pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
+                                                                const char *scope, const char *topic,
+                                                                long serializerSvcId, pubsub_serializer_service_t *ser,
+                                                                const char *bindIP, unsigned int basePort,
+                                                                unsigned int maxPort);
+void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender);
+
+const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender);
+const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender);
+const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender);
+
+long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender);
+
+void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+
+#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
new file mode 100644
index 0000000..2ae2a16
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
@@ -0,0 +1,50 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+
+#ifndef PUBSUB_PSA_ZMQ_CONSTANTS_H_
+#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
+
+
+
+#define PSA_ZMQ_PUBSUB_ADMIN_TYPE	            "zmq"
+
+#define PSA_NANOMSG_BASE_PORT                       "PSA_ZMQ_BASE_PORT"
+#define PSA_NANOMSG_MAX_PORT                        "PSA_ZMQ_MAX_PORT"
+
+#define PSA_NANOMSG_DEFAULT_BASE_PORT               5501
+#define PSA_NANOMSG_DEFAULT_MAX_PORT                6000
+
+#define PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE 	    30
+#define PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE 	    70
+#define PSA_NANOMSG_DEFAULT_SCORE 				    30
+
+#define PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY 		    "PSA_ZMQ_QOS_SAMPLE_SCORE"
+#define PSA_NANOMSG_QOS_CONTROL_SCORE_KEY 		    "PSA_ZMQ_QOS_CONTROL_SCORE"
+#define PSA_NANOMSG_DEFAULT_SCORE_KEY 			    "PSA_ZMQ_DEFAULT_SCORE"
+
+#define PSA_ZMQ_DEFAULT_VERBOSE 			    false
+#define PSA_ZMQ_VERBOSE_KEY		 			    "PSA_ZMQ_VERBOSE"
+
+
+#define PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY			"pubsub.zmq.url"
+
+
+#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 6d06ee6..372e386 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -79,7 +79,7 @@ struct pubsub_udpmc_admin {
 
 };
 
-typedef struct psa_zmq_serializer_entry {
+typedef struct psa_nanomsg_serializer_entry {
     const char *serType;
     long svcId;
     pubsub_serializer_service_t *svc;

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index b03109d..9cda058 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -28,6 +28,9 @@
 
 #include "pubsub_constants.h"
 
+#ifdef __cplusplus
+extern "C" {
+#endif
 //required for valid endpoint
 #define PUBSUB_ENDPOINT_TOPIC_NAME      "pubsub.topic.name"
 #define PUBSUB_ENDPOINT_TOPIC_SCOPE     "pubsub.topic.scope"
@@ -43,16 +46,26 @@
 #define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
 
 
-celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props);
-celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx, long svcBndId, const celix_properties_t *svcProps);
-celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context_t *ctx, long bundleId, const char *filter);
+celix_properties_t *
+pubsubEndpoint_create(const char *fwUUID, const char *scope, const char *topic, const char *pubsubType,
+                      const char *adminType, const char *serType, celix_properties_t *topic_props);
+
+celix_properties_t *
+pubsubEndpoint_createFromSubscriberSvc(bundle_context_t *ctx, long svcBndId, const celix_properties_t *svcProps);
+
+celix_properties_t *
+pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context_t *ctx, long bundleId, const char *filter);
 
 bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properties_t *psEp2);
 
 //check if the required properties are available for the endpoint
-bool pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
+bool
+pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
 
 
-char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic);
+char *pubsubEndpoint_createScopeTopicKey(const char *scope, const char *topic);
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 66cc44a..e4983b1 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -23,10 +23,12 @@
 #include "bundle_context.h"
 #include "celix_array_list.h"
 #include "celix_bundle_context.h"
-
-#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY	    "qos"
-#define PUBSUB_UTILS_QOS_TYPE_SAMPLE		"sample"	/* A.k.a. unreliable connection */
-#define PUBSUB_UTILS_QOS_TYPE_CONTROL	    "control"	/* A.k.a. reliable connection */
+#ifdef __cplusplus
+extern "C" {
+#endif
+#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY        "qos"
+#define PUBSUB_UTILS_QOS_TYPE_SAMPLE        "sample"    /* A.k.a. unreliable connection */
+#define PUBSUB_UTILS_QOS_TYPE_CONTROL        "control"    /* A.k.a. reliable connection */
 
 
 /**
@@ -36,38 +38,26 @@
  * present a allocated scope string.
  * The caller is owner of the topic and scope output string.
  */
-celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topic, char **scope);
+celix_status_t pubsub_getPubSubInfoFromFilter(const char *filterstr, char **topic, char **scope);
 
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+char *pubsub_getKeysBundleDir(bundle_context_pt ctx);
 
-double pubsub_utils_matchPublisher(
-        celix_bundle_context_t *ctx,
-        long bundleId,
-        const char *filter,
-        const char *adminType,
-        double sampleScore,
-        double controlScore,
-        double defaultScore,
-        long *outSerializerSvcId);
+double
+pubsub_utils_matchPublisher(celix_bundle_context_t *ctx, long bundleId, const char *filter, const char *adminType,
+                            double sampleScore, double controlScore, double defaultScore, long *outSerializerSvcId);
 
-double pubsub_utils_matchSubscriber(
-        celix_bundle_context_t *ctx,
-        long svcProviderBundleId,
-        const celix_properties_t *svcProperties,
-        const char *adminType,
-        double sampleScore,
-        double controlScore,
-        double defaultScore,
-        long *outSerializerSvcId);
+double pubsub_utils_matchSubscriber(celix_bundle_context_t *ctx, long svcProviderBundleId,
+                                    const celix_properties_t *svcProperties, const char *adminType, double sampleScore,
+                                    double controlScore, double defaultScore, long *outSerializerSvcId);
 
-bool pubsub_utils_matchEndpoint(
-        celix_bundle_context_t *ctx,
-        const celix_properties_t *endpoint,
-        const char *adminType,
-        long *outSerializerSvcId);
+bool pubsub_utils_matchEndpoint(celix_bundle_context_t *ctx, const celix_properties_t *endpoint, const char *adminType,
+                                long *outSerializerSvcId);
 
 
-celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher);
+celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher);
 
+#ifdef __cplusplus
+}
+#endif
 
 #endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/cmake/Modules/FindNanoMsg.cmake
----------------------------------------------------------------------
diff --git a/cmake/Modules/FindNanoMsg.cmake b/cmake/Modules/FindNanoMsg.cmake
new file mode 100644
index 0000000..79d68c5
--- /dev/null
+++ b/cmake/Modules/FindNanoMsg.cmake
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# - Try to find NanoMsg
+#   Once done this will define
+#  NANOMSG_FOUND - System has NanoMsg
+#  NANOMSG_INCLUDE_DIRS - The NanoMsg include directories
+#  NANOMSG_LIBRARIES - The libraries needed to use NanoMsg
+#  NANOMSG_DEFINITIONS - Compiler switches required for using NanoMsg (currently not set by this module)
+
+find_path(NANOMSG_INCLUDE_DIR nanomsg/nn.h
+          /usr/include
+          /usr/local/include )
+
+find_library(NANOMSG_LIBRARY NAMES nanomsg
+             PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64 )
+
+set(NANOMSG_LIBRARIES ${NANOMSG_LIBRARY} )
+set(NANOMSG_INCLUDE_DIRS ${NANOMSG_INCLUDE_DIR} )
+
+include(FindPackageHandleStandardArgs)
+# handle the QUIETLY and REQUIRED arguments and set NANOMSG_FOUND to TRUE
+# if all listed variables are TRUE
+find_package_handle_standard_args(NanoMsg  DEFAULT_MSG
+                                  NANOMSG_LIBRARY NANOMSG_INCLUDE_DIR)
+
+mark_as_advanced(NANOMSG_INCLUDE_DIR NANOMSG_LIBRARY )

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/libs/framework/include/celix_bundle_activator.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_activator.h b/libs/framework/include/celix_bundle_activator.h
index eb7d514..75e34d2 100644
--- a/libs/framework/include/celix_bundle_activator.h
+++ b/libs/framework/include/celix_bundle_activator.h
@@ -115,7 +115,7 @@ celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_contex
 #define CELIX_GEN_BUNDLE_ACTIVATOR(actType, actStart, actStop)                                                         \
 celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx __attribute__((unused)), void **userData) {    \
     celix_status_t status = CELIX_SUCCESS;                                                                             \
-    actType *data = calloc(1, sizeof(*data));                                                                          \
+    actType *data = (actType*)calloc(1, sizeof(*data));                                                                          \
     if (data != NULL) {                                                                                                \
         *userData = data;                                                                                              \
     } else {                                                                                                           \


[2/2] celix git commit: NanoMsgAdmin: first version

Posted by er...@apache.org.
NanoMsgAdmin: first version


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4fc1f3d3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4fc1f3d3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4fc1f3d3

Branch: refs/heads/nanomsg
Commit: 4fc1f3d3f7398a4b1f0e3993ed889b811f653d95
Parents: 1ffdd94
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Oct 24 14:15:02 2018 +0200
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Oct 24 14:30:16 2018 +0200

----------------------------------------------------------------------
 .../log_service/loghelper_include/log_helper.h  |   7 +-
 bundles/pubsub/CMakeLists.txt                   |   2 +
 bundles/pubsub/examples/CMakeLists.txt          |  88 ++-
 .../pubsub/pubsub_admin_nanomsg/CMakeLists.txt  |  51 ++
 .../pubsub_admin_nanomsg/src/nanomsg_crypto.cc  | 281 ++++++++
 .../pubsub_admin_nanomsg/src/nanomsg_crypto.h   |  41 ++
 .../src/psa_nanomsg_activator.cc                | 113 +++
 .../src/pubsub_nanomsg_admin.cc                 | 698 +++++++++++++++++++
 .../src/pubsub_nanomsg_admin.h                  |  73 ++
 .../src/pubsub_nanomsg_common.cc                |  56 ++
 .../src/pubsub_nanomsg_common.h                 |  56 ++
 .../src/pubsub_nanomsg_topic_receiver.cc        | 420 +++++++++++
 .../src/pubsub_nanomsg_topic_receiver.h         |  45 ++
 .../src/pubsub_nanomsg_topic_sender.cc          | 368 ++++++++++
 .../src/pubsub_nanomsg_topic_sender.h           |  42 ++
 .../src/pubsub_psa_nanomsg_constants.h          |  50 ++
 .../src/pubsub_udpmc_admin.c                    |   2 +-
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h |  23 +-
 .../pubsub/pubsub_spi/include/pubsub_utils.h    |  50 +-
 cmake/Modules/FindNanoMsg.cmake                 |  42 ++
 libs/framework/include/celix_bundle_activator.h |   2 +-
 21 files changed, 2471 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 2ae9d83..28e6877 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -23,7 +23,9 @@
 
 #include "bundle_context.h"
 #include "log_service.h"
-
+#ifdef __cplusplus
+extern "C" {
+#endif
 typedef struct log_helper log_helper_t;
 typedef struct log_helper* log_helper_pt;
 
@@ -32,5 +34,8 @@ celix_status_t logHelper_start(log_helper_pt loghelper);
 celix_status_t logHelper_stop(log_helper_pt loghelper);
 celix_status_t logHelper_destroy(log_helper_pt* loghelper);
 celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+#ifdef __cplusplus
+}
+#endif
 
 #endif /* LOGHELPER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..05e6ee1 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -24,12 +24,14 @@ if (PUBSUB)
 		option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
     endif (BUILD_PUBSUB_PSA_ZMQ)
 
+	option(BUILD_PUBSUB_PSA_NANOMSG "Build NanoMsg PubSub Admin - Experimental" OFF)
 	add_subdirectory(pubsub_api)
 	add_subdirectory(pubsub_spi)
 	add_subdirectory(pubsub_topology_manager)
 	add_subdirectory(pubsub_discovery)
 	add_subdirectory(pubsub_serializer_json)
 	add_subdirectory(pubsub_admin_zmq)
+	add_subdirectory(pubsub_admin_nanomsg)
 	add_subdirectory(pubsub_admin_udp_mc)
 	add_subdirectory(keygen)
 	add_subdirectory(mock)

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..126db2d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -25,7 +25,7 @@ find_package(ZMQ REQUIRED)
 find_package(CZMQ REQUIRED)
 find_package(Jansson REQUIRED)
 
-set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
+set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${NANOMSG_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
 
 # UDP Multicast
 add_celix_container(pubsub_publisher_udp_mc
@@ -264,5 +264,91 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             USE_TERM
         )
     endif ()
+endif()
+if (BUILD_PUBSUB_PSA_NANOMSG)
+    add_celix_container("pubsub_publisher1_nanomsg"
+            GROUP "pubsub"
+            BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_publisher
+            PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+            )
+    target_link_libraries(pubsub_publisher1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container("pubsub_publisher2_nanomsg"
+            GROUP "pubsub"
+            BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_publisher
+            PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+            )
+    target_link_libraries(pubsub_publisher2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container(pubsub_subscriber1_nanomsg
+            GROUP "pubsub"
+            BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_subscriber
+            PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+            )
+    target_link_libraries(pubsub_subscriber1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container(pubsub_subscriber2_nanomsg
+            GROUP "pubsub"
+            BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_subscriber
+            PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+            )
+    target_link_libraries(pubsub_subscriber2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+
+    if (ETCD_CMD AND XTERM_CMD)
+        #Runtime starting 2 publishers and 2 subscribers for nanomsg
+        add_celix_runtime(pubsub_rt_nanomsg
+                NAME nanomsg
+                GROUP pubsub
+                CONTAINERS
+                pubsub_publisher1_nanomsg
+                pubsub_publisher2_nanomsg
+                pubsub_subscriber1_nanomsg
+                pubsub_subscriber2_nanomsg
+                COMMANDS
+                etcd
+                USE_TERM
+                )
+    endif ()
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
new file mode 100644
index 0000000..ab9806e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_PUBSUB_PSA_NANOMSG)
+
+	find_package(NanoMsg REQUIRED)
+	find_package(Jansson REQUIRED)
+
+	add_celix_bundle(celix_pubsub_admin_nanomsg
+		BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg"
+		VERSION "1.0.0"
+		GROUP "Celix/PubSub"
+		SOURCES
+			src/psa_nanomsg_activator.cc
+			src/pubsub_nanomsg_admin.cc
+			src/pubsub_nanomsg_topic_sender.cc
+			src/pubsub_nanomsg_topic_receiver.cc
+			src/pubsub_nanomsg_common.cc
+	)
+
+	set_target_properties(celix_pubsub_admin_nanomsg PROPERTIES INSTALL_RPATH "$ORIGIN")
+	target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE
+			Celix::pubsub_spi
+			Celix::framework Celix::dfi Celix::log_helper
+			${NANOMSG_LIBRARIES}
+	)
+	target_include_directories(celix_pubsub_admin_nanomsg PRIVATE
+		${NANOMSG_INCLUDE_DIR}
+		${JANSSON_INCLUDE_DIR}
+		src
+		../pubsub_topology_manager/src
+	)
+
+	install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub)
+	target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE Celix::shell_api)
+	add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg)
+endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
new file mode 100644
index 0000000..d7d88bb
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
@@ -0,0 +1,281 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * nanomsg_crypto.cc
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "nanomsg_crypto.h"
+
+#include <zmq.h>
+#include <openssl/conf.h>
+#include <openssl/evp.h>
+#include <openssl/err.h>
+
+#include <string.h>
+
+#define MAX_FILE_PATH_LENGTH 512
+#define ZMQ_KEY_LENGTH 40
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+static char* read_file_content(char* filePath, char* fileName);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
+
+/**
+ * Load a certificate from an AES-256-CBC encrypted certificate file.
+ *
+ * The AES key and iv are read from the keys file (keysFilePath/keysFileName; when NULL the
+ * DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME values are used), hashed with SHA-256 and
+ * used to decrypt the content of file_path. The decrypted text is parsed for the Z85 encoded
+ * public and secret key, which are decoded and wrapped in a zcert_t.
+ *
+ * Returns NULL when the keys cannot be loaded, the file cannot be read or decrypted, or the
+ * decrypted content does not hold a valid public / secret key pair.
+ * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
+{
+
+	if (keysFilePath == NULL){
+		keysFilePath = DEFAULT_KEYS_FILE_PATH;
+	}
+
+	if (keysFileName == NULL){
+		keysFileName = DEFAULT_KEYS_FILE_NAME;
+	}
+
+	char* keys_data = read_file_content(keysFilePath, keysFileName);
+	if (keys_data == NULL){
+		return NULL;
+	}
+
+	char *key = NULL;
+	char *iv = NULL;
+	parse_key_lines(keys_data, &key, &iv);
+	free(keys_data);
+
+	if (key == NULL || iv == NULL){
+		free(key);
+		free(iv);
+
+		printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+		return NULL;
+	}
+
+	//At this point, we know an aes key and iv are stored and loaded
+
+	// generate sha256 hashes
+	unsigned char key_digest[EVP_MAX_MD_SIZE];
+	unsigned char iv_digest[EVP_MAX_MD_SIZE];
+	generate_sha256_hash((char*) key, key_digest);
+	generate_sha256_hash((char*) iv, iv_digest);
+
+	zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
+	if (encoded_secret == NULL){
+		free(key);
+		free(iv);
+
+		return NULL;
+	}
+
+	int encoded_secret_size = (int) zchunk_size (encoded_secret);
+	char* encoded_secret_data = zchunk_strdup(encoded_secret);
+	zchunk_destroy (&encoded_secret);
+	if (encoded_secret_data == NULL || encoded_secret_size <= 0){
+		free(encoded_secret_data);
+		free(key);
+		free(iv);
+
+		printf("CRYPTO: Encoded file '%s' is empty or could not be loaded!\n", file_path);
+		return NULL;
+	}
+
+	// Decryption of data.
+	// EVP_DecryptUpdate may write up to (input length + cipher block size) bytes, so the
+	// output buffer is sized for that plus one byte for the terminator added below.
+	size_t decrypted_capacity = (size_t) encoded_secret_size + EVP_MAX_BLOCK_LENGTH + 1;
+	unsigned char* decryptedtext = (unsigned char*) calloc(decrypted_capacity, 1);
+	int decryptedtext_len = 0;
+	if (decryptedtext != NULL){
+		decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
+	}
+
+	EVP_cleanup();
+
+	free(encoded_secret_data);
+	free(key);
+	free(iv);
+
+	if (decryptedtext == NULL || decryptedtext_len <= 0 || (size_t) decryptedtext_len >= decrypted_capacity){
+		free(decryptedtext);
+
+		printf("CRYPTO: Decrypting '%s' failed!\n", file_path);
+		return NULL;
+	}
+	decryptedtext[decryptedtext_len] = '\0';
+
+	// The public and private keys are retrieved
+	char* public_text = NULL;
+	char* secret_text = NULL;
+
+	extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+	free(decryptedtext);
+
+	zcert_t* cert_loaded = NULL;
+	if (public_text != NULL && secret_text != NULL){
+		byte public_key [32] = { 0 };
+		byte secret_key [32] = { 0 };
+
+		// zmq_z85_decode returns NULL for input that is not valid Z85.
+		if (zmq_z85_decode (public_key, public_text) != NULL && zmq_z85_decode (secret_key, secret_text) != NULL){
+			cert_loaded = zcert_new_from(public_key, secret_key);
+		} else {
+			printf("CRYPTO: Decoding public / secret key failed!\n");
+		}
+	}
+
+	free(public_text);
+	free(secret_text);
+
+	return cert_loaded;
+}
+
+/**
+ * Compute the SHA-256 digest of the NUL-terminated string text.
+ *
+ * digest receives the hash; the callers in this file pass a buffer of EVP_MAX_MD_SIZE bytes.
+ * Returns the number of digest bytes written, or 0 when an argument is NULL or hashing failed.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+	unsigned int digest_len = 0;
+
+	if (text == NULL || digest == NULL){
+		return 0;
+	}
+
+	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+	if (mdctx == NULL){
+		return 0;
+	}
+
+	// Every EVP digest call returns 1 on success; stop at the first failure.
+	if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+			|| EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+			|| EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+		digest_len = 0;
+	}
+	EVP_MD_CTX_free(mdctx);
+
+	return (int) digest_len;
+}
+
+/**
+ * Decrypt ciphertext (ciphertext_len bytes) with AES-256-CBC using the given key and iv.
+ *
+ * plaintext receives the result; EVP_DecryptUpdate may write up to
+ * ciphertext_len + cipher block size bytes, so the buffer must be sized accordingly.
+ * Returns the number of plaintext bytes written, or 0 when decryption failed.
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+	int len = 0;
+	int plaintext_len = 0;
+
+	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+	if (ctx == NULL){
+		return 0;
+	}
+
+	if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+			&& EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+		plaintext_len = len;
+		// The final call verifies and strips the padding; it fails on a wrong key/iv or corrupt data.
+		if (EVP_DecryptFinal_ex(ctx, plaintext + plaintext_len, &len) == 1){
+			plaintext_len += len;
+		} else {
+			plaintext_len = 0;
+		}
+	}
+
+	EVP_CIPHER_CTX_free(ctx);
+
+	return plaintext_len;
+}
+
+/**
+ * Read the complete content of the file filePath/fileName into a newly allocated string.
+ *
+ * Returns NULL (after printing a message) when the combined path does not fit in
+ * MAX_FILE_PATH_LENGTH, or when the file does not exist or cannot be read.
+ * Caller is responsible for freeing the returned value
+ */
+static char* read_file_content(char* filePath, char* fileName){
+
+	char fileNameWithPath[MAX_FILE_PATH_LENGTH];
+	int written = snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+	if (written < 0 || written >= MAX_FILE_PATH_LENGTH){
+		// A truncated path could silently point at a different file.
+		printf("CRYPTO: Keys file path '%s/%s' is too long!\n", filePath, fileName);
+		return NULL;
+	}
+
+	if (!zsys_file_exists(fileNameWithPath)){
+		printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	zfile_t* keys_file = zfile_new (filePath, fileName);
+	if (keys_file == NULL || zfile_input (keys_file) != 0){
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
+	zchunk_t* keys_chunk = NULL;
+	if (keys_file_size >= 0){
+		// Only read when the size is known; a negative size would become a huge unsigned count.
+		keys_chunk = zfile_read (keys_file, (size_t) keys_file_size, 0);
+	}
+	if (keys_chunk == NULL){
+		zfile_close(keys_file);
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	char* keys_data = zchunk_strdup(keys_chunk);
+	zchunk_destroy(&keys_chunk);
+	zfile_close(keys_file);
+	zfile_destroy (&keys_file);
+
+	return keys_data;
+}
+
+/**
+ * Split keysBuffer into lines (in place, the buffer is modified) and feed each line to
+ * parse_key_line until both *key and *iv are set or no lines are left.
+ */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+	char *tokenState = NULL;
+
+	for (char *current = strtok_r(keysBuffer, "\n", &tokenState);
+			current != NULL;
+			current = strtok_r(NULL, "\n", &tokenState)) {
+
+		parse_key_line(current, key, iv);
+
+		// Stop as soon as both values are found; remaining lines are not tokenized.
+		if (*key != NULL && *iv != NULL){
+			break;
+		}
+	}
+}
+
+/**
+ * Parse a single "name:value" line (split in place at the first ':').
+ *
+ * When name equals KEY_TO_GET and *key is still NULL, *key receives a copy of at most
+ * AES_KEY_LENGTH characters of the value; likewise for IV_TO_GET / *iv / AES_IV_LENGTH.
+ * Lines without ':' or with an empty name or value are ignored.
+ */
+static void parse_key_line(char *line, char **key, char **iv){
+	char* separator = strchr(line, ':');
+	if (separator == NULL){
+		return;
+	}
+
+	*separator = '\0'; // terminate the name part; the value starts right after it
+	const char* name = line;
+	const char* value = separator + 1;
+
+	if (*name == '\0' || *value == '\0'){
+		return;
+	}
+
+	if (*key == NULL && strcmp(name, KEY_TO_GET) == 0){
+		*key = strndup(value, AES_KEY_LENGTH);
+		return;
+	}
+
+	if (*iv == NULL && strcmp(name, IV_TO_GET) == 0){
+		*iv = strndup(value, AES_IV_LENGTH);
+	}
+}
+
+/**
+ * Load input (inputlen bytes) as a zconfig document and copy the values found at
+ * /curve/public-key and /curve/secret-key into newly allocated strings.
+ *
+ * *publicKey and *secretKey are only written when both values are present; on any failure
+ * a message is printed and the output arguments are left untouched.
+ */
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
+	zchunk_t* chunk = zchunk_new(input, inputlen);
+	if (chunk == NULL){
+		printf("CRYPTO: Failed to create zchunk\n");
+		return;
+	}
+
+	// The chunk is only needed to build the config tree.
+	zconfig_t* config = zconfig_chunk_load (chunk);
+	zchunk_destroy (&chunk);
+	if (config == NULL){
+		printf("CRYPTO: Failed to create zconfig\n");
+		return;
+	}
+
+	char* publicValue = zconfig_get (config, "/curve/public-key", NULL);
+	char* secretValue = zconfig_get (config, "/curve/secret-key", NULL);
+
+	if (publicValue != NULL && secretValue != NULL){
+		// Copy the values before the config, which is destroyed below.
+		*publicKey = strndup(publicValue, ZMQ_KEY_LENGTH + 1);
+		*secretKey = strndup(secretValue, ZMQ_KEY_LENGTH + 1);
+		zconfig_destroy(&config);
+		return;
+	}
+
+	zconfig_destroy(&config);
+	printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
new file mode 100644
index 0000000..f1a990f
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * nanomsg_crypto.h
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef NANOMSG_CRYPTO_H_
+#define NANOMSG_CRYPTO_H_
+
+#include <czmq.h>
+
+#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
+#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
+#define DEFAULT_KEYS_FILE_PATH "/etc/"
+#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
+
+/* Loads a certificate from the encrypted file file_path; a NULL keysFilePath / keysFileName selects the defaults above. Returns NULL on failure. */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path);
+/* Writes the SHA-256 digest of the NUL-terminated text into digest and returns the digest length. */
+int generate_sha256_hash(char* text, unsigned char* digest);
+/* AES-256-CBC decrypts ciphertext into plaintext and returns the plaintext length. */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
+
+#endif /* NANOMSG_CRYPTO_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
new file mode 100644
index 0000000..79ea1d4
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -0,0 +1,113 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_nanomsg_admin.h"
+#include "../../../shell/shell/include/command.h"
+
+typedef struct psa_nanomsg_activator { // per-bundle state shared by psa_nanomsg_start and psa_nanomsg_stop
+	log_helper_t *logHelper; // created and started in psa_nanomsg_start, stopped and destroyed in psa_nanomsg_stop
+
+	pubsub_nanomsg_admin_t *admin; // the NanoMsg pubsub admin; NULL when creation failed
+
+	long serializersTrackerId; // id of the serializer service tracker, -1 when not tracking
+
+	pubsub_admin_service_t adminService; // service struct registered as the pubsub admin service
+	long adminSvcId; // registration id of adminService, -1 when not registered
+
+	command_service_t cmdSvc; // service struct registered as the psa_nanomsg shell command
+	long cmdSvcId; // registration id of cmdSvc, -1 when not registered
+} psa_nanomsg_activator_t;
+
+/**
+ * Bundle activator start: creates the log helper and the NanoMsg pubsub admin, starts tracking
+ * serializer services and registers the pubsub admin service and the psa_nanomsg shell command.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when the admin could not be created. In that
+ * case no tracker is started and no service is registered (the ids stay -1), so no service is
+ * ever published with a NULL admin handle.
+ */
+int psa_nanomsg_start(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
+	act->adminSvcId = -1L;
+	act->cmdSvcId = -1L;
+	act->serializersTrackerId = -1L;
+
+	logHelper_create(ctx, &act->logHelper);
+	logHelper_start(act->logHelper);
+
+	act->admin = pubsub_nanoMsgAdmin_create(ctx, act->logHelper);
+	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+	//track serializers
+	if (status == CELIX_SUCCESS) {
+		celix_service_tracking_options_t opts{};
+		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+		opts.filter.ignoreServiceLanguage = true;
+		opts.callbackHandle = act->admin;
+		opts.addWithProperties = pubsub_nanoMsgAdmin_addSerializerSvc;
+		opts.removeWithProperties = pubsub_nanoMsgAdmin_removeSerializerSvc;
+		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+	}
+
+	//register pubsub admin service
+	if (status == CELIX_SUCCESS) {
+		pubsub_admin_service_t *psaSvc = &act->adminService;
+		psaSvc->handle = act->admin;
+		psaSvc->matchPublisher = pubsub_nanoMsgAdmin_matchPublisher;
+		psaSvc->matchSubscriber = pubsub_nanoMsgAdmin_matchSubscriber;
+		psaSvc->matchEndpoint = pubsub_nanoMsgAdmin_matchEndpoint;
+		psaSvc->setupTopicSender = pubsub_nanoMsgAdmin_setupTopicSender;
+		psaSvc->teardownTopicSender = pubsub_nanoMsgAdmin_teardownTopicSender;
+		psaSvc->setupTopicReceiver = pubsub_nanoMsgAdmin_setupTopicReceiver;
+		psaSvc->teardownTopicReceiver = pubsub_nanoMsgAdmin_teardownTopicReceiver;
+		psaSvc->addEndpoint = pubsub_nanoMsgAdmin_addEndpoint;
+		psaSvc->removeEndpoint = pubsub_nanoMsgAdmin_removeEndpoint;
+
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
+
+		act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+	}
+
+	//register shell command service (only with a valid admin, the command uses it as handle)
+	if (status == CELIX_SUCCESS) {
+		act->cmdSvc.handle = act->admin;
+		act->cmdSvc.executeCommand = pubsub_nanoMsgAdmin_executeCommand;
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the NanoMsg PSA");
+		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+	}
+
+	return status;
+}
+
+int psa_nanomsg_stop(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) { // bundle activator stop: releases what psa_nanomsg_start set up; always returns CELIX_SUCCESS
+	celix_bundleContext_unregisterService(ctx, act->adminSvcId); // pubsub admin service
+	celix_bundleContext_unregisterService(ctx, act->cmdSvcId); // psa_nanomsg shell command service
+	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId); // serializer service tracker
+	pubsub_nanoMsgAdmin_destroy(act->admin); // services and tracker are removed first, as the destroy function assumes
+
+	logHelper_stop(act->logHelper);
+	logHelper_destroy(&act->logHelper);
+
+	return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_nanomsg_activator_t, psa_nanomsg_start, psa_nanomsg_stop);

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
new file mode 100644
index 0000000..f3e6831
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -0,0 +1,698 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <string>
+#include <vector>
+#include <memory.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_nanomsg_admin.h"
+#include "pubsub_psa_nanomsg_constants.h"
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_nanomsg_topic_receiver.h"
+/*
+//#define L_DEBUG(...) \
+//    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+//#define L_INFO(...) \
+//    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+//#define L_WARN(...) \
+//    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+//#define L_ERROR(...) \
+//    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+*/
+#define L_DEBUG printf
+#define L_INFO printf
+#define L_WARN printf
+#define L_ERROR printf
+
+struct pubsub_nanomsg_admin { // state of the NanoMsg pubsub admin, created by pubsub_nanoMsgAdmin_create
+    celix_bundle_context_t *ctx; // bundle context passed to pubsub_nanoMsgAdmin_create
+    log_helper_t *log; // log helper passed to pubsub_nanoMsgAdmin_create
+    const char *fwUUID; // value of the OSGI_FRAMEWORK_FRAMEWORK_UUID property
+
+    char* ipAddress; // heap allocated, freed in pubsub_nanoMsgAdmin_destroy
+
+    unsigned int basePort; // from the PSA_NANOMSG_BASE_PORT property
+    unsigned int maxPort; // from the PSA_NANOMSG_MAX_PORT property
+
+    double qosSampleScore; // from the PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY property
+    double qosControlScore; // from the PSA_NANOMSG_QOS_CONTROL_SCORE_KEY property
+    double defaultScore; // from the PSA_NANOMSG_DEFAULT_SCORE_KEY property
+
+    bool verbose; // from the PUBSUB_NANOMSG_VERBOSE_KEY property; enables informational output
+
+    struct {
+        celix_thread_mutex_t mutex; // locked around accesses to map
+        hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+    } serializers;
+
+    struct {
+        celix_thread_mutex_t mutex; // locked around accesses to map
+        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex; // locked around accesses to map
+        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_receiver_t*
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex; // locked around accesses to map
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+    } discoveredEndpoints;
+
+};
+
+typedef struct psa_nanomsg_serializer_entry { // one tracked serializer service, stored in the admin's serializers map
+    const char *serType; // value of the PUBSUB_SERIALIZER_TYPE_KEY service property (pointer into the service properties, not copied)
+    long svcId; // service id of the serializer service; also the key in the serializers map
+    pubsub_serializer_service_t *svc; // the tracked serializer service
+} psa_nanomsg_serializer_entry_t;
+
+static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
+static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t *psa,
+                                                                    pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                    const celix_properties_t *endpoint);
+static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t *psa,
+                                                                         pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                         const celix_properties_t *endpoint);
+
+
+/**
+ * Create the NanoMsg pubsub admin.
+ *
+ * Reads the admin configuration (verbosity, framework uuid, ip address, port range and match
+ * scores) from the bundle context properties and creates the empty serializer, topic sender,
+ * topic receiver and discovered endpoint maps with their mutexes.
+ *
+ * The ip address is taken from the PUBSUB_NANOMSG_PSA_IP_KEY property, else resolved through
+ * nanoMsg_getIpAddress for the PUBSUB_NANOMSG_PSA_ITF_KEY interface, else
+ * PUBSUB_NANOMSG_DEFAULT_IP is used.
+ *
+ * Returns the new admin (to be released with pubsub_nanoMsgAdmin_destroy), or NULL when the
+ * admin could not be allocated.
+ */
+pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(calloc(1, sizeof(*psa)));
+    if (psa == NULL) {
+        L_ERROR("[PSA_NANOMSG] Cannot allocate memory for the pubsub admin\n");
+        return NULL;
+    }
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    char *ip = NULL;
+    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , NULL);
+    if (confIp != NULL) {
+        ip = strndup(confIp, 1024);
+    }
+
+    if (ip == NULL) {
+        //TODO try to get ip from subnet (CIDR)
+    }
+
+    if (ip == NULL) {
+        //try to get ip from itf
+        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, NULL);
+        nanoMsg_getIpAddress(interface, &ip);
+    }
+
+    if (ip == NULL) {
+        L_WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (%s)", PUBSUB_NANOMSG_DEFAULT_IP);
+        ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
+    }
+
+    psa->ipAddress = ip;
+    if (psa->verbose) {
+        L_INFO("[PSA_NANOMSG] Using %s for service annunciation", ip);
+    }
+
+
+    long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
+    long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
+    psa->basePort = (unsigned int)basePort;
+    psa->maxPort = (unsigned int)maxPort;
+    if (psa->verbose) {
+        //both ports are unsigned int, so %u is the matching conversion
+        L_INFO("[PSA_NANOMSG] Using base till max port: %u till %u", psa->basePort, psa->maxPort);
+    }
+
+
+//    long nrThreads = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_NANOMSG_NR_THREADS_KEY, 0);
+//    if (nrThreads > 0) {
+//        zsys_set_io_threads((size_t)nrThreads);
+//        L_INFO("[PSA_NANOMSG] Using %d threads for NanoMsg", (size_t)nrThreads);
+//    }
+
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
+
+    //the serializers map is keyed by service id, the other maps by string
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+/**
+ * Destroy the admin and everything it still holds: topic senders, topic receivers,
+ * discovered endpoint properties, serializer entries, the maps, the mutexes and the
+ * ip address string. A NULL psa is ignored.
+ */
+void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    //note: assumes all services registered for the psa and its service tracker are already removed.
+
+    //topic senders
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicSenders.map); hashMapIterator_hasNext(&it);) {
+        pubsub_nanoMsgTopicSender_destroy(static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&it)));
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    //topic receivers
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+        pubsub_nanoMsgTopicReceiver_destroy(static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&it)));
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    //discovered endpoints
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->discoveredEndpoints.map); hashMapIterator_hasNext(&it);) {
+        celix_properties_destroy(static_cast<celix_properties_t*>(hashMapIterator_nextValue(&it)));
+    }
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    //serializer entries
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->serializers.map); hashMapIterator_hasNext(&it);) {
+        free(static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&it)));
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //the values are gone; now release the mutexes and the maps themselves
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
+    free(psa->ipAddress);
+    free(psa);
+}
+
+/**
+ * Service-tracker callback: registers a serializer service with the admin.
+ *
+ * A serializer without the PUBSUB_SERIALIZER_TYPE_KEY property is ignored. Otherwise
+ * an entry (type, service id, service pointer) is stored under the service id, unless
+ * an entry for that id is already present.
+ *
+ * @param handle The pubsub_nanomsg_admin_t instance.
+ * @param svc    The serializer service (pubsub_serializer_service_t).
+ * @param props  The service properties of the serializer service.
+ */
+void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    if (serType == NULL) {
+        L_INFO("[PSA_NANOMSG] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
+    if (entry == NULL) {
+        entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+        if (entry != NULL) {
+            //note: serType points into props; the string is not copied
+            entry->serType = serType;
+            entry->svcId = svcId;
+            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+            hashMap_put(psa->serializers.map, (void*)svcId, entry);
+        } else {
+            //allocation failed: do not dereference NULL, just skip this serializer
+            L_ERROR("[PSA_NANOMSG] Cannot allocate serializer entry for service id %li", svcId);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+/**
+ * Service-tracker callback: unregisters a serializer service from the admin.
+ *
+ * The serializer entry is removed from the serializer map, after which every topic
+ * sender and topic receiver that uses this serializer is removed from its map and
+ * destroyed. Creating replacement senders/receivers is left to the topology manager.
+ * The serializer mutex is held for the whole operation; the sender and receiver
+ * mutexes are taken one after the other while it is held.
+ *
+ * @param handle The pubsub_nanomsg_admin_t instance.
+ * @param props  The service properties of the removed serializer service.
+ */
+void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void */*svc*/, const celix_properties_t *props) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+    const long removedSvcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(psa->serializers.map, (void*)removedSvcId));
+    if (serEntry == NULL) {
+        //serializer was never registered here, so nothing depends on it
+        celixThreadMutex_unlock(&psa->serializers.mutex);
+        return;
+    }
+
+    //drop all topic senders using the serializer
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    for (hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicSenders.map);
+         hashMapIterator_hasNext(&senderIter);) {
+        hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&senderIter);
+        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(mapEntry));
+        if (sender == NULL || serEntry->svcId != pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+            continue;
+        }
+        //fetch the key before the entry is removed; it is freed separately
+        char *storedKey = static_cast<char*>(hashMapEntry_getKey(mapEntry));
+        hashMapIterator_remove(&senderIter);
+        pubsub_nanoMsgTopicSender_destroy(sender);
+        free(storedKey);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    //drop all topic receivers using the serializer
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    for (hash_map_iterator_t receiverIter = hashMapIterator_construct(psa->topicReceivers.map);
+         hashMapIterator_hasNext(&receiverIter);) {
+        hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&receiverIter);
+        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(mapEntry));
+        if (receiver == NULL || serEntry->svcId != pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+            continue;
+        }
+        char *storedKey = static_cast<char*>(hashMapEntry_getKey(mapEntry));
+        hashMapIterator_remove(&receiverIter);
+        pubsub_nanoMsgTopicReceiver_destroy(receiver);
+        free(storedKey);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    free(serEntry);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+/**
+ * Computes how well this admin matches a requested publisher.
+ *
+ * The scoring is delegated to pubsub_utils_matchPublisher, using the admin's
+ * configured sample/control/default scores.
+ *
+ * @param handle             The pubsub_nanomsg_admin_t instance.
+ * @param svcRequesterBndId  Bundle id of the bundle requesting the publisher service.
+ * @param svcFilter          The filter of the publisher service request.
+ * @param outScore           Receives the match score; may be NULL.
+ * @param outSerializerSvcId Passed through to pubsub_utils_matchPublisher.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+                                                  double *outScore, long *outSerializerSvcId) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+    L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+    //same NULL-tolerance for the out parameter as matchSubscriber/matchEndpoint
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+
+    return status;
+}
+
+/**
+ * Computes how well this admin matches a provided subscriber service.
+ *
+ * The scoring is delegated to pubsub_utils_matchSubscriber, using the admin's
+ * configured sample/control/default scores.
+ *
+ * @param handle             The pubsub_nanomsg_admin_t instance.
+ * @param svcProviderBndId   Bundle id of the bundle providing the subscriber service.
+ * @param svcProperties      The service properties of the subscriber service.
+ * @param outScore           Receives the match score; may be NULL.
+ * @param outSerializerSvcId Passed through to pubsub_utils_matchSubscriber.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+                                                   const celix_properties_t *svcProperties, double *outScore,
+                                                   long *outSerializerSvcId) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+    L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
+
+    const double matchScore = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties,
+                                                           PUBSUB_NANOMSG_ADMIN_TYPE, psa->qosSampleScore,
+                                                           psa->qosControlScore, psa->defaultScore,
+                                                           outSerializerSvcId);
+    if (outScore != NULL) {
+        *outScore = matchScore;
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Checks whether a discovered endpoint can be handled by this admin.
+ *
+ * The check is delegated to pubsub_utils_matchEndpoint with this admin's type.
+ *
+ * @param handle   The pubsub_nanomsg_admin_t instance.
+ * @param endpoint The endpoint properties to check.
+ * @param outMatch Receives the result; may be NULL.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+    L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
+
+    const bool isMatch = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, NULL);
+    if (outMatch != NULL) {
+        *outMatch = isMatch;
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and stores a topic sender for the given scope/topic and builds the
+ * publisher endpoint describing it.
+ *
+ * Steps: 1) create the TopicSender with the requested serializer, 2) store it under
+ * the scope/topic key, 3) create the publisher endpoint (including the sender url and,
+ * if configured, the container name), 4) hand the endpoint to the caller.
+ * Nothing is created when a sender for the scope/topic already exists or when the
+ * serializer service id is unknown; both cases are logged as errors.
+ *
+ * @param handle               The pubsub_nanomsg_admin_t instance.
+ * @param scope                The topic scope.
+ * @param topic                The topic name.
+ * @param serializerSvcId      Service id of the serializer to use.
+ * @param outPublisherEndpoint Receives the new endpoint when one was created; may be NULL.
+ *                             It is left untouched when no endpoint was created.
+ * @return Always CELIX_SUCCESS (failures are only logged).
+ */
+celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+                                                    long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+    celix_status_t  status = CELIX_SUCCESS;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    //the key is owned by the sender map once stored, otherwise freed below
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(psa->topicSenders.map, key));
+    if (sender == NULL) {
+        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
+        if (serEntry != NULL) {
+            sender = pubsub_nanoMsgTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+                                                      psa->ipAddress, psa->basePort, psa->maxPort);
+        } else {
+            //report the missing serializer explicitly, as the topic receiver setup does
+            L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+        }
+        if (sender != NULL) {
+            const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                serType, NULL);
+            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicSenders.map, key, sender);
+        } else {
+            L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
+
+    if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+        *outPublisherEndpoint = newEndpoint;
+    }
+
+    return status;
+}
+
+/**
+ * Removes the topic sender for the given scope/topic from the admin and destroys it.
+ * An error is logged when no such sender exists.
+ *
+ * @param handle The pubsub_nanomsg_admin_t instance.
+ * @param scope  The topic scope.
+ * @param topic  The topic name.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    //temporary key, only used for the lookup
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *found = hashMap_getEntry(psa->topicSenders.map, lookupKey);
+    if (found == NULL) {
+        L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+    } else {
+        //the key stored in the map is a separate allocation and must be freed as well
+        char *storedKey = static_cast<char*>(hashMapEntry_getKey(found));
+        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(psa->topicSenders.map, lookupKey));
+        free(storedKey);
+        //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
+        pubsub_nanoMsgTopicSender_destroy(sender);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    free(lookupKey);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and stores a topic receiver for the given scope/topic, builds the
+ * subscriber endpoint describing it and connects the new receiver to every already
+ * discovered publisher endpoint.
+ *
+ * Nothing is created when a receiver for the scope/topic already exists or when the
+ * serializer service id is unknown; both cases are logged as errors.
+ *
+ * @param handle                The pubsub_nanomsg_admin_t instance.
+ * @param scope                 The topic scope.
+ * @param topic                 The topic name.
+ * @param serializerSvcId       Service id of the serializer to use.
+ * @param outSubscriberEndpoint Receives the new endpoint when one was created; may be NULL.
+ *                              It is left untouched when no endpoint was created.
+ * @return Always CELIX_SUCCESS (failures are only logged).
+ */
+celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+                                                      long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    celix_properties_t *newEndpoint = NULL;
+
+    //the key is owned by the receiver map once stored, otherwise freed below
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(psa->topicReceivers.map, key));
+    if (receiver == NULL) {
+        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
+        if (serEntry != NULL) {
+            receiver = pubsub_nanoMsgTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId,
+                                                          serEntry->svc);
+        } else {
+            L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicReceiver %s/%s", scope, topic);
+        }
+        if (receiver != NULL) {
+            const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicReceivers.map, key, receiver);
+        } else {
+            L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (receiver != NULL && newEndpoint != NULL) {
+        //only reached for a freshly created receiver: connect it to the known publishers
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
+
+    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+        *outSubscriberEndpoint = newEndpoint;
+    }
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+/**
+ * Removes the topic receiver for the given scope/topic from the admin and destroys it.
+ * Nothing happens when no such receiver exists.
+ *
+ * @param handle The pubsub_nanomsg_admin_t instance.
+ * @param scope  The topic scope.
+ * @param topic  The topic name.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    //temporary key, only used for the lookup
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    free(key);
+    if (entry != NULL) {
+        //the key stored in the map is a separate allocation and must be freed as well
+        char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
+        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
+        hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+        free(receiverKey);
+        pubsub_nanoMsgTopicReceiver_destroy(receiver);
+    }
+    //release the mutex taken above (a second lock here would leave it held forever)
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+/**
+ * Connects a topic receiver to a publisher endpoint when the endpoint's scope and
+ * topic equal those of the receiver.
+ *
+ * @param receiver The topic receiver to connect.
+ * @param endpoint The endpoint properties; the nanomsg url is read from it.
+ * @return CELIX_BUNDLE_EXCEPTION when the endpoint has no nanomsg url, otherwise
+ *         CELIX_SUCCESS (also when scope/topic do not match and nothing is connected).
+ */
+static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t * /*psa*/,
+                                                                    pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                    const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+    const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+    const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *endpointUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+
+    if (endpointUrl == NULL) {
+        //an endpoint without a nanomsg url cannot be connected; no warning is logged here
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    const bool sameScope = endpointScope != NULL && endpointTopic != NULL &&
+                           strncmp(endpointScope, receiverScope, 1024 * 1024) == 0;
+    if (sameScope && strncmp(endpointTopic, receiverTopic, 1024 * 1024) == 0) {
+        pubsub_nanoMsgTopicReceiver_connectTo(receiver, endpointUrl);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Handles a newly discovered endpoint.
+ *
+ * For a publisher endpoint every known topic receiver gets the chance to connect to
+ * it. Regardless of its type, a copy of the endpoint is stored in the
+ * discovered-endpoints map under the endpoint's UUID (the key points into the copy).
+ * When an endpoint with the same UUID is already stored, the old entry is removed
+ * first and its copy destroyed, so a repeated announcement does not leak it.
+ *
+ * @param handle   The pubsub_nanomsg_admin_t instance.
+ * @param endpoint The discovered endpoint; it is copied, not kept.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+            pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celix_properties_t *cpy = celix_properties_copy(endpoint);
+    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+    //remove (instead of overwrite) a previous entry: its key points into the old copy,
+    //so the entry must be gone before that copy can be destroyed
+    celix_properties_t *previous = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid));
+    hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    if (previous != NULL) {
+        celix_properties_destroy(previous);
+    }
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+
+/**
+ * Disconnects a topic receiver from a publisher endpoint when the endpoint's scope
+ * and topic equal those of the receiver.
+ *
+ * @param receiver The topic receiver to disconnect.
+ * @param endpoint The endpoint properties; the nanomsg url is read from it.
+ * @return CELIX_BUNDLE_EXCEPTION when the endpoint has no nanomsg url, otherwise
+ *         CELIX_SUCCESS (also when scope/topic do not match and nothing is disconnected).
+ */
+static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t * /*psa*/,
+                                                                         pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                         const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+    const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+    const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *endpointUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+
+    if (endpointUrl == NULL) {
+        L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    const bool sameScope = endpointScope != NULL && endpointTopic != NULL &&
+                           strncmp(endpointScope, receiverScope, 1024 * 1024) == 0;
+    if (sameScope && strncmp(endpointTopic, receiverTopic, 1024 * 1024) == 0) {
+        pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, endpointUrl);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Handles the removal of a discovered endpoint.
+ *
+ * For a publisher endpoint every known topic receiver gets the chance to disconnect
+ * from it. Regardless of its type, the copy stored under the endpoint's UUID is then
+ * removed from the discovered-endpoints map and destroyed.
+ *
+ * @param handle   The pubsub_nanomsg_admin_t instance.
+ * @param endpoint The endpoint that is no longer available.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    const char *endpointType = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    const bool isPublisher = endpointType != NULL &&
+            strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, endpointType, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (isPublisher) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        for (hash_map_iterator_t receiverIter = hashMapIterator_construct(psa->topicReceivers.map);
+             hashMapIterator_hasNext(&receiverIter);) {
+            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&receiverIter));
+            pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    const char *endpointUuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    celix_properties_t *storedCopy = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)endpointUuid));
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    //destroy the stored copy outside the lock
+    if (storedCopy != NULL) {
+        celix_properties_destroy(storedCopy);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Shell command callback: prints an overview of all topic senders and topic
+ * receivers of this admin to the given output stream.
+ *
+ * For every sender the scope/topic, serializer type and url are printed; for every
+ * receiver the scope/topic, serializer type and the connected/unconnected urls.
+ * "!Error!" is printed as serializer type when the serializer entry cannot be found.
+ *
+ * @param handle The pubsub_nanomsg_admin_t instance.
+ * @param out    The stream the overview is written to.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out,
+                                                  FILE *errStream __attribute__((unused))) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+    fprintf(out, "\n");
+    fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    for (hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicSenders.map);
+         hashMapIterator_hasNext(&senderIter);) {
+        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&senderIter));
+        const long senderSerId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+        psa_nanomsg_serializer_entry_t *senderSer = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)senderSerId));
+        const char *senderSerType = "!Error!";
+        if (senderSer != NULL) {
+            senderSerType = senderSer->serType;
+        }
+        const char *senderScope = pubsub_nanoMsgTopicSender_scope(sender);
+        const char *senderTopic = pubsub_nanoMsgTopicSender_topic(sender);
+        const char *senderUrl = pubsub_nanoMsgTopicSender_url(sender);
+        fprintf(out, "|- Topic Sender %s/%s\n", senderScope, senderTopic);
+        fprintf(out, "   |- serializer type = %s\n", senderSerType);
+        fprintf(out, "   |- url             = %s\n", senderUrl);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    fprintf(out, "\n");
+    fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    for (hash_map_iterator_t receiverIter = hashMapIterator_construct(psa->topicReceivers.map);
+         hashMapIterator_hasNext(&receiverIter);) {
+        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&receiverIter));
+        const long receiverSerId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+        psa_nanomsg_serializer_entry_t *receiverSer = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)receiverSerId));
+        const char *receiverSerType = "!Error!";
+        if (receiverSer != NULL) {
+            receiverSerType = receiverSer->serType;
+        }
+        const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+        const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+        //filled by the receiver with its connected and not (yet) connected urls
+        std::vector<std::string> connectedUrls{};
+        std::vector<std::string> unconnectedUrls{};
+        pubsub_nanoMsgTopicReceiver_listConnections(receiver, connectedUrls, unconnectedUrls);
+
+        fprintf(out, "|- Topic Receiver %s/%s\n", receiverScope, receiverTopic);
+        fprintf(out, "   |- serializer type = %s\n", receiverSerType);
+        for (const auto &connectedUrl : connectedUrls) {
+            fprintf(out, "   |- connected url   = %s\n", connectedUrl.c_str());
+        }
+        for (const auto &unconnectedUrl : unconnectedUrls) {
+            fprintf(out, "   |- unconnected url = %s\n", unconnectedUrl.c_str());
+        }
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+    fprintf(out, "\n");
+
+    return CELIX_SUCCESS;
+}
+
+#ifndef ANDROID
+/**
+ * Looks up the numeric IPv4 address of a network interface.
+ *
+ * @param interface Name of the interface to use, or NULL to take the first IPv4
+ *                  address reported by getifaddrs.
+ * @param ip        Receives a heap allocated string with the address on success;
+ *                  the caller must free it. Left untouched on failure.
+ * @return CELIX_SUCCESS when an address was found and copied, otherwise
+ *         CELIX_BUNDLE_EXCEPTION.
+ */
+static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr = NULL;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+        return status;
+    }
+
+    for (struct ifaddrs *ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
+        //only IPv4 addresses are used; check the family first so getnameinfo is never
+        //called with an IPv4 address length on a different kind of socket address
+        if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+            continue;
+        }
+        if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+            continue;
+        }
+        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) {
+            continue;
+        }
+
+        //only report success when the address string could actually be copied
+        char *copy = strdup(host);
+        if (copy != NULL) {
+            *ip = copy;
+            status = CELIX_SUCCESS;
+        }
+    }
+
+    freeifaddrs(ifaddr);
+
+    return status;
+}
+#endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
new file mode 100644
index 0000000..195ccb7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -0,0 +1,73 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_NANOMSG_ADMIN_H
+#define CELIX_PUBSUB_NANOMSG_ADMIN_H
+
+#include <stdio.h> /* FILE, used by pubsub_nanoMsgAdmin_executeCommand */
+
+#include "celix_api.h"
+#include "log_helper.h"
+
+/*
+ * NOTE(review): the values below still carry "zmq"/"ZMQ" names although this is the
+ * nanomsg admin. They are left unchanged here because they are runtime identifiers
+ * (admin type, endpoint property key, configuration keys) -- confirm whether they
+ * should become nanomsg specific.
+ */
+#define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
+#define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
+
+#define PUBSUB_NANOMSG_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
+#define PUBSUB_NANOMSG_VERBOSE_DEFAULT  true
+
+#define PUBSUB_NANOMSG_PSA_IP_KEY       "PSA_IP"
+#define PUBSUB_NANOMSG_PSA_ITF_KEY      "PSA_INTERFACE"
+#define PUBSUB_NANOMSG_NR_THREADS_KEY   "PSA_ZMQ_NR_THREADS"
+
+#define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
+
+/* Opaque handle of the nanomsg pubsub admin. */
+typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+
+/* Creates / destroys the admin; destroy accepts NULL. */
+pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa);
+
+/* The functions below take the admin as void *handle and are declared with C linkage. */
+#ifdef __cplusplus
+extern "C" {
+#endif
+celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+                                                  double *score, long *serializerSvcId);
+celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+                                                   const celix_properties_t *svcProperties, double *score,
+                                                   long *serializerSvcId);
+celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+                                                    long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+                                                      long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
+void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+#ifdef __cplusplus
+}
+#endif
+
+/* Shell command callback printing the admin's senders and receivers to outStream. */
+celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+#endif //CELIX_PUBSUB_NANOMSG_ADMIN_H
+

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
new file mode 100644
index 0000000..2a2bcfe
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -0,0 +1,56 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <memory.h>
+#include "pubsub_nanomsg_common.h"
+
+/**
+ * Derives the local message type id for a message type name: the id is the
+ * utils_stringHash value of the name.
+ *
+ * @param handle    Unused.
+ * @param msgType   The message type name.
+ * @param msgTypeId Receives the message type id.
+ * @return Always 0.
+ */
+int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle __attribute__((unused)), const char *msgType,
+                                         unsigned int *msgTypeId) {
+    const unsigned int hashedType = utils_stringHash(msgType);
+    *msgTypeId = hashedType;
+    return 0;
+}
+
+/**
+ * Checks whether a received message header is compatible with the locally known
+ * message version.
+ *
+ * The major versions must be equal and the minor version in the header must be equal
+ * to or greater than the local minor version. Both local numbers are compared after
+ * truncation to unsigned char, matching the width of the header fields.
+ *
+ * @param msgVersion The locally known message version; NULL yields false.
+ * @param hdr        The received message header.
+ * @return true when the header is compatible, false otherwise.
+ */
+bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr) {
+    if (msgVersion == NULL) {
+        return false;
+    }
+
+    int localMajor = 0;
+    int localMinor = 0;
+    version_getMajor(msgVersion, &localMajor);
+    version_getMinor(msgVersion, &localMinor);
+
+    /* Different major means incompatible */
+    if (hdr->major != ((unsigned char)localMajor)) {
+        return false;
+    }
+
+    /* Compatible only if the provider has an equal or greater minor (compatible update) */
+    return hdr->minor >= ((unsigned char)localMinor);
+}
+
+/**
+ * Builds the 5 byte subscription filter for a scope/topic pair.
+ *
+ * Bytes 0-1 hold the first two characters of the scope, bytes 2-3 the first two
+ * characters of the topic and byte 4 is always '\0'. A scope or topic that is NULL
+ * or shorter than two characters leaves its two bytes '\0'.
+ *
+ * @param scope  The topic scope; may be NULL.
+ * @param topic  The topic name; may be NULL.
+ * @param filter Output buffer of at least 5 bytes.
+ */
+void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) {
+    const int useScope = (scope != NULL) && (strnlen(scope, 3) >= 2);
+    const int useTopic = (topic != NULL) && (strnlen(topic, 3) >= 2);
+
+    filter[0] = useScope ? scope[0] : '\0';
+    filter[1] = useScope ? scope[1] : '\0';
+    filter[2] = useTopic ? topic[0] : '\0';
+    filter[3] = useTopic ? topic[1] : '\0';
+    filter[4] = '\0';
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
new file mode 100644
index 0000000..28293a8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -0,0 +1,56 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_ZMQ_COMMON_H
+#define CELIX_PUBSUB_ZMQ_COMMON_H
+
+#include <utils.h>
+
+#include "version.h"
+#include "pubsub_common.h"
+
+
+/*
+ * NOTE zmq is used by first sending three frames:
+ * 1) A subscription filter.
+ * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
+ *
+ * 2) The pubsub_zmq_msg_header_t is sent, containing the type id and major/minor version
+ *
+ * 3) The actual payload
+ */
+
+
+/**
+ * Message header that is placed in front of the serialized payload.
+ * The struct tag still carries the zmq name; this admin refers to it through the
+ * pubsub_nanmosg_msg_header_t typedef below.
+ */
+struct pubsub_zmq_msg_header {
+    //header
+    unsigned int type;   //message type id
+    unsigned char major; //major version of the message
+    unsigned char minor; //minor version of the message
+};
+
+//Name under which the header is used by the nanomsg admin sources.
+typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+
+
+/**
+ * Stores the string hash of msgType in *msgTypeId. The handle is not used. Always returns 0.
+ */
+int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
+
+/**
+ * Fills the 5 byte filter buffer with the first two chars of scope, the first two chars of topic and a '\0'.
+ * A NULL or shorter-than-two-chars scope/topic leaves its two bytes as '\0'.
+ */
+void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter);
+
+/**
+ * Returns true when hdr has the same major version as msgVersion and an equal or greater minor version.
+ * Returns false when msgVersion is NULL.
+ */
+bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr);
+
+
+#endif //CELIX_PUBSUB_ZMQ_COMMON_H

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
new file mode 100644
index 0000000..646a80e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -0,0 +1,420 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <mutex>
+#include <memory.h>
+#include <vector>
+#include <string>
+#include <sstream>
+
+#include <stdlib.h>
+#include <assert.h>
+
+#include <sys/epoll.h>
+#include <arpa/inet.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/bus.h>
+
+#include <pubsub_serializer.h>
+#include <pubsub/subscriber.h>
+#include <pubsub_constants.h>
+#include <pubsub_endpoint.h>
+#include <log_helper.h>
+
+#include "pubsub_nanomsg_topic_receiver.h"
+#include "pubsub_psa_nanomsg_constants.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub_topology_manager.h"
+
+//TODO see if block and wakeup (reset) also works
+#define PSA_NANOMSG_RECV_TIMEOUT 1000
+
+/*
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+*/
+//All log levels are currently routed to printf; the logHelper based variants are commented out above.
+#define L_DEBUG printf
+#define L_INFO printf
+#define L_WARN printf
+#define L_ERROR printf
+
+/**
+ * State of one topic receiver: the nanomsg socket, the receive thread, the urls it was asked to connect to
+ * and the subscriber services that messages are delivered to.
+ */
+struct pubsub_nanomsg_topic_receiver {
+    celix_bundle_context_t *ctx; //bundle context used to start/stop the subscriber tracker
+    log_helper_t *logHelper;
+    long serializerSvcId; //service id of the serializer passed to create
+    pubsub_serializer_service_t *serializer; //used to create/destroy the per-subscriber serializer maps
+    char *scope; //heap copy made in create, freed in destroy
+    char *topic; //heap copy made in create, freed in destroy
+    char scopeAndTopicFilter[5]; //filled by psa_nanomsg_setScopeAndTopicFilter
+
+    int nanoMsgSocket; //socket returned by nn_socket(AF_SP, NN_BUS)
+
+    struct {
+        celix_thread_t thread; //runs psa_nanomsg_recvThread
+        std::mutex mutex; //guards running
+        bool running; //set to true in create, set to false in destroy
+    } recvThread;
+
+    struct {
+        std::mutex mutex; //guards map
+        hash_map_t *map; //key = url (the entry's own url string), value = psa_nanomsg_requested_connection_entry_t*
+    } requestedConnections;
+
+    long subscriberTrackerId; //id returned by celix_bundleContext_trackServicesWithOptions
+    struct {
+        std::mutex mutex; //guards map
+        hash_map_t *map; //key = bnd id, value = psa_nanomsg_subscriber_entry_t*
+    } subscribers;
+};
+
+/** One url the receiver was asked to connect to. */
+typedef struct psa_zmq_requested_connection_entry {
+    char *url; //heap copy of the url; also used as key in the requestedConnections map
+    bool connected; //true once nn_connect succeeded for this url
+    int id; //value returned by nn_connect, passed to nn_shutdown on disconnect
+} psa_nanomsg_requested_connection_entry_t;
+
+/** Bookkeeping for the subscriber service(s) of one bundle. */
+typedef struct psa_zmq_subscriber_entry {
+    int usageCount; //number of add callbacks minus remove callbacks for the bundle
+    hash_map_t *msgTypes; //map from serializer svc
+    pubsub_subscriber_t *svc; //subscriber service whose receive callback is invoked
+} psa_nanomsg_subscriber_entry_t;
+
+
+//Service tracker callback: a subscriber service was added (handle is the receiver).
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+//Service tracker callback: a subscriber service was removed (handle is the receiver).
+static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                         const celix_bundle_t *owner);
+//Entry point of the receive thread (data is the receiver).
+static void* psa_nanomsg_recvThread(void *data);
+
+
+/**
+ * Creates a topic receiver for scope/topic: opens a NN_BUS socket, sets its receive timeout, starts a tracker
+ * for subscriber services of the topic and starts the receive thread.
+ *
+ * @param ctx             Bundle context used for the subscriber service tracker.
+ * @param logHelper       Log helper stored in the receiver.
+ * @param scope           Scope of the receiver; copied.
+ * @param topic           Topic of the receiver; copied.
+ * @param serializerSvcId Service id of the serializer.
+ * @param serializer      Serializer service used for the subscribers' message types.
+ * @return The new receiver, or NULL when allocation, socket creation or setting the receive timeout failed.
+ *         In the failure case nothing is left allocated or open.
+ */
+pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                                    log_helper_t *logHelper, const char *scope,
+                                                                    const char *topic, long serializerSvcId,
+                                                                    pubsub_serializer_service_t *serializer) {
+    //NOTE(review): the struct contains std::mutex members but is calloc'ed, so no constructor runs. This relies on
+    //zeroed memory being a usable mutex on the target platform - confirm, or move create/destroy to new/delete.
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+    if (receiver == NULL) {
+        printf("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, out of memory", scope, topic);
+        return NULL;
+    }
+    receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
+    psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+
+    receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+    if (receiver->nanoMsgSocket < 0) {
+        //Log before releasing the receiver, so the log macros may still refer to it.
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+        free(receiver);
+        return NULL;
+    }
+
+    int timeout = PSA_NANOMSG_RECV_TIMEOUT;
+    if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
+                      sizeof (timeout)) < 0) {
+        //Bail out here: continuing would dereference the released receiver and leak the socket.
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+        nn_close(receiver->nanoMsgSocket);
+        free(receiver);
+        return NULL;
+    }
+
+    //No subscription filter is installed on the socket (the zmq-era zsock_set_subscribe call was already
+    //disabled); the filter is only kept in receiver->scopeAndTopicFilter.
+
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    //Only track subscriber services whose topic property equals this receiver's topic.
+    //The string must stay alive until the tracker has been created below.
+    const std::string trackerFilter = std::string("(") + PUBSUB_SUBSCRIBER_TOPIC + "=" + topic + ")";
+    celix_service_tracking_options_t opts{};
+    opts.filter.ignoreServiceLanguage = true;
+    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+    opts.filter.filter = trackerFilter.c_str();
+    opts.callbackHandle = receiver;
+    opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+    opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
+
+    receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    //running must be true before the thread starts, the thread reads it to decide whether to continue.
+    receiver->recvThread.running = true;
+    celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+    std::stringstream namestream;
+    namestream << "NANOMSG TR " << scope << "/" << topic;
+    celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+
+    return receiver;
+}
+
+/**
+ * Destroys a topic receiver: signals the receive thread to stop and joins it, stops the subscriber tracker,
+ * releases all subscriber and requested-connection entries, closes the socket and frees the receiver.
+ *
+ * @param receiver Receiver to destroy; NULL is accepted and ignored.
+ */
+void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
+    if (receiver == NULL) {
+        return; //nothing to release
+    }
+
+    //Signal the receive thread and wait until it has finished.
+    {
+        std::lock_guard<std::mutex> guard(receiver->recvThread.mutex);
+        receiver->recvThread.running = false;
+    }
+    celixThread_join(receiver->recvThread.thread, NULL);
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    //Release the per-bundle subscriber entries including their serializer maps.
+    {
+        std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+        hash_map_iterator_t subIter = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&subIter)) {
+            auto *subscriber = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&subIter));
+            if (subscriber == NULL) {
+                continue;
+            }
+            receiver->serializer->destroySerializerMap(receiver->serializer->handle, subscriber->msgTypes);
+            free(subscriber);
+        }
+        hashMap_destroy(receiver->subscribers.map, false, false);
+    }
+
+    //Release the requested connection entries; the url doubles as map key, so it is freed with the entry.
+    {
+        std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+        hash_map_iterator_t connIter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&connIter)) {
+            auto *connection = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&connIter));
+            if (connection == NULL) {
+                continue;
+            }
+            free(connection->url);
+            free(connection);
+        }
+        hashMap_destroy(receiver->requestedConnections.map, false, false);
+    }
+
+    nn_close(receiver->nanoMsgSocket);
+
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver);
+}
+
+/** Returns the receiver's scope string; the string stays owned by the receiver (it is freed in destroy). */
+const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
+    return receiver->scope;
+}
+/** Returns the receiver's topic string; the string stays owned by the receiver (it is freed in destroy). */
+const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
+    return receiver->topic;
+}
+
+/** Returns the serializer service id that was passed to create. */
+long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
+    return receiver->serializerSvcId;
+}
+
+/**
+ * Appends the urls of all requested connections to one of the two output vectors, depending on whether the
+ * connection is currently marked as connected.
+ *
+ * @param receiver        Receiver whose requested connections are listed.
+ * @param connectedUrls   Receives the urls of entries marked connected (appended, not cleared).
+ * @param unconnectedUrls Receives the urls of entries not marked connected (appended, not cleared).
+ */
+void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                 std::vector<std::string> &connectedUrls,
+                                                 std::vector<std::string> &unconnectedUrls) {
+    std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(receiver->requestedConnections.map);
+         hashMapIterator_hasNext(&it); ) {
+        auto *connection = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&it));
+        std::vector<std::string> &target = connection->connected ? connectedUrls : unconnectedUrls;
+        target.emplace_back(connection->url);
+    }
+}
+
+
+/**
+ * Requests a connection to url. The url is remembered in the requested-connections map (created on first
+ * request) and, when the entry is not yet connected, nn_connect is called on the receiver's socket.
+ * A failed nn_connect keeps the entry with connected == false, so a later call retries.
+ *
+ * @param receiver Receiver that should connect.
+ * @param url      Url to connect to; copied when a new entry is created.
+ */
+void pubsub_nanoMsgTopicReceiver_connectTo(
+        pubsub_nanomsg_topic_receiver_t *receiver,
+        const char *url) {
+    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+
+    std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
+    psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url));
+    if (entry == NULL) {
+        entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
+        char *urlCopy = strndup(url, 1024*1024);
+        if (entry == NULL || urlCopy == NULL) {
+            //Never store a half-built entry (or a NULL key) in the map.
+            L_ERROR("[PSA_NANOMSG] Cannot allocate connection entry for url %s", url);
+            free(urlCopy);
+            free(entry);
+            return;
+        }
+        entry->url = urlCopy;
+        entry->connected = false;
+        //The entry's own url copy is the map key, so key and value share one lifetime.
+        hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+    }
+    if (!entry->connected) {
+        int connection_id = nn_connect(receiver->nanoMsgSocket, url);
+        if (connection_id >= 0) {
+            entry->connected = true;
+            entry->id = connection_id; //needed for nn_shutdown on disconnect
+        } else {
+            L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", url, strerror(errno));
+        }
+    }
+}
+
+/**
+ * Removes the requested connection for url. When the entry was connected, the endpoint is shut down with
+ * nn_shutdown first; the entry is released in either case. An unknown url is ignored.
+ *
+ * @param receiver Receiver that should disconnect.
+ * @param url      Url to disconnect from.
+ */
+void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
+    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+
+    std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+    auto *connection = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url));
+    if (connection == NULL) {
+        return; //url was never requested
+    }
+
+    if (connection->connected) {
+        const int rc = nn_shutdown(receiver->nanoMsgSocket, connection->id);
+        if (rc != 0) {
+            L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, connection->id, strerror(errno));
+        } else {
+            connection->connected = false;
+        }
+    }
+
+    //The entry is already out of the map, release it regardless of the shutdown result.
+    free(connection->url);
+    free(connection);
+}
+
+/**
+ * Service tracker callback for an added subscriber service.
+ *
+ * Subscribers whose scope (property PUBSUB_SUBSCRIBER_SCOPE, "default" when absent) differs from the receiver's
+ * scope are ignored. Otherwise the per-bundle entry is created (including its serializer map) or, when the
+ * bundle already has an entry, its usage count is incremented.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The subscriber service (pubsub_subscriber_t*).
+ * @param props  Service properties of the subscriber.
+ * @param bnd    Bundle owning the subscriber service.
+ */
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    //Compare the complete strings: a comparison bounded by strlen(receiver->scope) would also accept every
+    //subscriber scope that merely starts with the receiver's scope.
+    if (strcmp(subScope, receiver->scope) != 0) {
+        //not the same scope. ignore
+        return;
+    }
+
+    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+    if (entry != NULL) {
+        entry->usageCount += 1;
+        return;
+    }
+
+    //new create entry
+    entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
+    if (entry == NULL) {
+        L_ERROR("[PSA_NANOMSG] Cannot allocate subscriber entry for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        return;
+    }
+    entry->usageCount = 1;
+    entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+
+    int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+    if (rc == 0) {
+        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+    } else {
+        L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        free(entry);
+    }
+}
+
+/**
+ * Service tracker callback for a removed subscriber service.
+ *
+ * Decrements the usage count of the owning bundle's entry. When the count drops to zero or below, the entry
+ * is removed from the subscribers map and released together with its serializer map.
+ *
+ * @param handle The topic receiver.
+ * @param bnd    Bundle owning the removed subscriber service.
+ */
+static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
+                                                         const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
+    auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    const long ownerId = celix_bundle_getId(bnd);
+
+    std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+    auto *subscriber = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)ownerId));
+    if (subscriber == NULL) {
+        return; //no entry for this bundle
+    }
+
+    subscriber->usageCount -= 1;
+    if (subscriber->usageCount > 0) {
+        return; //still in use by another service of the same bundle
+    }
+
+    //remove entry
+    hashMap_remove(receiver->subscribers.map, (void*)ownerId);
+    if (receiver->serializer->destroySerializerMap(receiver->serializer->handle, subscriber->msgTypes) != 0) {
+        L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+    }
+    free(subscriber);
+}
+
+/**
+ * Delivers one received message to one subscriber entry.
+ *
+ * The serializer for hdr->type is looked up in the entry's message type map. When found and the header version
+ * is compatible, the payload is deserialized and handed to the subscriber's receive callback; the deserialized
+ * message is freed afterwards when the callback sets its release flag. A message with an incompatible version
+ * is dropped without logging.
+ *
+ * @param receiver    Receiver the message arrived on (used for logging).
+ * @param entry       Subscriber entry to deliver to.
+ * @param hdr         Header of the received message.
+ * @param payload     Serialized message.
+ * @param payloadSize Size of payload in bytes.
+ */
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+    auto *serializer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
+    if (serializer == NULL) {
+        L_WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id %i", hdr->type);
+        return;
+    }
+
+    if (!psa_nanomsg_checkVersion(serializer->msgVersion, hdr)) {
+        return; //incompatible version, drop the message
+    }
+
+    void *message = NULL;
+    const celix_status_t status = serializer->deserialize(serializer, payload, payloadSize, &message);
+    if (status != CELIX_SUCCESS) {
+        L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->msgName, receiver->scope, receiver->topic);
+        return;
+    }
+
+    pubsub_subscriber_t *subscriberSvc = entry->svc;
+    bool releaseRequested = false;
+    subscriberSvc->receive(subscriberSvc->handle, serializer->msgName, serializer->msgId, message, NULL, &releaseRequested);
+    if (releaseRequested) {
+        serializer->freeMsg(serializer->handle, message);
+    }
+}
+
+/**
+ * Delivers one received message to every subscriber entry of the receiver.
+ * The subscribers mutex is held for the whole delivery, including the subscribers' receive callbacks.
+ *
+ * @param receiver    Receiver the message arrived on.
+ * @param hdr         Header of the received message.
+ * @param payload     Serialized message.
+ * @param payloadSize Size of payload in bytes.
+ */
+static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+    std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&it); ) {
+        auto *subscriber = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&it));
+        if (subscriber == NULL) {
+            continue;
+        }
+        processMsgForSubscriberEntry(receiver, subscriber, hdr, payload, payloadSize);
+    }
+}
+
+/**
+ * Receive thread: repeatedly receives one message from the receiver's socket and dispatches it to the
+ * subscribers, until recvThread.running has been set to false.
+ *
+ * The running flag is re-read (under its mutex) before every receive; because the socket has a receive timeout
+ * configured in create, a stop request is noticed after at most one timeout period.
+ *
+ * A message consists of a pubsub_nanmosg_msg_header_t directly followed by the serialized payload. The buffer
+ * is allocated by nanomsg (NN_MSG) and released with nn_freemsg after every receive, also when the message
+ * turned out to be too short.
+ *
+ * @param data The topic receiver.
+ * @return Always NULL.
+ */
+static void* psa_nanomsg_recvThread(void *data) {
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+
+    while (true) {
+        {
+            //Re-check on every iteration, otherwise destroy() would wait forever in celixThread_join.
+            std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
+            if (!receiver->recvThread.running) {
+                break;
+            }
+        }
+
+        //With iov_len == NN_MSG nanomsg allocates the buffer and stores its address in payload.
+        void * payload = nullptr;
+        nn_iovec iov;
+        iov.iov_base = &payload;
+        iov.iov_len = NN_MSG;
+
+        nn_msghdr msgHdr;
+        memset(&msgHdr, 0, sizeof(msgHdr));
+
+        msgHdr.msg_iov = &iov;
+        msgHdr.msg_iovlen = 1;
+
+        msgHdr.msg_control = nullptr;
+        msgHdr.msg_controllen = 0;
+
+        errno = 0;
+        int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+        if (recvBytes < 0) {
+            if (errno == EAGAIN || errno == ETIMEDOUT) {
+                //nop, receive timeout; loop around to re-check the running flag
+            } else if (errno == EINTR) {
+                L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted");
+            } else {
+                L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno));
+            }
+        } else if (payload != nullptr && static_cast<size_t>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
+            pubsub_nanmosg_msg_header_t *header = static_cast<pubsub_nanmosg_msg_header_t*>(payload);
+            const char *msg = static_cast<const char*>(payload) + sizeof(*header);
+            printf("HEADER: Type %d, major %d, minor %d\n", header->type , (int)header->major, (int)header->minor);
+            fprintf(stderr, "RECEIVED %d bytes\n", recvBytes);
+            //The payload size is what remains after the header struct (not after a pointer to it).
+            processMsg(receiver, header, msg, static_cast<size_t>(recvBytes) - sizeof(*header));
+        } else {
+            L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
+        }
+
+        //Release the nanomsg-allocated buffer on every path that received one.
+        if (payload != nullptr) {
+            nn_freemsg(payload);
+        }
+    } // while
+
+    return NULL;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
new file mode 100644
index 0000000..786fb90
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#include <string>
+#include <vector>
+
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+
+/**
+ * Creates a topic receiver for scope/topic: opens the nanomsg socket, starts tracking subscriber services for
+ * the topic and starts the receive thread. scope and topic are copied.
+ * Returns NULL when the socket cannot be created.
+ */
+pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                                    log_helper_t *logHelper, const char *scope,
+                                                                    const char *topic, long serializerSvcId,
+                                                                    pubsub_serializer_service_t *serializer);
+/**
+ * Stops the receive thread and the subscriber tracker, releases all entries, closes the socket and frees the
+ * receiver. A NULL receiver is accepted.
+ */
+void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+/** Returns the receiver's scope; the string stays owned by the receiver. */
+const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+/** Returns the receiver's topic; the string stays owned by the receiver. */
+const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+/** Returns the serializer service id that was passed to create. */
+long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+/**
+ * Appends the url of every requested connection to connectedUrls or unconnectedUrls, depending on whether the
+ * connection is currently marked as connected. The vectors are not cleared first.
+ */
+void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                 std::vector<std::string> &connectedUrls,
+                                                 std::vector<std::string> &unconnectedUrls);
+
+/** Remembers url as requested connection and connects the socket to it when not connected yet. */
+void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+/** Forgets the requested connection for url and shuts the endpoint down when it was connected. */
+void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+
+#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H