You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/01 19:43:20 UTC

celix git commit: Updates to nanomsg admin

Repository: celix
Updated Branches:
  refs/heads/nanomsg 4fc1f3d3f -> cb740b0d4


Updates to nanomsg admin


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cb740b0d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cb740b0d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cb740b0d

Branch: refs/heads/nanomsg
Commit: cb740b0d41df786fead29ab1d9f3930187a7fe82
Parents: 4fc1f3d
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 20:43:05 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 20:43:05 2018 +0100

----------------------------------------------------------------------
 .../pubsub_admin_nanomsg/src/nanomsg_crypto.cc  | 281 ---------
 .../pubsub_admin_nanomsg/src/nanomsg_crypto.h   |  41 --
 .../src/psa_nanomsg_activator.cc                | 171 +++---
 .../src/pubsub_nanomsg_admin.cc                 | 573 ++++++++++---------
 .../src/pubsub_nanomsg_admin.h                  | 110 +++-
 .../src/pubsub_nanomsg_topic_receiver.cc        |  21 +-
 .../src/pubsub_nanomsg_topic_sender.cc          |   2 -
 libs/framework/include/celix_bundle_activator.h |   2 +-
 8 files changed, 476 insertions(+), 725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
deleted file mode 100644
index d7d88bb..0000000
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.c
- *
- *  \date       Dec 2, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "nanomsg_crypto.h"
-
-#include <zmq.h>
-#include <openssl/conf.h>
-#include <openssl/evp.h>
-#include <openssl/err.h>
-
-#include <string.h>
-
-#define MAX_FILE_PATH_LENGTH 512
-#define ZMQ_KEY_LENGTH 40
-#define AES_KEY_LENGTH 32
-#define AES_IV_LENGTH 16
-
-#define KEY_TO_GET "aes_key"
-#define IV_TO_GET "aes_iv"
-
-static char* read_file_content(char* filePath, char* fileName);
-static void parse_key_lines(char *keysBuffer, char **key, char **iv);
-static void parse_key_line(char *line, char **key, char **iv);
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
-
-/**
- * Return a valid zcert_t from an encoded file
- * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
- */
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
-{
-
-	if (keysFilePath == NULL){
-		keysFilePath = DEFAULT_KEYS_FILE_PATH;
-	}
-
-	if (keysFileName == NULL){
-		keysFileName = DEFAULT_KEYS_FILE_NAME;
-	}
-
-	char* keys_data = read_file_content(keysFilePath, keysFileName);
-	if (keys_data == NULL){
-		return NULL;
-	}
-
-	char *key = NULL;
-	char *iv = NULL;
-	parse_key_lines(keys_data, &key, &iv);
-	free(keys_data);
-
-	if (key == NULL || iv == NULL){
-		free(key);
-		free(iv);
-
-		printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
-		return NULL;
-	}
-
-	//At this point, we know an aes key and iv are stored and loaded
-
-	// generate sha256 hashes
-	unsigned char key_digest[EVP_MAX_MD_SIZE];
-	unsigned char iv_digest[EVP_MAX_MD_SIZE];
-	generate_sha256_hash((char*) key, key_digest);
-	generate_sha256_hash((char*) iv, iv_digest);
-
-	zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
-	if (encoded_secret == NULL){
-		free(key);
-		free(iv);
-
-		return NULL;
-	}
-
-	int encoded_secret_size = (int) zchunk_size (encoded_secret);
-	char* encoded_secret_data = zchunk_strdup(encoded_secret);
-	zchunk_destroy (&encoded_secret);
-
-	// Decryption of data
-	int decryptedtext_len;
-	unsigned char decryptedtext[encoded_secret_size];
-	decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
-	decryptedtext[decryptedtext_len] = '\0';
-
-	EVP_cleanup();
-
-	free(encoded_secret_data);
-	free(key);
-	free(iv);
-
-	// The public and private keys are retrieved
-	char* public_text = NULL;
-	char* secret_text = NULL;
-
-	extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
-
-	byte public_key [32] = { 0 };
-	byte secret_key [32] = { 0 };
-
-	zmq_z85_decode (public_key, public_text);
-	zmq_z85_decode (secret_key, secret_text);
-
-	zcert_t* cert_loaded = zcert_new_from(public_key, secret_key);
-
-	free(public_text);
-	free(secret_text);
-
-	return cert_loaded;
-}
-
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
-	unsigned int digest_len;
-
-	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
-	EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
-	EVP_DigestUpdate(mdctx, text, strlen(text));
-	EVP_DigestFinal_ex(mdctx, digest, &digest_len);
-	EVP_MD_CTX_free(mdctx);
-
-	return digest_len;
-}
-
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
-	int len;
-	int plaintext_len;
-
-	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-
-	EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
-	EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
-	plaintext_len = len;
-	EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
-	plaintext_len += len;
-
-	EVP_CIPHER_CTX_free(ctx);
-
-	return plaintext_len;
-}
-
-/**
- * Caller is responsible for freeing the returned value
- */
-static char* read_file_content(char* filePath, char* fileName){
-
-	char fileNameWithPath[MAX_FILE_PATH_LENGTH];
-	snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
-	int rc = 0;
-
-	if (!zsys_file_exists(fileNameWithPath)){
-		printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	zfile_t* keys_file = zfile_new (filePath, fileName);
-	rc = zfile_input (keys_file);
-	if (rc != 0){
-		zfile_destroy(&keys_file);
-		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
-	zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
-	if (keys_chunk == NULL){
-		zfile_close(keys_file);
-		zfile_destroy(&keys_file);
-		printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	char* keys_data = zchunk_strdup(keys_chunk);
-	zchunk_destroy(&keys_chunk);
-	zfile_close(keys_file);
-	zfile_destroy (&keys_file);
-
-	return keys_data;
-}
-
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
-	char *line = NULL, *saveLinePointer = NULL;
-
-	bool firstTime = true;
-	do {
-		if (firstTime){
-			line = strtok_r(keysBuffer, "\n", &saveLinePointer);
-			firstTime = false;
-		}else {
-			line = strtok_r(NULL, "\n", &saveLinePointer);
-		}
-
-		if (line == NULL){
-			break;
-		}
-
-		parse_key_line(line, key, iv);
-
-	} while((*key == NULL || *iv == NULL) && line != NULL);
-
-}
-
-static void parse_key_line(char *line, char **key, char **iv){
-	char *detectedKey = NULL, *detectedValue= NULL;
-
-	char* sep_at = strchr(line, ':');
-	if (sep_at == NULL){
-		return;
-	}
-
-	*sep_at = '\0'; // overwrite first separator, creating two strings.
-	detectedKey = line;
-	detectedValue = sep_at + 1;
-
-	if (detectedKey == NULL || detectedValue == NULL){
-		return;
-	}
-	if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
-		return;
-	}
-
-	if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
-		*key = strndup(detectedValue, AES_KEY_LENGTH);
-	} else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
-		*iv = strndup(detectedValue, AES_IV_LENGTH);
-	}
-}
-
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
-	// Load decrypted text buffer
-	zchunk_t* secret_decrypted = zchunk_new(input, inputlen);
-	if (secret_decrypted == NULL){
-		printf("CRYPTO: Failed to create zchunk\n");
-		return;
-	}
-
-	zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
-	zchunk_destroy (&secret_decrypted);
-	if (secret_config == NULL){
-		printf("CRYPTO: Failed to create zconfig\n");
-		return;
-	}
-
-	// Extract public and secret key from text buffer
-	char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
-	char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
-
-	if (public_text == NULL || secret_text == NULL){
-		zconfig_destroy(&secret_config);
-		printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
-		return;
-	}
-
-	*publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
-	*secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
-
-	zconfig_destroy(&secret_config);
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
deleted file mode 100644
index f1a990f..0000000
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.h
- *
- *  \date       Dec 2, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef ZMQ_CRYPTO_H_
-#define ZMQ_CRYPTO_H_
-
-#include <czmq.h>
-
-#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
-#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
-#define DEFAULT_KEYS_FILE_PATH "/etc/"
-#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
-
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path);
-int generate_sha256_hash(char* text, unsigned char* digest);
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
-
-#endif

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index 79ea1d4..ec3ee7d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -19,95 +19,104 @@
 
 
 #include <stdlib.h>
-
+#include <new>
+#include <iostream>
 #include "celix_api.h"
 #include "pubsub_serializer.h"
 #include "log_helper.h"
 
 #include "pubsub_admin.h"
 #include "pubsub_nanomsg_admin.h"
-#include "../../../shell/shell/include/command.h"
-
-typedef struct psa_nanomsg_activator {
-	log_helper_t *logHelper;
-
-	pubsub_nanomsg_admin_t *admin;
-
-	long serializersTrackerId;
-
-	pubsub_admin_service_t adminService;
-	long adminSvcId;
-
-	command_service_t cmdSvc;
-	long cmdSvcId;
-} psa_nanomsg_activator_t;
-
-int psa_nanomsg_start(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
-	act->adminSvcId = -1L;
-	act->cmdSvcId = -1L;
-	act->serializersTrackerId = -1L;
-
-	logHelper_create(ctx, &act->logHelper);
-	logHelper_start(act->logHelper);
-
-	act->admin = pubsub_nanoMsgAdmin_create(ctx, act->logHelper);
-	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
-
-	//track serializers
-	if (status == CELIX_SUCCESS) {
-		celix_service_tracking_options_t opts{};
-		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-		opts.filter.ignoreServiceLanguage = true;
-		opts.callbackHandle = act->admin;
-		opts.addWithProperties = pubsub_nanoMsgAdmin_addSerializerSvc;
-		opts.removeWithProperties = pubsub_nanoMsgAdmin_removeSerializerSvc;
-		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-	}
-
-	//register pubsub admin service
-	if (status == CELIX_SUCCESS) {
-		pubsub_admin_service_t *psaSvc = &act->adminService;
-		psaSvc->handle = act->admin;
-		psaSvc->matchPublisher = pubsub_nanoMsgAdmin_matchPublisher;
-		psaSvc->matchSubscriber = pubsub_nanoMsgAdmin_matchSubscriber;
-		psaSvc->matchEndpoint = pubsub_nanoMsgAdmin_matchEndpoint;
-		psaSvc->setupTopicSender = pubsub_nanoMsgAdmin_setupTopicSender;
-		psaSvc->teardownTopicSender = pubsub_nanoMsgAdmin_teardownTopicSender;
-		psaSvc->setupTopicReceiver = pubsub_nanoMsgAdmin_setupTopicReceiver;
-		psaSvc->teardownTopicReceiver = pubsub_nanoMsgAdmin_teardownTopicReceiver;
-		psaSvc->addEndpoint = pubsub_nanoMsgAdmin_addEndpoint;
-		psaSvc->removeEndpoint = pubsub_nanoMsgAdmin_removeEndpoint;
-
-		celix_properties_t *props = celix_properties_create();
-		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
-
-		act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
-	}
-
-	//register shell command service
-	{
-		act->cmdSvc.handle = act->admin;
-		act->cmdSvc.executeCommand = pubsub_nanoMsgAdmin_executeCommand;
-		celix_properties_t *props = celix_properties_create();
-		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
-		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
-		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
-		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
-	}
-
-	return status;
-}
 
-int psa_nanomsg_stop(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
-	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
-	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
-	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
-	pubsub_nanoMsgAdmin_destroy(act->admin);
+class LogHelper {
+public:
+    LogHelper(celix_bundle_context_t *ctx) : context{ctx} {
+        if (logHelper_create(context, &logHelper)!= CELIX_SUCCESS) {
+            std::bad_alloc{};
+        }
+
+    }
+    ~LogHelper() {
+        logHelper_destroy(&logHelper);
+    }
+
+    LogHelper(const LogHelper &) = delete;
+    LogHelper & operator=(const LogHelper&) = delete;
+    celix_status_t start () {
+        return logHelper_start(logHelper);
+    }
+
+    celix_status_t stop () {
+        return logHelper_stop(logHelper);
+    }
+
+    log_helper_t *get() {
+        return logHelper;
+    }
+private:
+    celix_bundle_context_t *context;
+    log_helper_t *logHelper{};
+
+};
+
+class psa_nanomsg_activator {
+public:
+    psa_nanomsg_activator(celix_bundle_context_t *ctx) : context{ctx}, logHelper{context}, admin(context, logHelper.get()) {
+    }
+    psa_nanomsg_activator(const psa_nanomsg_activator&) = delete;
+    psa_nanomsg_activator& operator=(const psa_nanomsg_activator&) = delete;
+
+    ~psa_nanomsg_activator() {
+
+    }
+
+    celix_status_t  start() {
+        admin.start();
+        auto status = logHelper.start();
+
+        return status;
+    }
+
+    celix_status_t stop() {
+        admin.stop();
+        return logHelper.stop();
+    };
+
+private:
+    celix_bundle_context_t *context{};
+    LogHelper logHelper;
+	pubsub_nanomsg_admin admin;
+
+
+//    command_service_t cmdSvc{};
+
+//	long cmdSvcId = -1L;
+};
+
+celix_status_t  celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) {
+    celix_status_t status = CELIX_SUCCESS;
+    auto data = new  (std::nothrow) psa_nanomsg_activator{ctx};
+    if (data != NULL) {
+        *userData = data;
+    } else {
+        status = CELIX_ENOMEM;
+    }
+    return status;
+}
 
-	logHelper_stop(act->logHelper);
-	logHelper_destroy(&act->logHelper);
+celix_status_t celix_bundleActivator_start(void *userData, celix_bundle_context_t *) {
+    auto act = static_cast<psa_nanomsg_activator*>(userData);
+    return act->start();
+}
 
-	return CELIX_SUCCESS;
+celix_status_t celix_bundleActivator_stop(void *userData, celix_bundle_context_t *) {
+    auto act = static_cast<psa_nanomsg_activator*>(userData);
+    return act->stop();
 }
 
-CELIX_GEN_BUNDLE_ACTIVATOR(psa_nanomsg_activator_t, psa_nanomsg_start, psa_nanomsg_stop);
+
+celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_context_t *) {
+    auto act = static_cast<psa_nanomsg_activator*>(userData);
+    delete act;
+    return CELIX_SUCCESS;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index f3e6831..bd1d0a5 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -19,7 +19,9 @@
 
 #include <string>
 #include <vector>
+#include <functional>
 #include <memory.h>
+#include <iostream>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
@@ -48,43 +50,6 @@
 #define L_WARN printf
 #define L_ERROR printf
 
-struct pubsub_nanomsg_admin {
-    celix_bundle_context_t *ctx;
-    log_helper_t *log;
-    const char *fwUUID;
-
-    char* ipAddress;
-
-    unsigned int basePort;
-    unsigned int maxPort;
-
-    double qosSampleScore;
-    double qosControlScore;
-    double defaultScore;
-
-    bool verbose;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
-    } serializers;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
-    } topicSenders;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
-    } topicReceivers;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
-    } discoveredEndpoints;
-
-};
 
 typedef struct psa_nanomsg_serializer_entry {
     const char *serType;
@@ -93,164 +58,221 @@ typedef struct psa_nanomsg_serializer_entry {
 } psa_nanomsg_serializer_entry_t;
 
 static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
-static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t *psa,
-                                                                    pubsub_nanomsg_topic_receiver_t *receiver,
-                                                                    const celix_properties_t *endpoint);
-static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t *psa,
-                                                                         pubsub_nanomsg_topic_receiver_t *receiver,
-                                                                         const celix_properties_t *endpoint);
-
-
-pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(calloc(1, sizeof(*psa)));
-    psa->ctx = ctx;
-    psa->log = logHelper;
-    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
-    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-
-    char *ip = NULL;
-    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , NULL);
-    if (confIp != NULL) {
+
+pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_helper_t *logHelper):
+    ctx{_ctx},
+    log{logHelper} {
+    verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
+    fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr);
+
+    char *ip = nullptr;
+    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
+    if (confIp != nullptr) {
         ip = strndup(confIp, 1024);
     }
 
-    if (ip == NULL) {
+    if (ip == nullptr) {
         //TODO try to get ip from subnet (CIDR)
     }
 
-    if (ip == NULL) {
+    if (ip == nullptr) {
         //try to get ip from itf
-        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, NULL);
+        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr);
         nanoMsg_getIpAddress(interface, &ip);
     }
 
-    if (ip == NULL) {
+    if (ip == nullptr) {
         L_WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (%s)", PUBSUB_NANOMSG_DEFAULT_IP);
         ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
     }
 
-    psa->ipAddress = ip;
-    if (psa->verbose) {
+    ipAddress = ip;
+    if (verbose) {
         L_INFO("[PSA_NANOMSG] Using %s for service annunciation", ip);
     }
 
 
-    long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
-    long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
-    psa->basePort = (unsigned int)basePort;
-    psa->maxPort = (unsigned int)maxPort;
-    if (psa->verbose) {
-        L_INFO("[PSA_NANOMSG] Using base till max port: %i till %i", psa->basePort, psa->maxPort);
+    long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
+    long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
+    basePort = (unsigned int)_basePort;
+    maxPort = (unsigned int)_maxPort;
+    if (verbose) {
+        L_INFO("[PSA_NANOMSG] Using base till max port: %li till %li", _basePort, _maxPort);
     }
 
 
-//    long nrThreads = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_NANOMSG_NR_THREADS_KEY, 0);
-//    if (nrThreads > 0) {
-//        zsys_set_io_threads((size_t)nrThreads);
-//        L_INFO("[PSA_NANOMSG] Using %d threads for NanoMsg", (size_t)nrThreads);
-//    }
-
-
-    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
-    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
-    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
+    defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
+    qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
+    qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadMutex_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    celixThreadMutex_create(&serializers.mutex, nullptr);
+    serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
 
-    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
-    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    celixThreadMutex_create(&topicSenders.mutex, nullptr);
+    topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
-    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    celixThreadMutex_create(&topicReceivers.mutex, nullptr);
+    topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
-    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
-    return psa;
+    celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr);
+    discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 }
 
-void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa) {
-    if (psa == NULL) {
-        return;
-    }
-
+pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
 
-    celixThreadMutex_lock(&psa->topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    celixThreadMutex_lock(&topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
+        auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
         pubsub_nanoMsgTopicSender_destroy(sender);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&topicSenders.mutex);
 
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    celixThreadMutex_lock(&topicReceivers.mutex);
+    iter = hashMapIterator_construct(topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_receiver_t *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+        auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
         pubsub_nanoMsgTopicReceiver_destroy(recv);
     }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&topicReceivers.mutex);
 
-    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
-    iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    celixThreadMutex_lock(&discoveredEndpoints.mutex);
+    iter = hashMapIterator_construct(discoveredEndpoints.map);
     while (hashMapIterator_hasNext(&iter)) {
-        celix_properties_t *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+        auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
         celix_properties_destroy(ep);
     }
-    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
+    celixThreadMutex_lock(&serializers.mutex);
+    iter = hashMapIterator_construct(serializers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
         free(entry);
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 
-    celixThreadMutex_destroy(&psa->topicSenders.mutex);
-    hashMap_destroy(psa->topicSenders.map, true, false);
+    celixThreadMutex_destroy(&topicSenders.mutex);
+    hashMap_destroy(topicSenders.map, true, false);
 
-    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
-    hashMap_destroy(psa->topicReceivers.map, true, false);
+    celixThreadMutex_destroy(&topicReceivers.mutex);
+    hashMap_destroy(topicReceivers.map, true, false);
 
-    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
-    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+    celixThreadMutex_destroy(&discoveredEndpoints.mutex);
+    hashMap_destroy(discoveredEndpoints.map, false, false);
 
-    celixThreadMutex_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
+    celixThreadMutex_destroy(&serializers.mutex);
+    hashMap_destroy(serializers.map, false, false);
 
-    free(psa->ipAddress);
+    free(ipAddress);
 
-    free(psa);
 }
 
-void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+void pubsub_nanomsg_admin::start() {
+    adminService.handle = this;
+    adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->matchPublisher(svcRequesterBndId, svcFilter,score, serializerSvcId);
+    };
+    adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->matchSubscriber(svcProviderBndId, svcProperties, score, serializerSvcId);
+    };
+    adminService.matchEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->matchEndpoint(endpoint, match);
+    };
+    adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->setupTopicSender(scope, topic, serializerSvcId, publisherEndpoint);
+    };
+    adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->teardownTopicSender(scope, topic);
+    };
+    adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint);
+    };
+
+    adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->teardownTopicReceiver(scope, topic);
+    };
+    adminService.addEndpoint = [](void *handle, const celix_properties_t *endpoint) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->addEndpoint(endpoint);
+    };
+    adminService.removeEndpoint = [](void *handle, const celix_properties_t *endpoint) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->removeEndpoint(endpoint);
+    };
+
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
+
+    adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props);
+
+
+    celix_service_tracking_options_t opts{};
+    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = this;
+    opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        me->addSerializerSvc(svc, props);
+    };
+    opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        me->removeSerializerSvc(svc, props);
+    };
+    serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    //register shell command service
+    cmdSvc.handle = this;
+    cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+        return me->executeCommand(commandLine, outStream, errorStream);
+    };
+
+    celix_properties_t* shellProps = celix_properties_create();
+    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
+    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
+    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+    cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
+
+}
 
-    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+void pubsub_nanomsg_admin::stop() {
+    celix_bundleContext_unregisterService(ctx, adminSvcId);
+    celix_bundleContext_unregisterService(ctx, cmdSvcId);
+    celix_bundleContext_stopTracker(ctx, serializersTrackerId);
+}
+
+void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) {
+    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
-    if (serType == NULL) {
+    if (serType == nullptr) {
         L_INFO("[PSA_NANOMSG] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
         return;
     }
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
-    if (entry == NULL) {
+    celixThreadMutex_lock(&serializers.mutex);
+    auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
+    if (entry == nullptr) {
         entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
         entry->serType = serType;
         entry->svcId = svcId;
         entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+        hashMap_put(serializers.map, (void*)svcId, entry);
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 }
 
-void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void */*svc*/, const celix_properties_t *props) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) {
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
     //remove serializer
@@ -259,82 +281,78 @@ void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void */*svc*/, const
     // 3) loop and destroy all topic receivers using the serializer
     // Note that it is the responsibility of the topology manager to create new topic senders/receivers
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(psa->serializers.map, (void*)svcId));
-    if (entry != NULL) {
-        celixThreadMutex_lock(&psa->topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    celixThreadMutex_lock(&serializers.mutex);
+    auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
+    if (entry != nullptr) {
+        celixThreadMutex_lock(&topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
         while (hashMapIterator_hasNext(&iter)) {
             hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
-            if (sender != NULL && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
+            if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
                 char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
                 hashMapIterator_remove(&iter);
                 pubsub_nanoMsgTopicSender_destroy(sender);
                 free(key);
             }
         }
-        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+        celixThreadMutex_unlock(&topicSenders.mutex);
 
-        celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        celixThreadMutex_lock(&topicReceivers.mutex);
+        iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
-            if (receiver != NULL && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+            auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+            if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
                 char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
                 hashMapIterator_remove(&iter);
                 pubsub_nanoMsgTopicReceiver_destroy(receiver);
                 free(key);
             }
         }
-        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+        celixThreadMutex_unlock(&topicReceivers.mutex);
 
         free(entry);
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 }
 
-celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
                                                   double *outScore, long *outSerializerSvcId) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
-    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+    double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
+            qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId,
                                                    const celix_properties_t *svcProperties, double *outScore,
                                                    long *outSerializerSvcId) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
-    double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
-    if (outScore != NULL) {
+    double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
+            qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
+    if (outScore != nullptr) {
         *outScore = score;
     }
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) {
     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, NULL);
-    if (outMatch != NULL) {
+    bool match = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
+    if (outMatch != nullptr) {
         *outMatch = match;
     }
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
                                                     long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
     celix_status_t  status = CELIX_SUCCESS;
 
     //1) Create TopicSender
@@ -342,31 +360,31 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *sc
     //3) Connect existing endpoints
     //4) set outPublisherEndpoint
 
-    celix_properties_t *newEndpoint = NULL;
+    celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    celixThreadMutex_lock(&psa->topicSenders.mutex);
-    pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(psa->topicSenders.map, key));
-    if (sender == NULL) {
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
-        if (serEntry != NULL) {
-            sender = pubsub_nanoMsgTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
-                                                      psa->ipAddress, psa->basePort, psa->maxPort);
+    celixThreadMutex_lock(&serializers.mutex);
+    celixThreadMutex_lock(&topicSenders.mutex);
+    auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map, key));
+    if (sender == nullptr) {
+        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+        if (serEntry != nullptr) {
+            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc,
+                                                      ipAddress, basePort, maxPort);
         }
-        if (sender != NULL) {
+        if (sender != nullptr) {
             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, NULL);
+            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                serType, nullptr);
             celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
             //if available also set container name
-            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
-            if (cn != NULL) {
+            const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+            if (cn != nullptr) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-            hashMap_put(psa->topicSenders.map, key, sender);
+            hashMap_put(topicSenders.map, key, sender);
         } else {
             L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
             free(key);
@@ -375,74 +393,72 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *sc
         free(key);
         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&topicSenders.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 
-    if (sender != NULL && newEndpoint != NULL) {
+    if (sender != nullptr && newEndpoint != nullptr) {
         //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
     }
 
-    if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+    if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
         *outPublisherEndpoint = newEndpoint;
     }
 
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) {
     celix_status_t  status = CELIX_SUCCESS;
 
     //1) Find and remove TopicSender from map
     //2) destroy topic sender
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&psa->topicSenders.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
-    if (entry != NULL) {
+    celixThreadMutex_lock(&topicSenders.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
+    if (entry != nullptr) {
         char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
-        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(psa->topicSenders.map, key));
+        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, key));
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
         pubsub_nanoMsgTopicSender_destroy(sender);
     } else {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&topicSenders.mutex);
     free(key);
 
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic,
                                                       long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
 
-    celix_properties_t *newEndpoint = NULL;
+    celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(psa->topicReceivers.map, key));
-    if (receiver == NULL) {
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
-        if (serEntry != NULL) {
-            receiver = pubsub_nanoMsgTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId,
+    celixThreadMutex_lock(&serializers.mutex);
+    celixThreadMutex_lock(&topicReceivers.mutex);
+    auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map, key));
+    if (receiver == nullptr) {
+        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+        if (serEntry != nullptr) {
+            receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
                                                           serEntry->svc);
         } else {
             L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
         }
-        if (receiver != NULL) {
+        if (receiver != nullptr) {
             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr);
             //if available also set container name
-            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
-            if (cn != NULL) {
+            const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+            if (cn != nullptr) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-            hashMap_put(psa->topicReceivers.map, key, receiver);
+            hashMap_put(topicReceivers.map, key, receiver);
         } else {
             L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
             free(key);
@@ -451,23 +467,23 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *
         free(key);
         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&topicReceivers.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 
-    if (receiver != NULL && newEndpoint != NULL) {
-        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    if (receiver != nullptr && newEndpoint != nullptr) {
+        celixThreadMutex_lock(&discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
-            celix_properties_t *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
-            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-                pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
+            if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                connectEndpointToReceiver(receiver, endpoint);
             }
         }
-        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+        celixThreadMutex_unlock(&discoveredEndpoints.mutex);
     }
 
-    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+    if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
         *outSubscriberEndpoint = newEndpoint;
     }
 
@@ -475,29 +491,26 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
+celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    celixThreadMutex_lock(&topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
     free(key);
-    if (entry != NULL) {
+    if (entry != nullptr) {
         char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
         pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
-        hashMap_remove(psa->topicReceivers.map, receiverKey);
+        hashMap_remove(topicReceivers.map, receiverKey);
 
         free(receiverKey);
         pubsub_nanoMsgTopicReceiver_destroy(receiver);
     }
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    celixThreadMutex_lock(&topicReceivers.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
-static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t * /*psa*/,
-                                                                    pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
                                                                     const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
@@ -505,17 +518,17 @@ static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanom
     const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
     const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
 
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 
-    if (url == NULL) {
-//        const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
-//        const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    if (url == nullptr) {
+//        const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr);
+//        const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 //        L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL &&
+        if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
             pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
@@ -525,50 +538,47 @@ static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanom
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
-    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) {
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
-    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-        celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-            pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            connectEndpointToReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
 
-    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celixThreadMutex_lock(&discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
-    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
-    hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
-    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
+    hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
+    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
 
-static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t * /*psa*/,
-                                                                         pubsub_nanomsg_topic_receiver_t *receiver,
-                                                                         const celix_properties_t *endpoint) {
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                            const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
     const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
     const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
 
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 
-    if (url == NULL) {
+    if (url == nullptr) {
         L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL &&
+        if (eScope != nullptr && eTopic != nullptr &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
             pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
@@ -578,27 +588,25 @@ static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
-    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) {
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
-    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-        celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-            pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+            disconnectEndpointFromReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
 
-    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
-    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-    celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid));
-    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    celixThreadMutex_lock(&discoveredEndpoints.mutex);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
+    celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
+    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
-    if (found != NULL) {
+    if (found != nullptr) {
         celix_properties_destroy(found);
     }
 
@@ -606,21 +614,20 @@ celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_prop
     return status;
 }
 
-celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out,
+celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out,
                                                   FILE *errStream __attribute__((unused))) {
-    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
     celix_status_t  status = CELIX_SUCCESS;
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    celixThreadMutex_lock(&psa->topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    celixThreadMutex_lock(&serializers.mutex);
+    celixThreadMutex_lock(&topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
         long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serSvcId));
-        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
+        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
         const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
         const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -628,19 +635,19 @@ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLin
         fprintf(out, "   |- serializer type = %s\n", serType);
         fprintf(out, "   |- url             = %s\n", url);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&topicSenders.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    celixThreadMutex_lock(&serializers.mutex);
+    celixThreadMutex_lock(&topicReceivers.mutex);
+    iter = hashMapIterator_construct(topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
         long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serSvcId));
-        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
+        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
         const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
 
@@ -657,8 +664,8 @@ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLin
             fprintf(out, "   |- unconnected url = %s\n", url.c_str());
         }
     }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&topicReceivers.mutex);
+    celixThreadMutex_unlock(&serializers.mutex);
     fprintf(out, "\n");
 
     return status;
@@ -673,13 +680,13 @@ static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
 
     if (getifaddrs(&ifaddr) != -1)
     {
-        for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
+        for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
         {
-            if (ifa->ifa_addr == NULL)
+            if (ifa->ifa_addr == nullptr)
                 continue;
 
-            if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
-                if (interface == NULL) {
+            if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
+                if (interface == nullptr) {
                     *ip = strdup(host);
                     status = CELIX_SUCCESS;
                 }

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 195ccb7..385b400 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -20,8 +20,11 @@
 #ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
 #define CELIX_PUBSUB_ZMQ_ADMIN_H
 
+#include <pubsub_admin.h>
 #include "celix_api.h"
 #include "log_helper.h"
+#include "pubsub_nanomsg_topic_receiver.h"
+#include "../../../shell/shell/include/command.h"
 
 #define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
 #define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
@@ -31,38 +34,95 @@
 
 #define PUBSUB_NANOMSG_PSA_IP_KEY       "PSA_IP"
 #define PUBSUB_NANOMSG_PSA_ITF_KEY		"PSA_INTERFACE"
-#define PUBSUB_NANOMSG_NR_THREADS_KEY   "PSA_ZMQ_NR_THREADS"
 
 #define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 
-typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
-pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
-void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa);
+//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+class pubsub_nanomsg_admin {
+public:
+    pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+    pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
+    pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
+    ~pubsub_nanomsg_admin();
+    void start();
+    void stop();
+
+private:
+    void addSerializerSvc(void *svc, const celix_properties_t *props);
+    void removeSerializerSvc(void */*svc*/, const celix_properties_t *props);
+    celix_status_t matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
+                                                      double *score, long *serializerSvcId);
+    celix_status_t matchSubscriber(long svcProviderBndId,
+                                                       const celix_properties_t *svcProperties, double *score,
+                                                       long *serializerSvcId);
+    celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match);
+
+    celix_status_t setupTopicSender(const char *scope, const char *topic,
+                                                        long serializerSvcId, celix_properties_t **publisherEndpoint);
+    celix_status_t teardownTopicSender(const char *scope, const char *topic);
+
+    celix_status_t setupTopicReceiver(const char *scope, const char *topic,
+                                                          long serializerSvcId, celix_properties_t **subscriberEndpoint);
+    celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
+
+    celix_status_t addEndpoint(const celix_properties_t *endpoint);
+    celix_status_t removeEndpoint(const celix_properties_t *endpoint);
+
+    celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
+                                                        FILE *errStream __attribute__((unused)));
+
+    celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                   const celix_properties_t *endpoint);
+
+    celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                        const celix_properties_t *endpoint);
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    pubsub_admin_service_t adminService{};
+    long adminSvcId = -1L;
+    long cmdSvcId = -1L;
+    command_service_t cmdSvc{};
+    long serializersTrackerId = -1L;
+
+    const char *fwUUID{};
+
+    char* ipAddress{};
+
+    unsigned int basePort{};
+    unsigned int maxPort{};
+
+    double qosSampleScore{};
+    double qosControlScore{};
+    double defaultScore{};
+
+    bool verbose{};
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+    } serializers{};
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
+    } topicSenders{};
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
+    } topicReceivers{};
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+    } discoveredEndpoints{};
+
+};
 
 #ifdef __cplusplus
 extern "C" {
 #endif
-celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
-                                                  double *score, long *serializerSvcId);
-                                                  celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
-                                                   const celix_properties_t *svcProperties, double *score,
-                                                   long *serializerSvcId);
-celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-
-celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
-                                                    long serializerSvcId, celix_properties_t **publisherEndpoint);
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-
-celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
-                                                      long serializerSvcId, celix_properties_t **subscriberEndpoint);
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
-
-celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
-
-void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 646a80e..42f6423 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -373,6 +373,10 @@ static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const p
     }
 }
 
+struct Message {
+    pubsub_nanmosg_msg_header_t header;
+    char payload[];
+};
 static void* psa_nanomsg_recvThread(void *data) {
     pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
     bool running{};
@@ -381,30 +385,25 @@ static void* psa_nanomsg_recvThread(void *data) {
         running = receiver->recvThread.running;
     }
     while (running) {
-        void * payload = nullptr;
+        Message *msg = nullptr;
         nn_iovec iov[2];
-        iov[0].iov_base = &payload;
+        iov[0].iov_base = &msg;
         iov[0].iov_len = NN_MSG;
 
         nn_msghdr msgHdr;
         memset(&msgHdr, 0, sizeof(msgHdr));
 
         msgHdr.msg_iov = iov;
-        msgHdr.msg_iovlen = 1;//2;
+        msgHdr.msg_iovlen = 1;
 
         msgHdr.msg_control = nullptr;
         msgHdr.msg_controllen = 0;
 
         errno = 0;
         int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
-        if (payload && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
-            pubsub_nanmosg_msg_header_t *header = static_cast<pubsub_nanmosg_msg_header_t*>(payload);
-            void* msg = ((char*)payload) + sizeof(*header);
-            printf("HEADER: Type %d, major %d, minor %d\n", header->type , (int)header->major, (int)header->minor);
-            fprintf(stderr, "RECEIVED %d bytes\n", recvBytes);
-            processMsg(receiver, header, (char*)msg, recvBytes-sizeof(header));
-
-            nn_freemsg(payload);
+        if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
+            processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+            nn_freemsg(msg);
         } else if (recvBytes >= 0) {
             L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
         } else if (errno == EAGAIN || errno == ETIMEDOUT) {

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index feead76..ff0d4f7 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -122,7 +122,6 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
         while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
             /* Randomized part due to same bundle publishing on different topics */
             unsigned int port = rand_range(basePort,maxPort);
-
             size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
             char *url = static_cast<char*>(calloc(len, sizeof(char*)));
             snprintf(url, len, "tcp://%s:%u", bindIP, port);
@@ -130,7 +129,6 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
             len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
             char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
             snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
-
             rv = nn_bind (socket, bindUrl);
             if (rv == -1) {
                 perror("Error for nn_bind");

http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/libs/framework/include/celix_bundle_activator.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_activator.h b/libs/framework/include/celix_bundle_activator.h
index 75e34d2..9bb2f6a 100644
--- a/libs/framework/include/celix_bundle_activator.h
+++ b/libs/framework/include/celix_bundle_activator.h
@@ -115,7 +115,7 @@ celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_contex
 #define CELIX_GEN_BUNDLE_ACTIVATOR(actType, actStart, actStop)                                                         \
 celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx __attribute__((unused)), void **userData) {    \
     celix_status_t status = CELIX_SUCCESS;                                                                             \
-    actType *data = (actType*)calloc(1, sizeof(*data));                                                                          \
+    actType *data = (actType*)calloc(1, sizeof(*data));                                                                \
     if (data != NULL) {                                                                                                \
         *userData = data;                                                                                              \
     } else {                                                                                                           \