You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/26 18:04:13 UTC

[celix] branch feature/use_ser_hander_in_psa updated (e840b83 -> fea2116)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git.


    from e840b83  Updates pubsub zmq v2 to use pubsub_serializer_handler_t
     new 14f3bc1  Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq
     new fea2116  Adds serializer handler to pubsub websocket.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c |  15 --
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c     | 247 +++++----------------
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h     |   4 -
 .../v2/src/pubsub_tcp_topic_receiver.c             |  70 ++----
 .../v2/src/pubsub_tcp_topic_receiver.h             |   7 +-
 .../v2/src/pubsub_tcp_topic_sender.c               | 118 +++-------
 .../v2/src/pubsub_tcp_topic_sender.h               |   4 +-
 .../pubsub_admin_websocket/v2/src/psa_activator.c  |  11 -
 .../v2/src/pubsub_websocket_admin.c                | 227 +++++--------------
 .../v2/src/pubsub_websocket_topic_receiver.c       |  52 ++---
 .../v2/src/pubsub_websocket_topic_receiver.h       |   5 +-
 .../v2/src/pubsub_websocket_topic_sender.c         | 102 +++------
 .../v2/src/pubsub_websocket_topic_sender.h         |   3 +-
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     |  20 --
 .../v2/src/pubsub_zmq_topic_receiver.c             |  12 +-
 .../v2/src/pubsub_zmq_topic_receiver.h             |   2 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  23 +-
 .../v2/src/pubsub_zmq_topic_sender.h               |   2 +-
 .../src/PubSubSerializationHandlerTestSuite.cc     |  23 +-
 .../include/pubsub_serializer_handler.h            |  27 ++-
 .../pubsub_utils/src/pubsub_serializer_handler.c   | 113 ++++++----
 21 files changed, 365 insertions(+), 722 deletions(-)

[celix] 02/02: Adds serializer handler to pubsub websocket.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git

commit fea21167fcfc4c351f9d35a1bf72898bfbee89d5
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed May 26 20:03:57 2021 +0200

    Adds serializer handler to pubsub websocket.
---
 .../v2/src/pubsub_tcp_topic_receiver.c             |   4 +
 .../v2/src/pubsub_tcp_topic_sender.c               |  80 ++------
 .../pubsub_admin_websocket/v2/src/psa_activator.c  |  11 -
 .../v2/src/pubsub_websocket_admin.c                | 227 +++++----------------
 .../v2/src/pubsub_websocket_topic_receiver.c       |  52 +++--
 .../v2/src/pubsub_websocket_topic_receiver.h       |   5 +-
 .../v2/src/pubsub_websocket_topic_sender.c         | 102 +++------
 .../v2/src/pubsub_websocket_topic_sender.h         |   3 +-
 .../v2/src/pubsub_zmq_topic_receiver.c             |   4 +
 .../v2/src/pubsub_zmq_topic_sender.c               |   1 +
 .../pubsub_utils/src/pubsub_serializer_handler.c   |  17 +-
 11 files changed, 144 insertions(+), 362 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index b02768a..f602be9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -459,6 +459,10 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
     //NOTE receiver->subscribers.mutex locked
 
     const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+    if (msgFqn == NULL) {
+        L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+        return;
+    }
 
     void *deSerializedMsg = NULL;
     bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 11d73d9..47ef05a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -67,6 +67,7 @@ struct pubsub_tcp_topic_sender {
     bool isPassive;
     bool verbose;
     unsigned long send_delay;
+    int seqNr; //atomic
 
     struct {
         long svcId;
@@ -79,23 +80,10 @@ struct pubsub_tcp_topic_sender {
     } boundedServices;
 };
 
-typedef struct psa_tcp_send_msg_entry {
-    uint32_t type; //msg type id (hash of fqn)
-    const char *fqn;
-    uint8_t major;
-    uint8_t minor;
-    unsigned char originUUID[16];
-    pubsub_protocol_service_t *protSer;
-    struct iovec *serializedIoVecOutput;
-    size_t serializedIoVecOutputLen;
-    unsigned int seqNr;
-} psa_tcp_send_msg_entry_t;
-
 typedef struct psa_tcp_bounded_service_entry {
     pubsub_tcp_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
     int getCount;
 } psa_tcp_bounded_service_entry_t;
 
@@ -272,18 +260,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
-                    if (msgEntry->serializedIoVecOutput)
-                        free(msgEntry->serializedIoVecOutput);
-                    msgEntry->serializedIoVecOutput = NULL;
-                    free(msgEntry);
-                }
-                hashMap_destroy(entry->msgEntries, false, false);
-                free(entry);
-            }
+            free(entry);
         }
         hashMap_destroy(sender->boundedServices.map, false, false);
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -349,7 +326,6 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
         entry->getCount = 1;
         entry->parent = sender;
         entry->bndId = bndId;
-        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
         entry->service.handle = entry;
         entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
         entry->service.send = psa_tcp_topicPublicationSend;
@@ -373,16 +349,6 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void *) bndId);
-
-        hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
-            if (msgEntry->serializedIoVecOutput)
-                free(msgEntry->serializedIoVecOutput);
-            msgEntry->serializedIoVecOutput = NULL;
-            free(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -391,27 +357,17 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
 
 static int
 psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
-    int status = CELIX_SUCCESS;
     psa_tcp_bounded_service_entry_t *bound = handle;
     pubsub_tcp_topic_sender_t *sender = bound->parent;
-
-
-    psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
-
-    if (entry == NULL) {
-        const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
-        if (fqn != NULL) {
-            entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
-            entry->protSer = sender->protocol;
-            entry->type = msgTypeId;
-            entry->fqn = fqn;
-            entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
-            entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
-            uuid_copy(entry->originUUID, sender->fwUUID);
-            hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
-        } else {
-            L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId);
-        }
+    const char* msgFqn;
+    int majorVersion;
+    int minorversion;
+    celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+               pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+        return status;
     }
 
     delay_first_send_for_late_joiners(sender);
@@ -419,11 +375,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
     size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
     struct iovec *serializedIoVecOutput = NULL;
     status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
-    entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
 
     bool cont = false;
     if (status == CELIX_SUCCESS) /*ser ok*/ {
-        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
+        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
     }
     if (cont) {
         pubsub_protocol_message_t message;
@@ -435,9 +390,9 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             message.payload.length = serializedIoVecOutput->iov_len;
         }
         message.header.msgId = msgTypeId;
-        message.header.seqNr = entry->seqNr;
-        message.header.msgMajorVersion = entry->major;
-        message.header.msgMinorVersion = entry->minor;
+        message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+        message.header.msgMajorVersion = (uint16_t)majorVersion;
+        message.header.msgMinorVersion = (uint16_t)minorversion;
         message.header.payloadSize = 0;
         message.header.payloadPartSize = 0;
         message.header.payloadOffset = 0;
@@ -445,7 +400,6 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
         if (metadata != NULL) {
             message.metadata.metadata = metadata;
         }
-        entry->seqNr++;
         bool sendOk = true;
         {
             int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
@@ -453,7 +407,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
                 status = -1;
                 sendOk = false;
             }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
+            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
             if (message.metadata.metadata) {
                 celix_properties_destroy(message.metadata.metadata);
             }
@@ -467,7 +421,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
-        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn,
+        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
                sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 159d8ed..33cc86f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -53,17 +53,6 @@ int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t *
     act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
-    //track serializers (only json)
-    if (status == CELIX_SUCCESS) {
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = act->admin;
-        opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
-        opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
-        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //register pubsub admin service
     if (status == CELIX_SUCCESS) {
         pubsub_admin_service_t *psaSvc = &act->adminService;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 8950fda..bf74d38 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -18,18 +18,18 @@
  */
 
 #include <memory.h>
-#include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
 #include <ip_utils.h>
-#include <pubsub_message_serialization_service.h>
-#include <pubsub_matching.h>
 
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_matching.h"
 #include "pubsub_utils.h"
 #include "pubsub_websocket_admin.h"
 #include "pubsub_psa_websocket_constants.h"
 #include "pubsub_websocket_topic_sender.h"
 #include "pubsub_websocket_topic_receiver.h"
 #include "pubsub_websocket_common.h"
+#include "pubsub_serializer_handler.h"
 
 #define L_DEBUG(...) \
     celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -52,11 +52,6 @@ struct pubsub_websocket_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
     } topicSenders;
@@ -71,6 +66,10 @@ struct pubsub_websocket_admin {
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints;
 
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+    } serializationHandlers;
 };
 
 static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
@@ -93,9 +92,6 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -105,6 +101,9 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c
     celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
     psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
+    celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+    psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     return psa;
 }
 
@@ -138,13 +137,13 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    iter = hashMapIterator_construct(psa->serializationHandlers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
-        free(entry);
+        pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+        pubsub_serializerHandler_destroy(entry);
     }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
 
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,112 +154,12 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
+    celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+    hashMap_destroy(psa->serializationHandlers.map, false, false);
 
     free(psa);
 }
 
-void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_websocket_admin_t *psa = handle;
-
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-    const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
-    const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
-    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
-        L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
-               PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
-        L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-        return;
-    }
-    L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries == NULL) {
-        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
-        L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", psa->serializers.map, serType);
-    }
-    psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
-        entry->svc = svc;
-        entry->fqn = celix_utils_strdup(msgFqn);
-        entry->version = celix_utils_strdup(msgVersion);
-        hashMap_put(typeEntries, (void*)msgId, entry);
-        L_INFO("[PSA_WEBSOCKET_V2] entry added");
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_websocket_admin_t *psa = handle;
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
-    if(serType == NULL || msgId == -1) {
-        L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", serType, msgId);
-        return;
-    }
-
-    //remove serializer
-    // 1) First find entry and
-    // 2) loop and destroy all topic sender using the serializer and
-    // 3) loop and destroy all topic receivers using the serializer
-    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries != NULL) {
-        psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
-        free((void*)entry->fqn);
-        free((void*)entry->version);
-        free(entry);
-
-        // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
-        if(hashMap_size(typeEntries) == 0) {
-            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-            celixThreadMutex_lock(&psa->topicSenders.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
-                if (sender != NULL && strncmp(serType, pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_websocketTopicSender_destroy(sender);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-            celixThreadMutex_lock(&psa->topicReceivers.mutex);
-            iter = hashMapIterator_construct(psa->topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
-                if (receiver != NULL && strncmp(serType, pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_websocketTopicReceiver_destroy(receiver);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-        } else {
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-        }
-    } else {
-        celixThreadRwlock_unlock(&psa->serializers.mutex);
-    }
-}
-
 celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
     L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
@@ -297,14 +196,36 @@ celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const
     return status;
 }
 
+static pubsub_serializer_handler_t* pubsub_websocketAdmin_getSerializationHandler(pubsub_websocket_admin_t* psa, long msgSerializationMarkerSvcId) {
+    pubsub_serializer_handler_t* handler = NULL;
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+    if (handler == NULL) {
+        handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+        if (handler != NULL) {
+            hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+    return handler;
+}
+
+
 celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_websocket_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
-    //1) Create TopicSender
-    //2) Store TopicSender
-    //3) Connect existing endpoints
-    //4) set outPublisherEndpoint
+    //1) Get serialization handler
+    //2) Create TopicSender
+    //3) Store TopicSender
+    //4) Connect existing endpoints
+    //5) set outPublisherEndpoint
+
+    pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic sender without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
     celix_properties_t *newEndpoint = NULL;
 
@@ -324,7 +245,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serType, psa);
+        sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, handler, psa);
         if (sender != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
@@ -382,9 +303,14 @@ celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
 
 celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_websocket_admin_t *psa = handle;
-
     celix_properties_t *newEndpoint = NULL;
 
+    pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic receiver without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
+
     //get serializer type
     const char *serType = NULL;
     celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
@@ -400,7 +326,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
-        receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, psa);
+        receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, psa);
         if (receiver != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
@@ -578,57 +504,11 @@ celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
     return status;
 }
 
-psa_websocket_serializer_entry_t* pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
-    pubsub_websocket_admin_t *psa = handle;
-    psa_websocket_serializer_entry_t *serializer = NULL;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
-    }
-
-    return serializer;
-}
-
-void pubsub_websocketAdmin_releaseSerializer(void *handle, psa_websocket_serializer_entry_t* serializer) {
-    pubsub_websocket_admin_t *psa = handle;
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
-    pubsub_websocket_admin_t *psa = handle;
-    int64_t id = -1L;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
-        while(hashMapIterator_hasNext(&iterator)) {
-            void *key = hashMapIterator_nextKey(&iterator);
-            psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, key);
-            L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
-            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
-                id = (uint32_t)(uintptr_t)key;
-                break;
-            }
-        }
-    } else {
-        L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-    L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
-    return id;
-}
-
 bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
     pubsub_websocket_admin_t *psa = handle;
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -642,11 +522,9 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine
         fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -677,7 +555,6 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine
         celix_arrayList_destroy(unconnected);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
     return true;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index c93f078..ea997e9 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -68,10 +68,11 @@ struct pubsub_websocket_topic_receiver {
     void *admin;
     char *scope;
     char *topic;
-    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
+    pubsub_serializer_handler_t* serializerHandler;
+
     celix_websocket_service_t sockSvc;
     long svcId;
 
@@ -131,12 +132,12 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
                                                               const char *scope,
                                                               const char *topic,
                                                               const celix_properties_t *topicProperties,
-                                                              const char *serType,
+                                                              pubsub_serializer_handler_t* serializerHandler,
                                                               void *admin) {
     pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serType = celix_utils_strdup(serType);
+    receiver->serializerHandler = serializerHandler;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->admin = admin;
@@ -309,7 +310,6 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
-        free(receiver->serType);
     }
     free(receiver);
 }
@@ -325,7 +325,7 @@ const char* pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t
 }
 
 const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver) {
-    return receiver->serType;
+    return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
 }
 
 void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
@@ -451,58 +451,52 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
 static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     //NOTE receiver->subscribers.mutex locked
 
-    int64_t msgTypeId = pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, receiver->serType, hdr->id);
-
-    if(msgTypeId < 0) {
-        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-        return;
-    }
-
-    psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, msgTypeId);
+    uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id);
 
-    if(serializer == NULL) {
-        pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
-        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+    if (msgId == 0) {
+        L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
         return;
     }
 
     void *deSerializedMsg = NULL;
-
-    celix_version_t* version = celix_version_createVersionFromString(serializer->version);
-    bool validVersion = psa_websocket_checkVersion(version, hdr);
-    celix_version_destroy(version);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = (void *)payload;
         deSerializeBuffer.iov_len  = payloadSize;
-        celix_status_t status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
-
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
         if (status == CELIX_SUCCESS) {
             hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
             bool release = true;
             while (hashMapIterator_hasNext(&iter)) {
                 pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                svc->receive(svc->handle, serializer->fqn, msgTypeId, deSerializedMsg, NULL, &release);
+                svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release);
                 if (!release && hashMapIterator_hasNext(&iter)) {
                     //receive function has taken ownership and still more receive function to come ..
                     //deserialize again for new message
-                    status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+                    status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
                     if (status != CELIX_SUCCESS) {
-                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
                         break;
                     }
                     release = true;
                 }
             }
             if (release) {
-                serializer->svc->freeDeserializedMsg(serializer->svc->handle, deSerializedMsg);
+                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg);
             }
         } else {
-            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+    } else {
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+               hdr->id,
+               pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+               (int)hdr->major,
+               (int)hdr->minor,
+               pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
+               pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId));
     }
-
-    pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
 }
 
 static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 55d5255..f5edda5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -20,8 +20,9 @@
 #ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
 #define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
 
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
 #include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t;
 
@@ -30,7 +31,7 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin);
 void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver);
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 98a1ad7..28a8af1 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -52,10 +52,13 @@ struct pubsub_websocket_topic_sender {
     void *admin;
     char *scope;
     char *topic;
-    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
+    pubsub_serializer_handler_t* serializerHandler;
+
+    int seqNr; //atomic
+
     celix_websocket_service_t websockSvc;
     long websockSvcId;
     struct mg_connection *sockConnection;
@@ -71,16 +74,10 @@ struct pubsub_websocket_topic_sender {
     } boundedServices;
 };
 
-typedef struct psa_websocket_send_msg_entry {
-    pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
-    uint32_t type; //msg type id (hash of fqn)
-} psa_websocket_send_msg_entry_t;
-
 typedef struct psa_websocket_bounded_service_entry {
     pubsub_websocket_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
     int getCount;
 } psa_websocket_bounded_service_entry_t;
 
@@ -99,18 +96,12 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin) {
     pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serType = celix_utils_strdup(serType);
-
-    if(sender->serType == NULL) {
-        L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
-        free(sender);
-        return NULL;
-    }
+    sender->serializerHandler = serializerHandler;
 
     psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
     sender->uri = psa_websocket_createURI(scope, topic);
@@ -174,17 +165,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
-                    free(msgEntry);
-
-                }
-                hashMap_destroy(entry->msgEntries, false, false);
-
-                free(entry);
-            }
+            free(entry);
         }
         hashMap_destroy(sender->boundedServices.map, false, false);
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -198,7 +179,6 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         }
         free(sender->topic);
         free(sender->uri);
-        free(sender->serType);
         free(sender);
     }
 }
@@ -216,16 +196,17 @@ const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
 }
 
 const char* pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t *sender) {
-    return sender->serType;
+    return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
     psa_websocket_bounded_service_entry_t *entry = (psa_websocket_bounded_service_entry_t *) handle;
-    int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType);
-    if(rc >= 0) {
-        *msgTypeId = (unsigned int)rc;
+    uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+    if (msgId != 0) {
+        *msgTypeId = msgId;
+        return 0;
     }
-    return 0;
+    return -1;
 }
 
 static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
@@ -241,7 +222,6 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
         entry->getCount = 1;
         entry->parent = sender;
         entry->bndId = bndId;
-        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
         entry->service.handle = entry;
         entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
         entry->service.send = psa_websocket_topicPublicationSend;
@@ -264,60 +244,40 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
-        hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
-            free(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
-
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
 static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
-    int status = CELIX_SERVICE_EXCEPTION;
     psa_websocket_bounded_service_entry_t *bound = handle;
     pubsub_websocket_topic_sender_t *sender = bound->parent;
 
-    psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, sender->serType, msgTypeId);
+    const char* msgFqn;
+    int majorVersion;
+    int minorVersion;
+    celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
 
-    if(serializer == NULL) {
-        pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-        L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        return CELIX_SERVICE_EXCEPTION;
-    }
-    
-    psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
-
-    if(entry == NULL) {
-        entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
-        entry->type = msgTypeId;
-        entry->header.id = serializer->fqn;
-        celix_version_t* version = celix_version_createVersionFromString(serializer->version);
-        entry->header.major = (uint8_t)celix_version_getMajor(version);
-        entry->header.minor = (uint8_t)celix_version_getMinor(version);
-        entry->header.seqNr = 0;
-        celix_version_destroy(version);
-        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+               pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+        return status;
     }
 
     if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
         size_t serializedOutputLen = 0;
         struct iovec* serializedOutput = NULL;
-        status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
+        status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
         if (status == CELIX_SUCCESS /*ser ok*/) {
             json_error_t jsError;
 
             json_t *jsMsg = json_object();
-            json_object_set_new_nocheck(jsMsg, "id", json_string(entry->header.id));
-            json_object_set_new_nocheck(jsMsg, "major", json_integer(entry->header.major));
-            json_object_set_new_nocheck(jsMsg, "minor", json_integer(entry->header.minor));
-            uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, __ATOMIC_RELAXED);
+            json_object_set_new_nocheck(jsMsg, "id", json_string(msgFqn));
+            json_object_set_new_nocheck(jsMsg, "major", json_integer(majorVersion));
+            json_object_set_new_nocheck(jsMsg, "minor", json_integer(minorVersion));
+            uint32_t seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
             json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
 
             json_t *jsData;
@@ -338,17 +298,15 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
             }
 
             json_decref(jsMsg); //Decrease ref count means freeing the object
-            serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+            pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
         } else {
-            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s",
-                   entry->header.id, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %u for scope/topic %s/%s",
+                   msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
         }
     } else { // when (sender->sockConnection == NULL) we dont have a client, but we do have a valid entry
     	status = CELIX_SUCCESS; // Not an error, just nothing to do
     }
 
-    pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-
     return status;
 }
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 8f8cebf..6b42500 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -22,6 +22,7 @@
 
 #include "celix_bundle_context.h"
 #include "pubsub_admin_metrics.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
 
@@ -30,7 +31,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin);
 void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender);
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index a00ce34..9d2070f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -452,6 +452,10 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     int updateSerError = 0;
 
     const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+    if (msgFqn == NULL) {
+        L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+        return;
+    }
 
     void *deserializedMsg = NULL;
     bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 8ae4489..afc8e54 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -498,6 +498,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     pubsub_zmq_topic_sender_t *sender = bound->parent;
     bool monitor = sender->metricsEnabled;
 
+    //TODO remove use of entry, so that one less lock is needed and drop metrics stuff
     psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
 
     //metrics updates
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index 418b4b5..7d3d0f3 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -42,7 +42,7 @@ typedef struct pubsub_serialization_service_entry {
     const celix_properties_t *properties;
     uint32_t msgId;
     celix_version_t* msgVersion;
-    char* msgFqn;
+    const char* msgFqn;
     pubsub_message_serialization_service_t* svc;
 } pubsub_serialization_service_entry_t;
 
@@ -185,7 +185,6 @@ static void pubsub_serializerHandler_destroyCallback(void* data) {
         celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
         for (int i = 0; i < celix_arrayList_size(entries); ++i) {
             pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
-            free(entry->msgFqn);
             celix_version_destroy(entry->msgVersion);
             free(entry);
         }
@@ -239,6 +238,12 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
     }
 
     if (valid) {
+        char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+        if (fqn == NULL) {
+            fqn = celix_utils_strdup(msgFqn);
+            hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, fqn);
+        }
+
         celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
         if (entries == NULL) {
             entries = celix_arrayList_create();
@@ -246,7 +251,7 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
         entry->svcId = svcId;
         entry->properties = svcProperties;
-        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgFqn = fqn;
         entry->msgId = msgId;
         entry->msgVersion = msgVersion;
         entry->svc = svc;
@@ -258,11 +263,6 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         celix_version_destroy(msgVersion);
     }
 
-    char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
-    if (fqn == NULL) {
-        hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn));
-    }
-
     celixThreadRwlock_unlock(&handler->lock);
 }
 
@@ -288,7 +288,6 @@ void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handl
             }
         }
         if (found != NULL) {
-            free(found->msgFqn);
             celix_version_destroy(found->msgVersion);
             free(found);
         }

[celix] 01/02: Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 14f3bc1a81ad4d4a672f899f360d93ee0603bfd4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed May 26 19:05:17 2021 +0200

    Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq
---
 .../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c |  15 --
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c     | 247 +++++----------------
 .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h     |   4 -
 .../v2/src/pubsub_tcp_topic_receiver.c             |  70 ++----
 .../v2/src/pubsub_tcp_topic_receiver.h             |   7 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |  72 +++---
 .../v2/src/pubsub_tcp_topic_sender.h               |   4 +-
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     |  20 --
 .../v2/src/pubsub_zmq_topic_receiver.c             |  12 +-
 .../v2/src/pubsub_zmq_topic_receiver.h             |   2 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  22 +-
 .../v2/src/pubsub_zmq_topic_sender.h               |   2 +-
 .../src/PubSubSerializationHandlerTestSuite.cc     |  23 +-
 .../include/pubsub_serializer_handler.h            |  27 ++-
 .../pubsub_utils/src/pubsub_serializer_handler.c   | 108 ++++++---
 15 files changed, 248 insertions(+), 387 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
index ec9badb..ec3f853 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
@@ -20,7 +20,6 @@
 #include <stdlib.h>
 
 #include "celix_api.h"
-#include "pubsub_serializer.h"
 #include "pubsub_protocol.h"
 #include "celix_log_helper.h"
 
@@ -34,7 +33,6 @@ typedef struct psa_tcp_activator {
 
     pubsub_tcp_admin_t *admin;
 
-    long serializersTrackerId;
     long protocolsTrackerId;
 
     pubsub_admin_service_t adminService;
@@ -50,7 +48,6 @@ typedef struct psa_tcp_activator {
 int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     act->adminSvcId = -1L;
     act->cmdSvcId = -1L;
-    act->serializersTrackerId = -1L;
     act->protocolsTrackerId = -1L;
 
     act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2");
@@ -58,17 +55,6 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
-    //track serializers
-    if (status == CELIX_SUCCESS) {
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = act->admin;
-        opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
-        opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
-        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //track protocols
     if (status == CELIX_SUCCESS) {
         celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -132,7 +118,6 @@ int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
     celix_bundleContext_unregisterService(ctx, act->adminSvcId);
     celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
     celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
-    celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
     celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
     pubsub_tcpAdmin_destroy(act->admin);
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
index 66c92ef..f5f200f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
@@ -55,11 +55,6 @@ struct pubsub_tcp_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
     } protocols;
@@ -79,6 +74,11 @@ struct pubsub_tcp_admin {
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints;
 
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+    } serializationHandlers;
+
     pubsub_tcp_endPointStore_t endpointStore;
 };
 
@@ -101,11 +101,6 @@ static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoi
     return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
 }
 
-static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
-    const char** out = handle;
-    *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
 pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
     pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
     psa->ctx = ctx;
@@ -120,11 +115,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
                                                                    PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->protocols.mutex, NULL);
     psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -137,6 +130,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
     celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
     psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
+    celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+    psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     return psa;
 }
 
@@ -177,13 +173,13 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    iter = hashMapIterator_construct(psa->serializationHandlers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
-        free(entry);
+        pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+        pubsub_serializerHandler_destroy(entry);
     }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
 
     celixThreadMutex_lock(&psa->protocols.mutex);
     iter = hashMapIterator_construct(psa->protocols.map);
@@ -205,8 +201,9 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
+    celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+    hashMap_destroy(psa->serializationHandlers.map, false, false);
+
     celixThreadMutex_destroy(&psa->protocols.mutex);
     hashMap_destroy(psa->protocols.map, false, false);
 
@@ -215,101 +212,6 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
     free(psa);
 }
 
-void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_tcp_admin_t *psa = handle;
-
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-    const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
-    const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
-    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
-        L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
-               PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
-        L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-        return;
-    }
-    L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries == NULL) {
-        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
-        L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType);
-    }
-    psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_tcp_serializer_entry_t));
-        entry->svc = svc;
-        entry->fqn = celix_utils_strdup(msgFqn);
-        entry->version = celix_utils_strdup(msgVersion);
-        hashMap_put(typeEntries, (void*)msgId, entry);
-        L_INFO("[PSA_TCP_V2] entry added");
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_tcp_admin_t *psa = handle;
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
-    //remove serializer
-    // 1) First find entry and
-    // 2) loop and destroy all topic sender using the serializer and
-    // 3) loop and destroy all topic receivers using the serializer
-    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries != NULL) {
-        psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
-        free((void*)entry->fqn);
-        free((void*)entry->version);
-        free(entry);
-
-        // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
-        if(hashMap_size(typeEntries) == 0) {
-            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-            celixThreadMutex_lock(&psa->topicSenders.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
-                if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_tcpTopicSender_destroy(sender);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-            celixThreadMutex_lock(&psa->topicReceivers.mutex);
-            iter = hashMapIterator_construct(psa->topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
-                if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(receiverEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_tcpTopicReceiver_destroy(receiver);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-        } else {
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-        }
-    } else {
-        celixThreadRwlock_unlock(&psa->serializers.mutex);
-    }
-}
-
 void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_tcp_admin_t *psa = handle;
 
@@ -421,30 +323,40 @@ pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *
     return status;
 }
 
+static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) {
+    pubsub_serializer_handler_t* handler = NULL;
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+    if (handler == NULL) {
+        handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+        if (handler != NULL) {
+            hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+    return handler;
+}
+
 celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
                                                 const celix_properties_t *topicProperties, long serializerSvcId,
                                                 long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t status = CELIX_SUCCESS;
 
-    //1) Create TopicSender
-    //2) Store TopicSender
-    //3) Connect existing endpoints
-    //4) set outPublisherEndpoint
+    //1) Get serialization handler
+    //2) Create TopicSender
+    //3) Store TopicSender
+    //4) Connect existing endpoints
+    //5) set outPublisherEndpoint
+
+    pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic sender without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
     celix_properties_t *newEndpoint = NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_tcpAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
     
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
@@ -453,14 +365,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
     if (sender == NULL) {
         psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
         if (protEntry != NULL) {
-            sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+            sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
                                                   &psa->endpointStore, protocolSvcId,
                                                   protEntry->svc);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *protType = protEntry->protType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
 
             celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
@@ -525,21 +438,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
                                                   long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
 
-    celix_properties_t *newEndpoint = NULL;
+    pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic receiver without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
+    celix_properties_t *newEndpoint = NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_tcpAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
@@ -547,7 +454,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
         if (protEntry != NULL) {
-            receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+            receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+                                                      handler, handle, topicProperties,
                                                       &psa->endpointStore, protocolSvcId, protEntry->svc);
         } else {
             L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
@@ -556,7 +464,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *protType = protEntry->protType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -795,53 +703,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
     return status;
 }
 
-
-
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
-    pubsub_tcp_admin_t *psa = handle;
-    psa_tcp_serializer_entry_t *serializer = NULL;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
-    }
-
-    return serializer;
-}
-
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) {
-    pubsub_tcp_admin_t *psa = handle;
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
-    pubsub_tcp_admin_t *psa = handle;
-    int64_t id = -1L;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
-        while(hashMapIterator_hasNext(&iterator)) {
-            void *key = hashMapIterator_nextKey(&iterator);
-            psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key);
-            L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
-            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
-                id = (uint32_t)(uintptr_t)key;
-                break;
-            }
-        }
-    } else {
-        L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-    L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
-    return id;
-}
-
 pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
     return NULL;
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
index 2440fbb..513a934 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
@@ -82,10 +82,6 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
 void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
 bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
 
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer);
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
 pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle);
 
 #endif //CELIX_PUBSUB_TCP_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 853e49d..b02768a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -56,7 +56,7 @@ struct pubsub_tcp_topic_receiver {
     pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
-    char *serType;
+    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     size_t timeout;
     bool isPassive;
@@ -104,13 +104,12 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
 static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
 static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
                                                             celix_log_helper_t *logHelper,
                                                             const char *scope,
                                                             const char *topic,
-                                                            const char *serType,
+                                                            pubsub_serializer_handler_t* serializerHandler,
                                                             void *admin,
                                                             const celix_properties_t *topicProperties,
                                                             pubsub_tcp_endPointStore_t *handlerStore,
@@ -119,7 +118,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serType = celix_utils_strdup(serType);
+    receiver->serializerHandler = serializerHandler;
     receiver->admin = admin;
     receiver->protocolSvcId = protocolSvcId;
     receiver->protocol = protocol;
@@ -269,7 +268,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
-
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -299,7 +297,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             free(receiver->scope);
         }
         free(receiver->topic);
-        free(receiver->serType);
     }
     free(receiver);
 }
@@ -313,7 +310,7 @@ const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver)
 }
 
 const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
-    return receiver->serType;
+    return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
 }
 
 long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
@@ -460,47 +457,39 @@ static inline void
 processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
                              const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
     //NOTE receiver->subscribers.mutex locked
-    psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId);
 
-    if(msgSer == NULL) {
-        pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
-        L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length);
-        return;
-    }
+    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
 
     void *deSerializedMsg = NULL;
-    celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
-    bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
-    celix_version_destroy(version);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
         deSerializeBuffer.iov_len = message->payload.length;
-        celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
         // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
         if (message->payload.payload == deSerializedMsg) {
             *releaseMsg = true;
         }
-        const char *msgType = msgSer->fqn;
 
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
             bool release = true;
             if (cont) {
                 hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
                 while (hashMapIterator_hasNext(&iter)) {
                     pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+                    svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
                     if (!release && hashMapIterator_hasNext(&iter)) {
                         //receive function has taken ownership and still more receive function to come ..
                         //deserialize again for new message
-                        status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
                         if (status != CELIX_SUCCESS) {
                             L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
-                                   msgType,
+                                   msgFqn,
                                    receiver->scope == NULL ? "(null)" : receiver->scope,
                                    receiver->topic);
                             break;
@@ -509,19 +498,25 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                     }
                 }
                 if (release) {
-                    msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
                 }
                 if (message->metadata.metadata) {
                     celix_properties_destroy(message->metadata.metadata);
                 }
             }
         } else {
-            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType,
+            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
                    receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+    } else {
+        L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+               msgFqn,
+               pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+               (int)message->header.msgMajorVersion,
+               (int)message->header.msgMinorVersion,
+               pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+               pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
-
-    pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
 }
 
 static void
@@ -664,24 +659,3 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
-
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
-    bool check = false;
-
-    if (major == 0 && minor == 0) {
-        //no check
-        return true;
-    }
-
-    int versionMajor;
-    int versionMinor;
-    if (msgVersion!=NULL) {
-        version_getMajor(msgVersion, &versionMajor);
-        version_getMinor(msgVersion, &versionMinor);
-        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
-            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
-        }
-    }
-
-    return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index a7de405..d06fe4a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -20,10 +20,11 @@
 #ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
 #define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
 
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
 #include "celix_bundle_context.h"
-#include <pubsub_protocol.h>
+#include "pubsub_protocol.h"
 #include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
 
@@ -31,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
                                                             celix_log_helper_t *logHelper,
                                                             const char *scope,
                                                             const char *topic,
-                                                            const char *serType,
+                                                            pubsub_serializer_handler_t* serializerHandler,
                                                             void *admin,
                                                             const celix_properties_t *topicProperties,
                                                             pubsub_tcp_endPointStore_t *handlerStore,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 847d538..11d73d9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -57,12 +57,12 @@ struct pubsub_tcp_topic_sender {
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
+    pubsub_serializer_handler_t* serializerHandler;
 
     void *admin;
     char *scope;
     char *topic;
     char *url;
-    char *serializerType;
     bool isStatic;
     bool isPassive;
     bool verbose;
@@ -85,7 +85,6 @@ typedef struct psa_tcp_send_msg_entry {
     uint8_t major;
     uint8_t minor;
     unsigned char originUUID[16];
-//    pubsub_msg_serializer_t *msgSer;
     pubsub_protocol_service_t *protSer;
     struct iovec *serializedIoVecOutput;
     size_t serializedIoVecOutputLen;
@@ -118,7 +117,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     celix_log_helper_t *logHelper,
     const char *scope,
     const char *topic,
-    const char *serializerType,
+    pubsub_serializer_handler_t* serializerHandler,
     void *admin,
     const celix_properties_t *topicProperties,
     pubsub_tcp_endPointStore_t *handlerStore,
@@ -127,7 +126,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializerType = celix_utils_strdup(serializerType);
+    sender->serializerHandler = serializerHandler;
     sender->admin = admin;
     sender->protocolSvcId = protocolSvcId;
     sender->protocol = protocol;
@@ -189,7 +188,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
         pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
-        // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+        // When passiveKey is specified, enable receive event for full-duplex connection using key.
         // Because the topic receiver is already started, enable the receive event.
         pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
         pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
@@ -301,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         }
         free(sender->topic);
         free(sender->url);
-        free(sender->serializerType);
         free(sender);
     }
 }
@@ -319,7 +317,7 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
-    return sender->serializerType;
+    return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
@@ -337,15 +335,6 @@ bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
     return sender->isPassive;
 }
 
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
-    psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
-    int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
-    if(rc >= 0) {
-        *msgTypeId = (unsigned int)rc;
-    }
-    return 0;
-}
-
 static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
                                          const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_tcp_topic_sender_t *sender = handle;
@@ -406,39 +395,35 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
     psa_tcp_bounded_service_entry_t *bound = handle;
     pubsub_tcp_topic_sender_t *sender = bound->parent;
 
-    psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
-    if(serializer == NULL) {
-        pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
-        L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        return CELIX_SERVICE_EXCEPTION;
-    }
 
     psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
 
-    if(entry == NULL) {
-        entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
-        entry->protSer = sender->protocol;
-        entry->type = msgTypeId;
-        entry->fqn = serializer->fqn;
-        celix_version_t* version = celix_version_createVersionFromString(serializer->version);
-        entry->major = (uint8_t)celix_version_getMajor(version);
-        entry->minor = (uint8_t)celix_version_getMinor(version);
-        celix_version_destroy(version);
-        uuid_copy(entry->originUUID, sender->fwUUID);
-        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+    if (entry == NULL) {
+        const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+        if (fqn == NULL) {
+            L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId);
+            return CELIX_SERVICE_EXCEPTION; //no entry can be created; returning here avoids dereferencing the NULL entry below
+        }
+        entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
+        entry->protSer = sender->protocol;
+        entry->type = msgTypeId;
+        entry->fqn = fqn; //not copied; the pointer comes from the serializer handler
+        entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+        entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
+        uuid_copy(entry->originUUID, sender->fwUUID);
+        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); //cache the entry so later sends of this msg id reuse it
     }
 
     delay_first_send_for_late_joiners(sender);
 
     size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
     struct iovec *serializedIoVecOutput = NULL;
-    status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
     entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
 
     bool cont = false;
     if (status == CELIX_SUCCESS) /*ser ok*/ {
-        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
     }
     if (cont) {
         pubsub_protocol_message_t message;
@@ -468,12 +453,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
                 status = -1;
                 sendOk = false;
             }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
             if (message.metadata.metadata) {
                 celix_properties_destroy(message.metadata.metadata);
             }
             if (serializedIoVecOutput) {
-                serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen);
+                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
                 serializedIoVecOutput = NULL;
             }
         }
@@ -482,12 +467,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
-        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn,
+        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn,
                sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
     }
 
-    pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
-
     return status;
 }
 
@@ -502,4 +485,11 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
         usleep(sender->send_delay * 1000);
         firstSend = false;
     }
+}
+
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+    psa_tcp_bounded_service_entry_t* entry = handle;
+    uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+    *msgTypeId = (unsigned int)msgId;
+    return 0;
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index dfb5014..29c8f7a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -24,6 +24,7 @@
 #include "pubsub_admin_metrics.h"
 #include "pubsub_protocol.h"
 #include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
 
@@ -32,7 +33,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     celix_log_helper_t *logHelper,
     const char *scope,
     const char *topic,
-    const char *serializerType,
+    pubsub_serializer_handler_t* serializerHandler,
     void *admin,
     const celix_properties_t *topicProperties,
     pubsub_tcp_endPointStore_t *handlerStore,
@@ -46,7 +47,6 @@ const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
 const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
 
 /**
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index e30403e..cc6de56 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -66,11 +66,6 @@ struct pubsub_zmq_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
     } protocols;
@@ -189,9 +184,6 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->protocols.mutex, NULL);
     psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
 
@@ -241,15 +233,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        hash_map_t *entry = hashMapIterator_nextValue(&iter);
-        hashMap_destroy(entry, false, true);
-    }
-    hashMap_clear(psa->serializers.map, false, false);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
     celixThreadMutex_lock(&psa->protocols.mutex);
     iter = hashMapIterator_construct(psa->protocols.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -275,9 +258,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
-
     celixThreadMutex_destroy(&psa->protocols.mutex);
     hashMap_destroy(psa->protocols.map, false, false);
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 1051d77..a00ce34 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -95,7 +95,6 @@ struct pubsub_zmq_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
-        hash_map_t *msgFqns; //key = msg id, value = char* msgFqn
         bool allInitialized;
     } subscribers;
 };
@@ -207,7 +206,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
 
         receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL);
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
@@ -288,7 +286,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
-        hashMap_destroy(receiver->subscribers.msgFqns, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -454,11 +451,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     int updateReceiveCount = 0;
     int updateSerError = 0;
 
-    char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId);
-    if (msgFqn == NULL) {
-        msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
-        hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn);
-    }
+    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
 
     void *deserializedMsg = NULL;
     bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
@@ -505,8 +498,9 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
-        L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+        L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
                msgFqn,
+               pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
                (int)message->header.msgMajorVersion,
                (int)message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index a2f953b..3900f55 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        pubsub_serializer_handler_t* serHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *protocol);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 9996c53..8ae4489 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -52,7 +52,7 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializationHandler;
+    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
@@ -137,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        pubsub_serializer_handler_t* serializationHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
@@ -148,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializationHandler = serializationHandler;
+    sender->serializerHandler = serializerHandler;
     sender->admin = admin;
     sender->protocolSvcId = protocolSvcId;
     sender->protocol = prot;
@@ -359,7 +359,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
 }
 
 const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
-    return pubsub_serializerHandler_getSerializationType(sender->serializationHandler);
+    return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -384,7 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
 
 static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
     psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
-    *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType);
+    *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
     return 0;
 }
 
@@ -513,9 +513,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
         entry->protSer = sender->protocol;
         entry->type = msgTypeId;
-        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId);
-        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId);
-        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId);
+        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
         uuid_copy(entry->originUUID, sender->fwUUID);
         celixThreadMutex_create(&entry->metrics.mutex, NULL);
         hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
@@ -528,7 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     }
     size_t serializedOutputLen = 0;
     struct iovec *serializedOutput = NULL;
-    status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
+    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
     if (monitor) {
         clock_gettime(CLOCK_REALTIME, &serializationEnd);
     }
@@ -588,7 +588,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 zmq_msg_t msg4; // Footer
                 void *socket = zsock_resolve(sender->zmq.socket);
                 psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                freeMsgEntry->serHandler = sender->serializationHandler;
+                freeMsgEntry->serHandler = sender->serializerHandler;
                 freeMsgEntry->msgId = msgTypeId;
                 freeMsgEntry->serializedOutput = serializedOutput;
                 freeMsgEntry->serializedOutputLen = serializedOutputLen;
@@ -665,7 +665,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 celix_properties_destroy(message.metadata.metadata);
             }
             if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen);
+                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
             }
 
             if (sendOk) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index c2c0d57..584b88d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        pubsub_serializer_handler_t* serializationHandler,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 550c85f..77a2fa0 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -111,7 +111,6 @@ TEST_F(PubSubSerializationHandlerTestSuite, SerializationServiceFound) {
     EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg"));
     auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42);
     EXPECT_STREQ("example::Msg",  fqn);
-    free(fqn);
     EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
 
@@ -293,3 +292,25 @@ TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
 
     celix_logHelper_destroy(logHelper);
 }
+
+TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
+    auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+    EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+    EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+
+    long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+    EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+    EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+    const char* msgFqn;
+    int major;
+    int minor;
+    EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor));
+    EXPECT_STREQ("example::Msg1", msgFqn);
+    EXPECT_EQ(1, major);
+    EXPECT_EQ(0, minor);
+
+    celix_bundleContext_unregisterService(ctx.get(), svcId1);
+    pubsub_serializerHandler_destroy(handler);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index 0519891..f2c58ac 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -138,10 +138,15 @@ celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_ha
 bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion);
 
 /**
+ * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id.
+ */
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
  * @brief Get msg fqn from a msg id.
- * @return msg fqn or NULL if msg id is not known.
+ * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists.
  */
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
 
 /**
  * @brief Get a msg id from a msgFqn.
@@ -176,6 +181,24 @@ int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* han
  */
 int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
 
+
+/**
+ * @brief Returns msg info (fqn, major version, minor version) in 1 call.
+ *
+ * @param handler               The serializer handler
+ * @param msgId                 The msg id where to get the info for
+ * @param msgFqnOut             If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid)
+ * @param msgMajorVersionOut    If not NULL will be set to the msg major version
+ * @param msgMinorVersionOut    If not NULL will be set to the msg minor version
+ * @return                      CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found.
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+        pubsub_serializer_handler_t* handler,
+        uint32_t msgId,
+        const char** msgFqnOut,
+        int* msgMajorVersionOut,
+        int* msgMinorVersionOut);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index 7bfa5e0..418b4b5 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -48,6 +48,7 @@ typedef struct pubsub_serialization_service_entry {
 
 struct pubsub_serializer_handler {
     celix_bundle_context_t* ctx;
+    char* filter;
     char* serType;
     bool backwardCompatible;
     long serializationSvcTrackerId;
@@ -55,6 +56,7 @@ struct pubsub_serializer_handler {
 
     celix_thread_rwlock_t lock;
     hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+    hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn
 };
 
 static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
@@ -104,17 +106,6 @@ static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serializat
     return compatible;
 }
 
-static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
-    //NOTE assumes mutex is locked
-    const char *result = NULL;
-    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
-    if (entries != NULL) {
-        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
-        result = entry->msgFqn;
-    }
-    return result;
-}
-
 pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
     pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
     handler->ctx = ctx;
@@ -125,18 +116,18 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex
 
     celixThreadRwlock_create(&handler->lock, NULL);
     handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+    handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL);
 
-    char *filter = NULL;
-    asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+    asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
     celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
     opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
     opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
-    opts.filter.filter = filter;
+    opts.filter.filter = handler->filter;
     opts.callbackHandle = handler;
     opts.addWithProperties = addSerializationService;
     opts.removeWithProperties = removeSerializationService;
-    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    free(filter);
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts);
+
 
     return handler;
 }
@@ -186,26 +177,31 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(cel
     return data.handler;
 }
 
+static void pubsub_serializerHandler_destroyCallback(void* data) {
+    pubsub_serializer_handler_t* handler = data;
+    celixThreadRwlock_destroy(&handler->lock);
+    hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
+            free(entry->msgFqn);
+            celix_version_destroy(entry->msgVersion);
+            free(entry);
+        }
+        celix_arrayList_destroy(entries);
+    }
+    hashMap_destroy(handler->serializationServices, false, false);
+    hashMap_destroy(handler->msgFullyQualifiedNames, false, true);
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler->serType);
+    free(handler->filter);
+    free(handler);
+}
 
 void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
     if (handler != NULL) {
-        celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
-        celixThreadRwlock_destroy(&handler->lock);
-        hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
-            for (int i = 0; i < celix_arrayList_size(entries); ++i) {
-                pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
-                free(entry->msgFqn);
-                celix_version_destroy(entry->msgVersion);
-                free(entry);
-            }
-            celix_arrayList_destroy(entries);
-        }
-        hashMap_destroy(handler->serializationServices, false, false);
-        celix_logHelper_destroy(handler->logHelper);
-        free(handler->serType);
-        free(handler);
+        celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback);
     }
 }
 
@@ -257,10 +253,16 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         celix_arrayList_add(entries, entry);
         celix_arrayList_sort(entries, compareEntries);
 
-        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+        hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries);
     } else {
         celix_version_destroy(msgVersion);
     }
+
+    char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+    if (fqn == NULL) {
+        hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn));
+    }
+
     celixThreadRwlock_unlock(&handler->lock);
 }
 
@@ -375,9 +377,16 @@ bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* ha
     return compatible;
 }
 
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    void* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    celixThreadRwlock_unlock(&handler->lock);
+    return entries != NULL;
+}
+
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
     celixThreadRwlock_readLock(&handler->lock);
-    char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
+    char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
     celixThreadRwlock_unlock(&handler->lock);
     return msgFqn;
 }
@@ -434,4 +443,31 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
 
 const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
     return handler->serType;
+}
+
+int pubsub_serializerHandler_getMsgInfo(
+        pubsub_serializer_handler_t* handler,
+        uint32_t msgId,
+        const char** msgFqnOut,
+        int* msgMajorVersionOut,
+        int* msgMinorVersionOut) {
+    int result = CELIX_SUCCESS;
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+        if (msgFqnOut != NULL) { //NOTE(review): entry-owned string; confirm its lifetime matches what the header documents
+            *msgFqnOut = entry->msgFqn;
+        }
+        if (msgMajorVersionOut != NULL) { //each out parameter is optional and must be guarded by its own NULL check
+            *msgMajorVersionOut = celix_version_getMajor(entry->msgVersion);
+        }
+        if (msgMinorVersionOut != NULL) {
+            *msgMinorVersionOut = celix_version_getMinor(entry->msgVersion);
+        }
+    } else {
+        result = CELIX_ILLEGAL_ARGUMENT; //no serialization service entries are known for msgId
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return result;
 }
\ No newline at end of file