You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:35 UTC

[18/34] celix git commit: CELIX-454: Some bug and mem leak fixes for the PSA refactoring

CELIX-454: Some bug and mem leak fixes for the PSA refactoring


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b635e274
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b635e274
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b635e274

Branch: refs/heads/develop
Commit: b635e27490fedea9197e89121407f93644b46df5
Parents: f1dfdbd
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Thu Sep 27 21:20:22 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Thu Sep 27 21:20:22 2018 +0200

----------------------------------------------------------------------
 .../pubsub_admin_udp_mc/src/psa_activator.c     |  16 ++
 .../src/pubsub_udpmc_admin.c                    | 269 +++++++++++++------
 .../src/pubsub_udpmc_admin.h                    |   3 +
 .../src/pubsub_udpmc_topic_receiver.c           | 111 ++------
 .../src/pubsub_udpmc_topic_receiver.h           |  24 +-
 .../src/pubsub_udpmc_topic_sender.c             | 122 +++------
 .../src/pubsub_udpmc_topic_sender.h             |  24 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c |  15 ++
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.c     | 189 ++++++++++---
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.h     |   3 +
 .../src/pubsub_zmq_topic_receiver.c             | 158 +++--------
 .../src/pubsub_zmq_topic_receiver.h             |  21 +-
 .../src/pubsub_zmq_topic_sender.c               | 114 +++-----
 .../src/pubsub_zmq_topic_sender.h               |   5 +-
 .../src/pubsub_topology_manager.c               | 111 ++++----
 .../src/pubsub_topology_manager.h               |   5 -
 16 files changed, 602 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index 682efc5..ebe39db 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -33,6 +33,8 @@ typedef struct psa_udpmc_activator {
 
 	pubsub_udpmc_admin_t *admin;
 
+	long serializersTrackerId;
+
 	pubsub_admin_service_t adminService;
 	long adminSvcId;
 
@@ -43,6 +45,8 @@ typedef struct psa_udpmc_activator {
 int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 	act->adminSvcId = -1L;
 	act->cmdSvcId = -1L;
+	act->serializersTrackerId = -1L;
+
 
 	logHelper_create(ctx, &act->logHelper);
 	logHelper_start(act->logHelper);
@@ -50,6 +54,17 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 	act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
 	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
+	//track serializers
+	if (status == CELIX_SUCCESS) {
+		celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+		opts.filter.ignoreServiceLanguage = true;
+		opts.callbackHandle = act->admin;
+		opts.addWithProperties = pubsub_udpmcAdmin_addSerializerSvc;
+		opts.removeWithProperties = pubsub_udpmcAdmin_removeSerializerSvc;
+		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+	}
+
 	//register pubsub admin service
 	if (status == CELIX_SUCCESS) {
 		pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -87,6 +102,7 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
 	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
 	pubsub_udpmcAdmin_destroy(act->admin);
 
 	logHelper_stop(act->logHelper);

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 1935c15..aea6de2 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -24,6 +24,7 @@
 #include <netdb.h>
 #include <ifaddrs.h>
 #include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
 
 #include "pubsub_utils.h"
 #include "pubsub_udpmc_admin.h"
@@ -35,13 +36,13 @@
 #define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY                "udpmc.socket_address"
 #define PUBSUB_UDPMC_SOCKET_PORT_KEY                   "udpmc.socker_port"
 
-#define LOG_DEBUG(...) \
+#define L_DEBUG(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-#define LOG_INFO(...) \
+#define L_INFO(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__);
-#define LOG_WARN(...) \
+#define L_WARN(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__);
-#define LOG_ERROR(...) \
+#define L_ERROR(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 struct pubsub_udpmc_admin {
@@ -57,6 +58,11 @@ struct pubsub_udpmc_admin {
     const char *fwUUID;
 
     struct {
+      celix_thread_mutex_t mutex;
+      hash_map_t *map;
+    } serializers;
+
+    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
     } topicSenders;
@@ -73,9 +79,15 @@ struct pubsub_udpmc_admin {
 
 };
 
+typedef struct psa_zmq_serializer_entry {
+    const char *serType;
+    long svcId;
+    pubsub_serializer_service_t *svc;
+} psa_udpmc_serializer_entry_t;
+
 static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 
 
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
@@ -102,9 +114,9 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
             PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
     const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
     if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
-        LOG_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
+        L_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
     } else if (psa->verbose) {
-        LOG_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
+        L_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
     }
 
     if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
@@ -117,19 +129,19 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
 
     sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
     if(sendSocket == -1) {
-        LOG_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
+        L_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
     } else {
         char loop = 1;
         int rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
         if(rc != 0) {
-            LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
+            L_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
         }
         if (rc == 0) {
             struct in_addr multicast_interface;
             inet_aton(if_ip, &multicast_interface);
             rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface));
             if (rc != 0) {
-                LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
+                L_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
             }
         }
         if (rc == 0) {
@@ -144,7 +156,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
 
     }
     if (psa->verbose) {
-        LOG_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress);
+        L_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress);
     }
 
 
@@ -154,13 +166,16 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
         psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT);
     }
     if (psa->verbose) {
-        LOG_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
+        L_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
     }
 
     psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_DEFAULT_SCORE_KEY, PSA_UDPMC_DEFAULT_SCORE);
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE);
 
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -183,7 +198,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        pubsub_udpmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
         pubsub_udpmcTopicSender_destroy(sender);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -191,7 +206,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+        pubsub_udpmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
         pubsub_udpmcTopicReceiver_destroy(recv);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -204,6 +219,14 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    iter = hashMapIterator_construct(psa->serializers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_udpmc_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
 
@@ -213,6 +236,9 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, true, false);
 
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
     free(psa->mcIpAddress);
     free(psa->ifIpAddress);
     free(psa);
@@ -220,7 +246,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
 
 celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
+    L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
                                                 psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
@@ -231,7 +257,7 @@ celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterB
 
 celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
+    L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
             psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
@@ -243,7 +269,7 @@ celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderB
 
 celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
+    L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
     bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL);
     if (outMatch != NULL) {
@@ -264,37 +290,41 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
     celix_properties_t *newEndpoint = NULL;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
-    pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+    pubsub_udpmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, psa->sendSocket, psa->mcIpAddress);
-        const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
-        const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
-        newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
-        celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
-        celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
-        //if available also set container name
-        const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
-        if (cn != NULL) {
-            celix_properties_set(newEndpoint, "container_name", cn);
+        psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress);
         }
-        bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
-        if (!valid) {
-            LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
-            celix_properties_destroy(newEndpoint);
-            pubsub_udpmcTopicSender_destroy(sender);
-            free(key);
-        } else {
+        if (sender != NULL) {
+            const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                    PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
+            celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
             hashMap_put(psa->topicSenders.map, key, sender);
-            //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
+        } else {
+            free(key);
+            L_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
         }
     } else {
         free(key);
-        LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+        L_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
 
+    if (sender != NULL && newEndpoint != NULL) {
+        //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
+    }
 
     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
         *outPublisherEndpoint = newEndpoint;
@@ -315,12 +345,12 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
     hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
     if (entry != NULL) {
         char *mapKey = hashMapEntry_getKey(entry);
-        pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
+        pubsub_udpmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a udpmc topic sender?
         pubsub_udpmcTopicSender_destroy(sender);
     } else {
-        LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+        L_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     free(key);
@@ -334,43 +364,45 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
     celix_properties_t *newEndpoint = NULL;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+    pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
-        receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId);
-        const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
-        const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
-        newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                            PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
-
-        //if available also set container name
-        const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
-        if (cn != NULL) {
-            celix_properties_set(newEndpoint, "container_name", cn);
+        psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId, serEntry->svc);
         }
-        bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
-        if (!valid) {
-            LOG_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
-            celix_properties_destroy(newEndpoint);
-            pubsub_udpmcTopicReceiver_destroy(receiver);
-            free(key);
-        } else {
-            hashMap_put(psa->topicReceivers.map, key, receiver);
-
-            celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-                pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        if (receiver != NULL) {
+            const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
             }
-            celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+            hashMap_put(psa->topicReceivers.map, key, receiver);
+        } else {
+            L_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
+            free(key);
         }
     } else {
         free(key);
-        LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+        L_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (receiver != NULL && newEndpoint != NULL) {
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+            pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
 
     if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
         *outSubscriberEndpoint = newEndpoint;
@@ -389,7 +421,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char
     free(key);
     if (entry != NULL) {
         char *receiverKey = hashMapEntry_getKey(entry);
-        pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+        pubsub_udpmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
         hashMap_remove(psa->topicReceivers.map, receiverKey);
 
         free(receiverKey);
@@ -401,7 +433,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char
     return status;
 }
 
-static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
@@ -434,7 +466,7 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
         pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -450,7 +482,7 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
 }
 
 
-static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
@@ -483,7 +515,7 @@ celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_proper
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
         pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -507,37 +539,41 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
-        const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
-        const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+        pubsub_udpmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_udpmcTopicSender_serializerSvcId(sender);
+        psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_udpmcTopicSender_scope(sender);
         const char *topic = pubsub_udpmcTopicSender_topic(sender);
         const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
-        fprintf(out, "   |- psa type        = %s\n", psaType);
         fprintf(out, "   |- serializer type = %s\n", serType);
         fprintf(out, "   |- socket address  = %s\n", sockAddr);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
-        const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
-        const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+        pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_udpmcTopicReceiver_serializerSvcId(receiver);
+        psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
         const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
         fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
-        fprintf(out, "   |- psa type        = %s\n", psaType);
         fprintf(out, "   |- serializer type = %s\n", serType);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
     //TODO topic receivers/senders connection count
@@ -545,6 +581,73 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
     return status;
 }
 
+void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_udpmc_admin_t *psa = handle;
+
+    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    if (serType == NULL) {
+        L_INFO("[PSA_ZMQ] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_udpmc_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->serType = serType;
+        entry->svcId = svcId;
+        entry->svc = svc;
+        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_udpmcAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_udpmc_admin_t *psa = handle;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    //remove serializer
+    // 1) First find entry and
+    // 2) loop and destroy all topic sender using the serializer and
+    // 3) loop and destroy all topic receivers using the serializer
+    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_udpmc_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+    if (entry != NULL) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_udpmc_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+            if (sender != NULL && entry->svcId == pubsub_udpmcTopicSender_serializerSvcId(sender)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_udpmcTopicSender_destroy(sender);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_udpmc_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+            if (receiver != NULL && entry->svcId == pubsub_udpmcTopicReceiver_serializerSvcId(receiver)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_udpmcTopicReceiver_destroy(receiver);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
 #ifndef ANDROID
 static celix_status_t udpmc_getIpAddress(const char* interface, char** ip) {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index e050384..02ebd44 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -50,6 +50,9 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
 celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
+void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_udpmcAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
 celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
 celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index f3e320a..24dfc08 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -36,23 +36,16 @@
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
-struct pubsub_updmc_topic_receiver {
+struct pubsub_udpmc_topic_receiver {
     celix_bundle_context_t *ctx;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
     char *scope;
     char *topic;
     char* ifIpAddress;
     largeUdp_pt largeUdpHandle;
     int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
 
-    //serialiser svc
-    long serializerTrackerId;
-    struct {
-
-        celix_thread_mutex_t mutex; //protect svc
-        pubsub_serializer_service_t *svc;
-        const celix_properties_t *props;
-    } serializer;
-
     struct {
         celix_thread_t thread;
         celix_thread_mutex_t mutex;
@@ -92,20 +85,22 @@ typedef struct pubsub_msg{
     unsigned int payloadSize;
 } pubsub_msg_t;
 
-static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
 static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
+static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
 static void* psa_udpmc_recvThread(void * data);
 
 
-pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
                                                                 const char *scope,
                                                                 const char *topic,
                                                                 const char *ifIP,
-                                                                long serializerSvcId) {
-    pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+                                                                long serializerSvcId,
+                                                                pubsub_serializer_service_t *serializer) {
+    pubsub_udpmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
     receiver->scope = strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->ifIpAddress = strndup(ifIP, 1024 * 1024);
@@ -114,7 +109,6 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     receiver->topicEpollFd = epoll_create1(0);
 
 
-    celixThreadMutex_create(&receiver->serializer.mutex, NULL);
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -122,20 +116,6 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
     receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-    //track serializer svc based on the provided serializerSvcId
-    {
-        char filter[64];
-        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-        opts.filter.filter = filter;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = receiver;
-        opts.setWithProperties = pubsub_udpmcTopicReceiver_setSerializer;
-        receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //track subscribers
     {
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -157,9 +137,8 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     return receiver;
 }
 
-void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver) {
     if (receiver != NULL) {
-        celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
         celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
 
         celixThreadMutex_lock(&receiver->recvThread.mutex);
@@ -167,7 +146,6 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver)
         celixThreadMutex_unlock(&receiver->recvThread.mutex);
         celixThread_join(receiver->recvThread.thread, NULL);
 
-        celixThreadMutex_destroy(&receiver->serializer.mutex);
         celixThreadMutex_destroy(&receiver->subscribers.mutex);
         celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         celixThreadMutex_destroy(&receiver->recvThread.mutex);
@@ -180,29 +158,19 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver)
     free(receiver);
 }
 
-const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
-    return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
-    const char *result = NULL;
-    celixThreadMutex_lock(&receiver->serializer.mutex);
-    if (receiver->serializer.props != NULL) {
-        result = celix_properties_get(receiver->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
-    }
-    celixThreadMutex_unlock(&receiver->serializer.mutex);
-    return result;
-}
-
-const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver) {
     return receiver->scope;
 }
-const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver) {
     return receiver->topic;
 }
 
+long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver) {
+    return receiver->serializerSvcId;
+}
+
 void pubsub_udpmcTopicReceiver_connectTo(
-        pubsub_updmc_topic_receiver_t *receiver,
+        pubsub_udpmc_topic_receiver_t *receiver,
         const char *socketAddress,
         long socketPort) {
     printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
@@ -261,7 +229,7 @@ void pubsub_udpmcTopicReceiver_connectTo(
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
     printf("[PSA UDPMC] TopicReceiver %s/%s disconnect from socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
 
     int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
@@ -287,23 +255,8 @@ void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *rec
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-
-static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
-    pubsub_serializer_service_t *ser = svc;
-
-    if (ser == NULL) {
-        //TODO -> no serializer -> remove all publishers
-    }
-
-    celixThreadMutex_lock(&receiver->serializer.mutex);
-    receiver->serializer.svc = ser;
-    receiver->serializer.props = props;
-    celixThreadMutex_unlock(&receiver->serializer.mutex);
-}
-
 static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
+    pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -322,22 +275,19 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
         entry->usageCount = 1;
         entry->svc = svc;
 
-        celixThreadMutex_lock(&receiver->serializer.mutex);
-        if (receiver->serializer.svc != NULL) {
-            receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        if (rc == 0) {
+            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
         } else {
+            free(entry);
             fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
         }
-        celixThreadMutex_unlock(&receiver->serializer.mutex);
-
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
+    pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -349,20 +299,17 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
     if (entry != NULL && entry->usageCount <= 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        celixThreadMutex_lock(&receiver->serializer.mutex);
-        if (receiver->serializer.svc != NULL) {
-            receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
-        } else {
+        int rc =  receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+        if (rc != 0) {
             fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
         }
-        celixThreadMutex_unlock(&receiver->serializer.mutex);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static void* psa_udpmc_recvThread(void * data) {
-    pubsub_updmc_topic_receiver_t *receiver = data;
+    pubsub_udpmc_topic_receiver_t *receiver = data;
 
     struct epoll_event events[MAX_EPOLL_EVENTS];
 
@@ -397,7 +344,7 @@ static void* psa_udpmc_recvThread(void * data) {
     return NULL;
 }
 
-static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
+static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
index ee2c113..2610489 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -20,26 +20,28 @@
 #define CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
 
 #include "celix_bundle_context.h"
+#include "pubsub_serializer.h"
 
-typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t;
+typedef struct pubsub_udpmc_topic_receiver pubsub_udpmc_topic_receiver_t;
 
-pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, 
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, 
         const char *scope, 
         const char *topic, 
         const char *ifIP, 
-        long serializerSvcId);
-void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver);
+        long serializerSvcId,
+        pubsub_serializer_service_t *serializer);
+void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
 
-const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_udpmc_topic_receiver_t *receiver);
+
+long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver);
 
 void pubsub_udpmcTopicReceiver_connectTo(
-        pubsub_updmc_topic_receiver_t *receiver,
+        pubsub_udpmc_topic_receiver_t *receiver,
         const char *socketAddress,
         long socketPort);
-void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
 
 #endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 4a6d027..3b13935 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -38,8 +38,10 @@
 #define UDP_MAX_PORT	                        65000
 
 
-struct pubsub_updmc_topic_sender {
+struct pubsub_udpmc_topic_sender {
     celix_bundle_context_t *ctx;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
     char *scope;
     char *topic;
     char *socketAddress;
@@ -48,13 +50,6 @@ struct pubsub_updmc_topic_sender {
     int sendSocket;
     struct sockaddr_in destAddr;
 
-    long serTrackerId;
-    struct {
-        celix_thread_mutex_t mutex;
-        pubsub_serializer_service_t *svc;
-        const celix_properties_t *props;
-    } serializer;
-
     struct {
         long svcId;
         celix_service_factory_t factory;
@@ -67,7 +62,7 @@ struct pubsub_updmc_topic_sender {
 };
 
 typedef struct psa_udpmc_bounded_service_entry {
-    pubsub_updmc_topic_sender_t *parent;
+    pubsub_udpmc_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes;
@@ -81,26 +76,27 @@ typedef struct pubsub_msg{
     unsigned int payloadSize;
 } pubsub_msg_t;
 
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
 static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
         celix_bundle_context_t *ctx,
         const char *scope,
         const char *topic,
         long serializerSvcId,
+        pubsub_serializer_service_t *serializer,
         int sendSocket,
         const char *bindIP) {
-    pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    pubsub_udpmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = serializer;
     sender->scope = strndup(scope, 1024 * 1024);
     sender->topic = strndup(topic, 1024 * 1024);
 
-    celixThreadMutex_create(&sender->serializer.mutex, NULL);
     celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
     sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
 
@@ -116,20 +112,6 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
         sender->socketPort = port;
     }
 
-    //track serializer svc based on the provided serializerSvcId
-    {
-        char filter[64];
-        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-        opts.filter.filter = filter;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = sender;
-        opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
-        sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //register publisher services using a service factory
     {
         sender->publisher.factory.handle = sender;
@@ -147,12 +129,10 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
     return sender;
 }
 
-void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
+void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender) {
     if (sender != NULL) {
-        celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
         celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
 
-        celixThreadMutex_destroy(&sender->serializer.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
         //TODO loop and cleanup?
@@ -165,62 +145,41 @@ void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
     }
 }
 
-const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender __attribute__((unused))) {
+const char* pubsub_udpmcTopicSender_psaType(pubsub_udpmc_topic_sender_t *sender __attribute__((unused))) {
     return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
 }
 
-const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender) {
-    const char *result = NULL;
-    celixThreadMutex_lock(&sender->serializer.mutex);
-    if (sender->serializer.props != NULL) {
-        result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
-    }
-    celixThreadMutex_unlock(&sender->serializer.mutex);
-    return result;
-}
 
-const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender) {
     return sender->scope;
 }
 
-const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender) {
     return sender->topic;
 }
 
-const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender) {
     return sender->socketAddress;
 }
 
-long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender) {
+long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender) {
     return sender->socketPort;
 }
 
-void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_udpmcTopicSender_connectTo(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
     //TODO
 }
 
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_updmc_topic_sender_t *sender = handle;
-    pubsub_serializer_service_t *ser = svc;
-
-    if (ser == NULL) {
-        //TODO -> no serializer -> remove all publishers
-    }
-
-    celixThreadMutex_lock(&sender->serializer.mutex);
-    sender->serializer.svc = ser;
-    sender->serializer.props = props;
-    celixThreadMutex_unlock(&sender->serializer.mutex);
-}
-
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_updmc_topic_sender_t *sender = handle;
+    pubsub_udpmc_topic_sender_t *sender = handle;
     long bndId = celix_bundle_getId(requestingBundle);
 
+    pubsub_publisher_t *svc = NULL;
+
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
     if (entry != NULL) {
@@ -232,32 +191,26 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
         entry->bndId = bndId;
         entry->largeUdpHandle = largeUdp_create(1);
 
-        celixThreadMutex_lock(&sender->serializer.mutex);
-        celix_status_t rc = CELIX_SUCCESS;
-        if (sender->serializer.svc != NULL) {
-            rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
-        }
-        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
-            //TODO destroy and return NULL?
+        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        if (rc == 0) {
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+            entry->service.send = psa_udpmc_topicPublicationSend;
+            entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+            svc = &entry->service;
+        } else {
             fprintf(stderr, "Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+            free(entry);
         }
-        celixThreadMutex_unlock(&sender->serializer.mutex);
-
-
-        entry->service.handle = entry;
-        entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
-        entry->service.send = psa_udpmc_topicPublicationSend;
-        entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
-
-        hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
-    return &entry->service;
+    return svc;
 }
 
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
-    pubsub_updmc_topic_sender_t *sender = handle;
+    pubsub_udpmc_topic_sender_t *sender = handle;
     long bndId = celix_bundle_getId(requestingBundle);
 
     celixThreadMutex_lock(&sender->boundedServices.mutex);
@@ -269,17 +222,10 @@ static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
 
-
-        celixThreadMutex_lock(&sender->serializer.mutex);
-        celix_status_t rc = CELIX_SUCCESS;
-        if (sender->serializer.svc != NULL) {
-            rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
-        }
-        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+        int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+        if (rc != 0) {
             fprintf(stderr, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
         }
-        celixThreadMutex_unlock(&sender->serializer.mutex);
-
 
         largeUdp_destroy(entry->largeUdpHandle);
         free(entry);

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 89e56ec..07c1301 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -20,28 +20,30 @@
 #define CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
 
 #include "celix_bundle_context.h"
+#include "pubsub_serializer.h"
 
-typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t;
+typedef struct pubsub_udpmc_topic_sender pubsub_udpmc_topic_sender_t;
 
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
         celix_bundle_context_t *ctx,
         const char *scope,
         const char *topic,
         long serializerSvcId,
+        pubsub_serializer_service_t *serializer,
         int sendSocket,
         const char *bindIP);
-void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender);
+void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender);
 
-const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender);
-long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender);
+long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender);
+
+long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender);
 
 //TODO connections etc
 
-void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_udpmcTopicSender_connectTo(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint);
 
 #endif //CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index b7cc138..353c33e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -33,6 +33,8 @@ typedef struct psa_zmq_activator {
 
 	pubsub_zmq_admin_t *admin;
 
+	long serializersTrackerId;
+
 	pubsub_admin_service_t adminService;
 	long adminSvcId;
 
@@ -43,6 +45,7 @@ typedef struct psa_zmq_activator {
 int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 	act->adminSvcId = -1L;
 	act->cmdSvcId = -1L;
+	act->serializersTrackerId = -1L;
 
 	logHelper_create(ctx, &act->logHelper);
 	logHelper_start(act->logHelper);
@@ -50,6 +53,17 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 	act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
 	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
+	//track serializers
+	if (status == CELIX_SUCCESS) {
+		celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+		opts.filter.ignoreServiceLanguage = true;
+		opts.callbackHandle = act->admin;
+		opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
+		opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
+		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+	}
+
 	//register pubsub admin service
 	if (status == CELIX_SUCCESS) {
 		pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -87,6 +101,7 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
 	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
 	pubsub_zmqAdmin_destroy(act->admin);
 
 	logHelper_stop(act->logHelper);

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index d3ba3b5..5b6c384 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -25,6 +25,7 @@
 #include <ifaddrs.h>
 #include <pubsub_endpoint.h>
 #include <czmq.h>
+#include <pubsub_serializer.h>
 
 #include "pubsub_utils.h"
 #include "pubsub_zmq_admin.h"
@@ -60,24 +61,35 @@ struct pubsub_zmq_admin {
 
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t
+        hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
+    } serializers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
     } topicSenders;
 
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t
+        hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
     } topicReceivers;
 
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = endpoint uuid, value = psa_zmq_connected_endpoint_entry_t
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints;
 
 };
 
+typedef struct psa_zmq_serializer_entry {
+    const char *serType;
+    long svcId;
+    pubsub_serializer_service_t *svc;
+} psa_zmq_serializer_entry_t;
+
 static celix_status_t zmq_getIpAddress(const char* interface, const char** ip);
-static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 
 
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
@@ -148,6 +160,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
 
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -178,7 +193,7 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+        pubsub_zmq_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
         pubsub_zmqTopicReceiver_destroy(recv);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -191,6 +206,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    iter = hashMapIterator_construct(psa->serializers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_zmq_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
 
@@ -200,6 +223,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, true, false);
 
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
     if (psa->zmq_auth != NULL){
         zactor_destroy(&psa->zmq_auth);
     }
@@ -209,6 +235,73 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     free(psa);
 }
 
+void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_zmq_admin_t *psa = handle;
+
+    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    if (serType == NULL) {
+        L_INFO("[PSA_ZMQ] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_zmq_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->serType = serType;
+        entry->svcId = svcId;
+        entry->svc = svc;
+        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_zmq_admin_t *psa = handle;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    //remove serializer
+    // 1) First find entry and
+    // 2) loop and destroy all topic sender using the serializer and
+    // 3) loop and destroy all topic receivers using the serializer
+    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_zmq_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+    if (entry != NULL) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+            if (sender != NULL && entry->svcId == pubsub_zmqTopicSender_serializerSvcId(sender)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_zmqTopicSender_destroy(sender);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+            if (receiver != NULL && entry->svcId == pubsub_zmqTopicReceiver_serializerSvcId(receiver)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_zmqTopicReceiver_destroy(receiver);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
 celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
@@ -255,13 +348,19 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     celix_properties_t *newEndpoint = NULL;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, psa->ipAddress, psa->basePort, psa->maxPort);
+        psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
+                                                  psa->basePort, psa->maxPort);
+        }
         if (sender != NULL) {
-            const char *psaType = pubsub_zmqTopicSender_psaType(sender);
-            const char *serType = pubsub_zmqTopicSender_serializerType(sender);
+            const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                 serType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
@@ -270,8 +369,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
             if (cn != NULL) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-
-            //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
             hashMap_put(psa->topicSenders.map, key, sender);
         } else {
             L_ERROR("[PSA ZMQ] Error creating a TopicSender");
@@ -282,7 +379,11 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
         L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
 
+    if (sender != NULL && newEndpoint != NULL) {
+        //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
+    }
 
     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
         *outPublisherEndpoint = newEndpoint;
@@ -322,30 +423,26 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
     celix_properties_t *newEndpoint = NULL;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+    pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
-        receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId);
+        psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+        } else {
+            L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
+        }
         if (receiver != NULL) {
-            const char *psaType = pubsub_zmqTopicReceiver_psaType(receiver);
-            const char *serType = pubsub_zmqTopicReceiver_serializerType(receiver);
+            const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
                                                 PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
-
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-
-            celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-                pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
-            }
-            celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
-
             hashMap_put(psa->topicReceivers.map, key, receiver);
         } else {
             L_ERROR("[PSA ZMQ] Error creating a TopicReceiver.");
@@ -356,7 +453,17 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
         L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (receiver != NULL && newEndpoint != NULL) {
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+            pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
 
     if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
         *outSubscriberEndpoint = newEndpoint;
@@ -375,7 +482,7 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
     free(key);
     if (entry != NULL) {
         char *receiverKey = hashMapEntry_getKey(entry);
-        pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+        pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
         hashMap_remove(psa->topicReceivers.map, receiverKey);
 
         free(receiverKey);
@@ -387,7 +494,7 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
     return status;
 }
 
-static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
@@ -419,7 +526,7 @@ celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
         pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -435,7 +542,7 @@ celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_
 }
 
 
-static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
@@ -467,7 +574,7 @@ celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properti
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
         pubsub_zmqAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -491,37 +598,41 @@ celix_status_t pubsub_zmqAdmin_executeCommand(void *handle, char *commandLine __
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
-        const char *psaType = pubsub_zmqTopicSender_psaType(sender);
-        const char *serType = pubsub_zmqTopicSender_serializerType(sender);
+        long serSvcId = pubsub_zmqTopicSender_serializerSvcId(sender);
+        psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_zmqTopicSender_scope(sender);
         const char *topic = pubsub_zmqTopicSender_topic(sender);
         const char *url = pubsub_zmqTopicSender_url(sender);
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
-        fprintf(out, "   |- psa type        = %s\n", psaType);
         fprintf(out, "   |- serializer type = %s\n", serType);
         fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
-        const char *psaType = pubsub_zmqTopicReceiver_psaType(receiver);
-        const char *serType = pubsub_zmqTopicReceiver_serializerType(receiver);
+        pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
+        psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
         const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
         fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
-        fprintf(out, "   |- psa type        = %s\n", psaType);
         fprintf(out, "   |- serializer type = %s\n", serType);
     }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
     //TODO topic receivers/senders connection count

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index a1c5a75..180a9db 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -53,6 +53,9 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
 celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
 celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
 
+void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
 celix_status_t pubsub_zmqAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H