You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/26 18:04:14 UTC

[celix] 01/02: Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 14f3bc1a81ad4d4a672f899f360d93ee0603bfd4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed May 26 19:05:17 2021 +0200

    Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq
---
 .../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c |  15 --
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c     | 247 +++++----------------
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h     |   4 -
 .../v2/src/pubsub_tcp_topic_receiver.c             |  70 ++----
 .../v2/src/pubsub_tcp_topic_receiver.h             |   7 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |  72 +++---
 .../v2/src/pubsub_tcp_topic_sender.h               |   4 +-
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     |  20 --
 .../v2/src/pubsub_zmq_topic_receiver.c             |  12 +-
 .../v2/src/pubsub_zmq_topic_receiver.h             |   2 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  22 +-
 .../v2/src/pubsub_zmq_topic_sender.h               |   2 +-
 .../src/PubSubSerializationHandlerTestSuite.cc     |  23 +-
 .../include/pubsub_serializer_handler.h            |  27 ++-
 .../pubsub_utils/src/pubsub_serializer_handler.c   | 108 ++++++---
 15 files changed, 248 insertions(+), 387 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
index ec9badb..ec3f853 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
@@ -20,7 +20,6 @@
 #include <stdlib.h>
 
 #include "celix_api.h"
-#include "pubsub_serializer.h"
 #include "pubsub_protocol.h"
 #include "celix_log_helper.h"
 
@@ -34,7 +33,6 @@ typedef struct psa_tcp_activator {
 
     pubsub_tcp_admin_t *admin;
 
-    long serializersTrackerId;
     long protocolsTrackerId;
 
     pubsub_admin_service_t adminService;
@@ -50,7 +48,6 @@ typedef struct psa_tcp_activator {
 int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     act->adminSvcId = -1L;
     act->cmdSvcId = -1L;
-    act->serializersTrackerId = -1L;
     act->protocolsTrackerId = -1L;
 
     act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2");
@@ -58,17 +55,6 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
-    //track serializers
-    if (status == CELIX_SUCCESS) {
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = act->admin;
-        opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
-        opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
-        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //track protocols
     if (status == CELIX_SUCCESS) {
         celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -132,7 +118,6 @@ int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     celix_bundleContext_unregisterService(ctx, act->adminSvcId);
     celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
     celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
-    celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
     celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
     pubsub_tcpAdmin_destroy(act->admin);
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
index 66c92ef..f5f200f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
@@ -55,11 +55,6 @@ struct pubsub_tcp_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
     } protocols;
@@ -79,6 +74,11 @@ struct pubsub_tcp_admin {
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints;
 
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+    } serializationHandlers;
+
     pubsub_tcp_endPointStore_t endpointStore;
 };
 
@@ -101,11 +101,6 @@ static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoi
     return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
 }
 
-static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
-    const char** out = handle;
-    *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
 pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
     pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
     psa->ctx = ctx;
@@ -120,11 +115,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
                                                                    PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->protocols.mutex, NULL);
     psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -137,6 +130,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
     celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
     psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
+    celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+    psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     return psa;
 }
 
@@ -177,13 +173,13 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    iter = hashMapIterator_construct(psa->serializationHandlers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
-        free(entry);
+        pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+        pubsub_serializerHandler_destroy(entry);
     }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
 
     celixThreadMutex_lock(&psa->protocols.mutex);
     iter = hashMapIterator_construct(psa->protocols.map);
@@ -205,8 +201,9 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
+    celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+    hashMap_destroy(psa->serializationHandlers.map, false, false);
+
     celixThreadMutex_destroy(&psa->protocols.mutex);
     hashMap_destroy(psa->protocols.map, false, false);
 
@@ -215,101 +212,6 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     free(psa);
 }
 
-void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_tcp_admin_t *psa = handle;
-
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-    const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
-    const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
-    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
-        L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
-               PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
-        L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-        return;
-    }
-    L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries == NULL) {
-        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
-        L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType);
-    }
-    psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_tcp_serializer_entry_t));
-        entry->svc = svc;
-        entry->fqn = celix_utils_strdup(msgFqn);
-        entry->version = celix_utils_strdup(msgVersion);
-        hashMap_put(typeEntries, (void*)msgId, entry);
-        L_INFO("[PSA_TCP_V2] entry added");
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_tcp_admin_t *psa = handle;
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
-    //remove serializer
-    // 1) First find entry and
-    // 2) loop and destroy all topic sender using the serializer and
-    // 3) loop and destroy all topic receivers using the serializer
-    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries != NULL) {
-        psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
-        free((void*)entry->fqn);
-        free((void*)entry->version);
-        free(entry);
-
-        // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
-        if(hashMap_size(typeEntries) == 0) {
-            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-            celixThreadMutex_lock(&psa->topicSenders.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
-                if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_tcpTopicSender_destroy(sender);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-            celixThreadMutex_lock(&psa->topicReceivers.mutex);
-            iter = hashMapIterator_construct(psa->topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
-                if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(receiverEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_tcpTopicReceiver_destroy(receiver);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-        } else {
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-        }
-    } else {
-        celixThreadRwlock_unlock(&psa->serializers.mutex);
-    }
-}
-
 void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_tcp_admin_t *psa = handle;
 
@@ -421,30 +323,40 @@ pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *
     return status;
 }
 
+static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) {
+    pubsub_serializer_handler_t* handler = NULL;
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+    if (handler == NULL) {
+        handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+        if (handler != NULL) {
+            hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+    return handler;
+}
+
 celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
                                                 const celix_properties_t *topicProperties, long serializerSvcId,
                                                 long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t status = CELIX_SUCCESS;
 
-    //1) Create TopicSender
-    //2) Store TopicSender
-    //3) Connect existing endpoints
-    //4) set outPublisherEndpoint
+    //1) Get serialization handler
+    //2) Create TopicSender
+    //3) Store TopicSender
+    //4) Connect existing endpoints
+    //5) set outPublisherEndpoint
+
+    pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic sender without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
     celix_properties_t *newEndpoint = NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_tcpAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
     
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
@@ -453,14 +365,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
     if (sender == NULL) {
         psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
         if (protEntry != NULL) {
-            sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+            sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
                                                   &psa->endpointStore, protocolSvcId,
                                                   protEntry->svc);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *protType = protEntry->protType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
 
             celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
@@ -525,21 +438,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
                                                   long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
 
-    celix_properties_t *newEndpoint = NULL;
+    pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic receiver without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
+    celix_properties_t *newEndpoint = NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_tcpAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
@@ -547,7 +454,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
         if (protEntry != NULL) {
-            receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+            receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+                                                      handler, handle, topicProperties,
                                                       &psa->endpointStore, protocolSvcId, protEntry->svc);
         } else {
             L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
@@ -556,7 +464,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *protType = protEntry->protType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -795,53 +703,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
     return status;
 }
 
-
-
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
-    pubsub_tcp_admin_t *psa = handle;
-    psa_tcp_serializer_entry_t *serializer = NULL;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
-    }
-
-    return serializer;
-}
-
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) {
-    pubsub_tcp_admin_t *psa = handle;
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
-    pubsub_tcp_admin_t *psa = handle;
-    int64_t id = -1L;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
-        while(hashMapIterator_hasNext(&iterator)) {
-            void *key = hashMapIterator_nextKey(&iterator);
-            psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key);
-            L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
-            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
-                id = (uint32_t)(uintptr_t)key;
-                break;
-            }
-        }
-    } else {
-        L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-    L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
-    return id;
-}
-
 pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
     return NULL;
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
index 2440fbb..513a934 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
@@ -82,10 +82,6 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
 void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
 bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
 
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer);
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
 pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle);
 
 #endif //CELIX_PUBSUB_TCP_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 853e49d..b02768a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -56,7 +56,7 @@ struct pubsub_tcp_topic_receiver {
     pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
-    char *serType;
+    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     size_t timeout;
     bool isPassive;
@@ -104,13 +104,12 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
 static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
 static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
                                                             celix_log_helper_t *logHelper,
                                                             const char *scope,
                                                             const char *topic,
-                                                            const char *serType,
+                                                            pubsub_serializer_handler_t* serializerHandler,
                                                             void *admin,
                                                             const celix_properties_t *topicProperties,
                                                             pubsub_tcp_endPointStore_t *handlerStore,
@@ -119,7 +118,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serType = celix_utils_strdup(serType);
+    receiver->serializerHandler = serializerHandler;
     receiver->admin = admin;
     receiver->protocolSvcId = protocolSvcId;
     receiver->protocol = protocol;
@@ -269,7 +268,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
-
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -299,7 +297,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             free(receiver->scope);
         }
         free(receiver->topic);
-        free(receiver->serType);
     }
     free(receiver);
 }
@@ -313,7 +310,7 @@ const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver)
 }
 
 const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
-    return receiver->serType;
+    return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
 }
 
 long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
@@ -460,47 +457,39 @@ static inline void
 processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
                              const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
     //NOTE receiver->subscribers.mutex locked
-    psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId);
 
-    if(msgSer == NULL) {
-        pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
-        L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length);
-        return;
-    }
+    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
 
     void *deSerializedMsg = NULL;
-    celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
-    bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
-    celix_version_destroy(version);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
         deSerializeBuffer.iov_len = message->payload.length;
-        celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
         // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
         if (message->payload.payload == deSerializedMsg) {
             *releaseMsg = true;
         }
-        const char *msgType = msgSer->fqn;
 
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
             bool release = true;
             if (cont) {
                 hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
                 while (hashMapIterator_hasNext(&iter)) {
                     pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+                    svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
                     if (!release && hashMapIterator_hasNext(&iter)) {
                         //receive function has taken ownership and still more receive function to come ..
                         //deserialize again for new message
-                        status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
                         if (status != CELIX_SUCCESS) {
                             L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
-                                   msgType,
+                                   msgFqn,
                                    receiver->scope == NULL ? "(null)" : receiver->scope,
                                    receiver->topic);
                             break;
@@ -509,19 +498,25 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                     }
                 }
                 if (release) {
-                    msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
                 }
                 if (message->metadata.metadata) {
                     celix_properties_destroy(message->metadata.metadata);
                 }
             }
         } else {
-            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType,
+            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
                    receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+    } else {
+        L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+               msgFqn,
+               pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+               (int)message->header.msgMajorVersion,
+               (int)message->header.msgMinorVersion,
+               pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+               pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
-
-    pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
 }
 
 static void
@@ -664,24 +659,3 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
-
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
-    bool check = false;
-
-    if (major == 0 && minor == 0) {
-        //no check
-        return true;
-    }
-
-    int versionMajor;
-    int versionMinor;
-    if (msgVersion!=NULL) {
-        version_getMajor(msgVersion, &versionMajor);
-        version_getMinor(msgVersion, &versionMinor);
-        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
-            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
-        }
-    }
-
-    return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index a7de405..d06fe4a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -20,10 +20,11 @@
 #ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
 #define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
 
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
 #include "celix_bundle_context.h"
-#include <pubsub_protocol.h>
+#include "pubsub_protocol.h"
 #include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
 
@@ -31,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
                                                             celix_log_helper_t *logHelper,
                                                             const char *scope,
                                                             const char *topic,
-                                                            const char *serType,
+                                                            pubsub_serializer_handler_t* serializerHandler,
                                                             void *admin,
                                                             const celix_properties_t *topicProperties,
                                                             pubsub_tcp_endPointStore_t *handlerStore,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 847d538..11d73d9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -57,12 +57,12 @@ struct pubsub_tcp_topic_sender {
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
+    pubsub_serializer_handler_t* serializerHandler;
 
     void *admin;
     char *scope;
     char *topic;
     char *url;
-    char *serializerType;
     bool isStatic;
     bool isPassive;
     bool verbose;
@@ -85,7 +85,6 @@ typedef struct psa_tcp_send_msg_entry {
     uint8_t major;
     uint8_t minor;
     unsigned char originUUID[16];
-//    pubsub_msg_serializer_t *msgSer;
     pubsub_protocol_service_t *protSer;
     struct iovec *serializedIoVecOutput;
     size_t serializedIoVecOutputLen;
@@ -118,7 +117,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     celix_log_helper_t *logHelper,
     const char *scope,
     const char *topic,
-    const char *serializerType,
+    pubsub_serializer_handler_t* serializerHandler,
     void *admin,
     const celix_properties_t *topicProperties,
     pubsub_tcp_endPointStore_t *handlerStore,
@@ -127,7 +126,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializerType = celix_utils_strdup(serializerType);
+    sender->serializerHandler = serializerHandler;
     sender->admin = admin;
     sender->protocolSvcId = protocolSvcId;
     sender->protocol = protocol;
@@ -189,7 +188,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
         pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
-        // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+        // When passiveKey is specified, enable receive event for full-duplex connection using key.
         // Because the topic receiver is already started, enable the receive event.
         pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
         pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
@@ -301,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         }
         free(sender->topic);
         free(sender->url);
-        free(sender->serializerType);
         free(sender);
     }
 }
@@ -319,7 +317,7 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
-    return sender->serializerType;
+    return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
@@ -337,15 +335,6 @@ bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
     return sender->isPassive;
 }
 
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
-    psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
-    int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
-    if(rc >= 0) {
-        *msgTypeId = (unsigned int)rc;
-    }
-    return 0;
-}
-
 static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
                                          const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_tcp_topic_sender_t *sender = handle;
@@ -406,39 +395,35 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
     psa_tcp_bounded_service_entry_t *bound = handle;
     pubsub_tcp_topic_sender_t *sender = bound->parent;
 
-    psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
-    if(serializer == NULL) {
-        pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
-        L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        return CELIX_SERVICE_EXCEPTION;
-    }
 
     psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
 
-    if(entry == NULL) {
-        entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
-        entry->protSer = sender->protocol;
-        entry->type = msgTypeId;
-        entry->fqn = serializer->fqn;
-        celix_version_t* version = celix_version_createVersionFromString(serializer->version);
-        entry->major = (uint8_t)celix_version_getMajor(version);
-        entry->minor = (uint8_t)celix_version_getMinor(version);
-        celix_version_destroy(version);
-        uuid_copy(entry->originUUID, sender->fwUUID);
-        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+    if (entry == NULL) { //no cached send entry for this msg id in this bounded service yet: create one
+        const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+        if (fqn == NULL) {
+            L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId);
+            return CELIX_SERVICE_EXCEPTION; //no entry can be created; stop before entry is dereferenced further down
+        }
+        entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
+        entry->protSer = sender->protocol;
+        entry->type = msgTypeId;
+        entry->fqn = fqn; //pointer is stored, not copied
+        entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+        entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
+        uuid_copy(entry->originUUID, sender->fwUUID);
+        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
     }
 
     delay_first_send_for_late_joiners(sender);
 
     size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
     struct iovec *serializedIoVecOutput = NULL;
-    status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
     entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
 
     bool cont = false;
     if (status == CELIX_SUCCESS) /*ser ok*/ {
-        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
     }
     if (cont) {
         pubsub_protocol_message_t message;
@@ -468,12 +453,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
                 status = -1;
                 sendOk = false;
             }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
             if (message.metadata.metadata) {
                 celix_properties_destroy(message.metadata.metadata);
             }
             if (serializedIoVecOutput) {
-                serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen);
+                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
                 serializedIoVecOutput = NULL;
             }
         }
@@ -482,12 +467,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
-        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn,
+        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn,
                sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
     }
 
-    pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
-
     return status;
 }
 
@@ -502,4 +485,11 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
         usleep(sender->send_delay * 1000);
         firstSend = false;
     }
+}
+
+// Looks up the msg id for msgType through the serializer handler of the sender that owns this bounded-service entry.
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+    pubsub_tcp_topic_sender_t* sender = ((psa_tcp_bounded_service_entry_t*)handle)->parent;
+    *msgTypeId = (unsigned int)pubsub_serializerHandler_getMsgId(sender->serializerHandler, msgType);
+    return 0; // always 0; the looked-up id is only passed on through msgTypeId
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index dfb5014..29c8f7a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -24,6 +24,7 @@
 #include "pubsub_admin_metrics.h"
 #include "pubsub_protocol.h"
 #include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
 
@@ -32,7 +33,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     celix_log_helper_t *logHelper,
     const char *scope,
     const char *topic,
-    const char *serializerType,
+    pubsub_serializer_handler_t* serializerHandler,
     void *admin,
     const celix_properties_t *topicProperties,
     pubsub_tcp_endPointStore_t *handlerStore,
@@ -46,7 +47,6 @@ const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
 const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
 
 /**
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index e30403e..cc6de56 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -66,11 +66,6 @@ struct pubsub_zmq_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
     } protocols;
@@ -189,9 +184,6 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->protocols.mutex, NULL);
     psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
 
@@ -241,15 +233,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        hash_map_t *entry = hashMapIterator_nextValue(&iter);
-        hashMap_destroy(entry, false, true);
-    }
-    hashMap_clear(psa->serializers.map, false, false);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
     celixThreadMutex_lock(&psa->protocols.mutex);
     iter = hashMapIterator_construct(psa->protocols.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -275,9 +258,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
-
     celixThreadMutex_destroy(&psa->protocols.mutex);
     hashMap_destroy(psa->protocols.map, false, false);
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 1051d77..a00ce34 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -95,7 +95,6 @@ struct pubsub_zmq_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
-        hash_map_t *msgFqns; //key = msg id, value = char* msgFqn
         bool allInitialized;
     } subscribers;
 };
@@ -207,7 +206,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
 
         receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL);
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
@@ -288,7 +286,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
-        hashMap_destroy(receiver->subscribers.msgFqns, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -454,11 +451,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     int updateReceiveCount = 0;
     int updateSerError = 0;
 
-    char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId);
-    if (msgFqn == NULL) {
-        msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
-        hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn);
-    }
+    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
 
     void *deserializedMsg = NULL;
     bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
@@ -505,8 +498,9 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
-        L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+        L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
                msgFqn,
+               pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
                (int)message->header.msgMajorVersion,
                (int)message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index a2f953b..3900f55 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        pubsub_serializer_handler_t* serHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *protocol);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 9996c53..8ae4489 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -52,7 +52,7 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializationHandler;
+    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
@@ -137,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        pubsub_serializer_handler_t* serializationHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
@@ -148,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializationHandler = serializationHandler;
+    sender->serializerHandler = serializerHandler;
     sender->admin = admin;
     sender->protocolSvcId = protocolSvcId;
     sender->protocol = prot;
@@ -359,7 +359,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
 }
 
 const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
-    return pubsub_serializerHandler_getSerializationType(sender->serializationHandler);
+    return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -384,7 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
 
 static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
     psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
-    *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType);
+    *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
     return 0;
 }
 
@@ -513,9 +513,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
         entry->protSer = sender->protocol;
         entry->type = msgTypeId;
-        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId);
-        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId);
-        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId);
+        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
         uuid_copy(entry->originUUID, sender->fwUUID);
         celixThreadMutex_create(&entry->metrics.mutex, NULL);
         hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
@@ -528,7 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     }
     size_t serializedOutputLen = 0;
     struct iovec *serializedOutput = NULL;
-    status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
+    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
     if (monitor) {
         clock_gettime(CLOCK_REALTIME, &serializationEnd);
     }
@@ -588,7 +588,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 zmq_msg_t msg4; // Footer
                 void *socket = zsock_resolve(sender->zmq.socket);
                 psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                freeMsgEntry->serHandler = sender->serializationHandler;
+                freeMsgEntry->serHandler = sender->serializerHandler;
                 freeMsgEntry->msgId = msgTypeId;
                 freeMsgEntry->serializedOutput = serializedOutput;
                 freeMsgEntry->serializedOutputLen = serializedOutputLen;
@@ -665,7 +665,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 celix_properties_destroy(message.metadata.metadata);
             }
             if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen);
+                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
             }
 
             if (sendOk) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index c2c0d57..584b88d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        pubsub_serializer_handler_t* serializationHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 550c85f..77a2fa0 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -111,7 +111,6 @@ TEST_F(PubSubSerializationHandlerTestSuite, SerializationServiceFound) {
     EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg"));
     auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42);
     EXPECT_STREQ("example::Msg",  fqn);
-    free(fqn);
     EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
 
@@ -293,3 +292,25 @@ TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
 
     celix_logHelper_destroy(logHelper);
 }
+
+TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
+    auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+    EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+    EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); //nothing registered for msg id 42 yet
+
+    long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+    EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+    EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); //all out params are optional
+
+    //EXPECT_* is non-fatal: initialize the out params so a failing getMsgInfo cannot lead to reading indeterminate values below
+    const char* msgFqn = nullptr;
+    int major = -1;
+    int minor = -1;
+    EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor));
+    EXPECT_STREQ("example::Msg1", msgFqn);
+    EXPECT_EQ(1, major);
+    EXPECT_EQ(0, minor);
+
+    celix_bundleContext_unregisterService(ctx.get(), svcId1);
+    pubsub_serializerHandler_destroy(handler);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index 0519891..f2c58ac 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -138,10 +138,15 @@ celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_ha
 bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion);
 
 /**
+ * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id.
+ */
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
  * @brief Get msg fqn from a msg id.
- * @return msg fqn or NULL if msg id is not known.
+ * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists.
  */
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
 
 /**
  * @brief Get a msg id from a msgFqn.
@@ -176,6 +181,24 @@ int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* han
  */
 int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
 
+
+/**
+ * @brief Returns msg info (fqn, major version, minor version) in 1 call.
+ *
+ * @param handler               The serializer handler
+ * @param msgId                 The msg id where to get the info for
+ * @param msgFqnOut             If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid)
+ * @param msgMajorVersionOut    If not NULL will be set to the msg major version
+ * @param msgMinorVersionOut    If not NULL will be set to the msg minor version
+ * @return                      CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found.
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+        pubsub_serializer_handler_t* handler,
+        uint32_t msgId,
+        const char** msgFqnOut,
+        int* msgMajorVersionOut,
+        int* msgMinorVersionOut);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index 7bfa5e0..418b4b5 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -48,6 +48,7 @@ typedef struct pubsub_serialization_service_entry {
 
 struct pubsub_serializer_handler {
     celix_bundle_context_t* ctx;
+    char* filter;
     char* serType;
     bool backwardCompatible;
     long serializationSvcTrackerId;
@@ -55,6 +56,7 @@ struct pubsub_serializer_handler {
 
     celix_thread_rwlock_t lock;
     hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+    hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn
 };
 
 static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
@@ -104,17 +106,6 @@ static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serializat
     return compatible;
 }
 
-static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
-    //NOTE assumes mutex is locked
-    const char *result = NULL;
-    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
-    if (entries != NULL) {
-        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
-        result = entry->msgFqn;
-    }
-    return result;
-}
-
 pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
     pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
     handler->ctx = ctx;
@@ -125,18 +116,18 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex
 
     celixThreadRwlock_create(&handler->lock, NULL);
     handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+    handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL);
 
-    char *filter = NULL;
-    asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+    asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
     celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
     opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
     opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
-    opts.filter.filter = filter;
+    opts.filter.filter = handler->filter;
     opts.callbackHandle = handler;
     opts.addWithProperties = addSerializationService;
     opts.removeWithProperties = removeSerializationService;
-    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    free(filter);
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts);
+
 
     return handler;
 }
@@ -186,26 +177,31 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(cel
     return data.handler;
 }
 
+static void pubsub_serializerHandler_destroyCallback(void* data) { //done-callback handed to celix_bundleContext_stopTrackerAsync by pubsub_serializerHandler_destroy
+    pubsub_serializer_handler_t* handler = data; //data is the handler passed as callback data
+    celixThreadRwlock_destroy(&handler->lock);
+    hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&iter)) { //every value is an array list with pubsub_serialization_service_entry_t*
+        celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
+            free(entry->msgFqn); //the entry owns its msgFqn and msgVersion
+            celix_version_destroy(entry->msgVersion);
+            free(entry);
+        }
+        celix_arrayList_destroy(entries);
+    }
+    hashMap_destroy(handler->serializationServices, false, false); //keys are msg ids and the values were already destroyed in the loop above
+    hashMap_destroy(handler->msgFullyQualifiedNames, false, true); //values are strdup'ed msg fqn strings; presumably the last argument frees them - confirm against the hash_map API
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler->serType);
+    free(handler->filter); //filter string is owned by the handler (created with asprintf in the create function)
+    free(handler);
+}
 
 void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
     if (handler != NULL) {
-        celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
-        celixThreadRwlock_destroy(&handler->lock);
-        hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
-            for (int i = 0; i < celix_arrayList_size(entries); ++i) {
-                pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
-                free(entry->msgFqn);
-                celix_version_destroy(entry->msgVersion);
-                free(entry);
-            }
-            celix_arrayList_destroy(entries);
-        }
-        hashMap_destroy(handler->serializationServices, false, false);
-        celix_logHelper_destroy(handler->logHelper);
-        free(handler->serType);
-        free(handler);
+        celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback);
     }
 }
 
@@ -257,10 +253,16 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         celix_arrayList_add(entries, entry);
         celix_arrayList_sort(entries, compareEntries);
 
-        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+        hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries);
     } else {
         celix_version_destroy(msgVersion);
     }
+
+    char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+    if (fqn == NULL) {
+        hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn));
+    }
+
     celixThreadRwlock_unlock(&handler->lock);
 }
 
@@ -375,9 +377,16 @@ bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* ha
     return compatible;
 }
 
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    bool available = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId) != NULL; //only the presence of an entries list for msgId matters
+    celixThreadRwlock_unlock(&handler->lock);
+    return available;
+}
+
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
     celixThreadRwlock_readLock(&handler->lock);
-    char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
+    char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
     celixThreadRwlock_unlock(&handler->lock);
     return msgFqn;
 }
@@ -434,4 +443,31 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
 
 const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
     return handler->serType;
+}
+
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+        pubsub_serializer_handler_t* handler,
+        uint32_t msgId,
+        const char** msgFqnOut,
+        int* msgMajorVersionOut,
+        int* msgMinorVersionOut) {
+    celix_status_t status = CELIX_SUCCESS; //all out params are optional (may be NULL); only msgId has to be known
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries list exists, it holds at least 1 entry.
+        if (msgFqnOut != NULL) {
+            *msgFqnOut = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); //handler-owned fqn copy (same source as getMsgFqn), so the result does not point into a service entry
+        }
+        if (msgMajorVersionOut != NULL) { //guard the pointer that is actually written
+            *msgMajorVersionOut = celix_version_getMajor(entry->msgVersion);
+        }
+        if (msgMinorVersionOut != NULL) {
+            *msgMinorVersionOut = celix_version_getMinor(entry->msgVersion);
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT; //no serialization service entries found for this msg id
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
\ No newline at end of file