You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/24 13:49:00 UTC

[celix] branch feature/use_ser_hander_in_psa updated: Updates pubsub zmq v2 to use pubsub_serializer_handler_t

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/use_ser_hander_in_psa by this push:
     new e840b83  Updates pubsub zmq v2 to use pubsub_serializer_handler_t
e840b83 is described below

commit e840b83a3a20960a70c7a9bdd29596a316964d6c
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon May 24 15:48:33 2021 +0200

    Updates pubsub zmq v2 to use pubsub_serializer_handler_t
---
 .../pubsub/pubsub_admin_zmq/v2/src/psa_activator.c |  11 -
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     | 239 ++++++---------------
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h     |  13 --
 .../v2/src/pubsub_zmq_topic_receiver.c             | 153 ++++++-------
 .../v2/src/pubsub_zmq_topic_receiver.h             |   7 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  80 +++----
 .../v2/src/pubsub_zmq_topic_sender.h               |   5 +-
 .../include/pubsub_message_serialization_marker.h  |  60 ++++++
 .../gtest/src/PubSubMatchingTestSuite.cpp          |  94 +++-----
 .../src/PubSubSerializationHandlerTestSuite.cc     |  35 ++-
 .../include/pubsub_serializer_handler.h            |  52 ++++-
 .../include/pubsub_serializer_provider.h           |  36 ++--
 bundles/pubsub/pubsub_utils/src/pubsub_matching.c  |  19 +-
 .../pubsub_utils/src/pubsub_serializer_handler.c   |  78 ++++++-
 14 files changed, 454 insertions(+), 428 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index 014401e..a6d1c2d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -59,17 +59,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
     act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
-    //track serializers
-    if (status == CELIX_SUCCESS) {
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = act->admin;
-        opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
-        opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
-        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //track protocols
     if (status == CELIX_SUCCESS) {
         celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 5d396e5..e30403e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -23,15 +23,17 @@
 #include <arpa/inet.h>
 #include <netdb.h>
 #include <ifaddrs.h>
-#include <pubsub_endpoint.h>
 #include <czmq.h>
-#include <pubsub_serializer.h>
-#include <pubsub_protocol.h>
 #include <ip_utils.h>
-#include <pubsub_matching.h>
-#include <pubsub_utils.h>
-#include <pubsub_message_serialization_service.h>
 
+#include "pubsub_serializer_handler.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
+#include "pubsub_matching.h"
+#include "pubsub_utils.h"
+#include "pubsub_message_serialization_service.h"
+#include "pubsub_message_serialization_marker.h"
 #include "pubsub_zmq_admin.h"
 #include "pubsub_psa_zmq_constants.h"
 #include "pubsub_zmq_topic_sender.h"
@@ -88,6 +90,10 @@ struct pubsub_zmq_admin {
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints;
 
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+    } serializationHandlers;
 };
 
 typedef struct psa_zmq_protocol_entry {
@@ -99,6 +105,7 @@ typedef struct psa_zmq_protocol_entry {
 static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId);
 
 static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -197,6 +204,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
     celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
     psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
+    celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+    psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     return psa;
 }
 
@@ -248,6 +258,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->protocols.mutex);
 
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    iter = hashMapIterator_construct(psa->serializationHandlers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+        pubsub_serializerHandler_destroy(entry);
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
 
@@ -263,6 +281,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_destroy(&psa->protocols.mutex);
     hashMap_destroy(psa->protocols.map, false, false);
 
+    celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+    hashMap_destroy(psa->serializationHandlers.map, false, false);
+
     if (psa->zmq_auth != NULL) {
         zactor_destroy(&psa->zmq_auth);
     }
@@ -272,92 +293,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     free(psa);
 }
 
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_zmq_admin_t *psa = handle;
-
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-    const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
-    const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
-    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
-        L_INFO("[PSA_ZMQ] Ignoring serializer service without one of the following properties: %s or %s or %s",
-               PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
-        L_INFO("[PSA_ZMQ] Ignored serializer type %s msgId %li fqn %s\n", serType, msgId, msgFqn);
-        return;
-    }
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries == NULL) {
-        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
-    }
-    psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_zmq_serializer_entry_t));
-        entry->svc = svc;
-        entry->fqn = celix_utils_strdup(msgFqn);
-        entry->version = celix_utils_strdup(msgVersion);
-        hashMap_put(typeEntries, (void*)msgId, entry);
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_zmq_admin_t *psa = handle;
-    const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries != NULL) {
-        psa_zmq_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
-        free((void*)entry->fqn);
-        free((void*)entry->version);
-        free(entry);
-
-        // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
-        if(hashMap_size(typeEntries) == 0) {
-            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-            celixThreadMutex_lock(&psa->topicSenders.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
-                if (sender != NULL && strncmp(serType, pubsub_zmqTopicSender_serializerType(sender), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_zmqTopicSender_destroy(sender);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-            celixThreadMutex_lock(&psa->topicReceivers.mutex);
-            iter = hashMapIterator_construct(psa->topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
-                pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
-                if (receiver != NULL && strncmp(serType, pubsub_zmqTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(receiverEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_zmqTopicReceiver_destroy(receiver);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-        } else {
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-        }
-    } else {
-        celixThreadRwlock_unlock(&psa->serializers.mutex);
-    }
-}
-
 void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_zmq_admin_t *psa = handle;
 
@@ -462,20 +397,21 @@ celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix
     return status;
 }
 
-static void pubsub_zmqAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
-    const char** out = handle;
-    *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
-
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
-    //1) Create TopicSender
-    //2) Store TopicSender
-    //3) Connect existing endpoints
-    //4) set outPublisherEndpoint
+    //1) Get serialization handler
+    //2) Create TopicSender
+    //3) Store TopicSender
+    //4) Connect existing endpoints
+    //5) set outPublisherEndpoint
+
+    pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic sender without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
     celix_properties_t *newEndpoint = NULL;
 
@@ -485,17 +421,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     }
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_zmqAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
@@ -503,14 +428,14 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     if (sender == NULL) {
         psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
         if (protEntry != NULL) {
-            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle,
+            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle,
                     protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
             const char *protType = protEntry->protType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, protType, NULL);
+                                                pubsub_zmqTopicSender_serializerType(sender), protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
 
             //if configured use a static discover url
@@ -546,10 +471,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     }
     celixThreadMutex_unlock(&psa->protocols.mutex);
 
-    if (sender != NULL && newEndpoint != NULL) {
-        //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
-    }
-
     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
         *outPublisherEndpoint = newEndpoint;
     }
@@ -572,7 +493,6 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
         pubsub_zmq_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
         celixThreadMutex_unlock(&psa->topicSenders.mutex);
         free(mapKey);
-        //TODO disconnect endpoints to sender. note is this needed for a zmq topic sender?
         pubsub_zmqTopicSender_destroy(sender);
     } else {
         celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -585,21 +505,15 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
 
 celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
-
     celix_properties_t *newEndpoint = NULL;
 
-    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic receiver without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
-    //get serializer type
-    const char *serType = NULL;
-    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
-    opts.callbackHandle = &serType;
-    opts.useWithProperties = pubsub_zmqAdmin_getSerType;
-    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-    char filter[32];
-    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
-    opts.filter.filter = filter;
-    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
     celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -608,7 +522,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
         if (protEntry != NULL) {
-            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, handle, protocolSvcId, protEntry->svc);
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, handle, protocolSvcId, protEntry->svc);
         } else {
             L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
         }
@@ -616,7 +530,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
             const char *protType = protEntry->protType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+                                                pubsub_zmqTopicReceiver_serializerType(receiver), protType, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -856,46 +771,6 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE
     return status;
 }
 
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
-    pubsub_zmq_admin_t *psa = handle;
-    psa_zmq_serializer_entry_t *serializer = NULL;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
-    }
-
-    return serializer;
-}
-
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer __attribute__((unused))) {
-    pubsub_zmq_admin_t *psa = handle;
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
-    pubsub_zmq_admin_t *psa = handle;
-    int64_t id = -1L;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
-    if(typeEntries != NULL) {
-        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
-        while(hashMapIterator_hasNext(&iterator)) {
-            void *key = hashMapIterator_nextKey(&iterator);
-            psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, key);
-            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
-                id = (uint32_t)(uintptr_t)key;
-                break;
-            }
-        }
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-    return id;
-}
-
 pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
     pubsub_zmq_admin_t *psa = handle;
     pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
@@ -924,6 +799,20 @@ pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
     return result;
 }
 
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) {
+    pubsub_serializer_handler_t* handler = NULL;
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+    if (handler == NULL) {
+        handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+        if (handler != NULL) {
+            hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+    return handler;
+}
+
 #ifndef ANDROID
 static celix_status_t zmq_getIpAddress(const char* interface, char** ip) {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index f99ee08..475e464 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -27,12 +27,6 @@
 
 typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
 
-typedef struct psa_zmq_serializer_entry {
-    const char *fqn;
-    const char *version;
-    pubsub_message_serialization_service_t *svc;
-} psa_zmq_serializer_entry_t;
-
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper);
 void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
 
@@ -49,18 +43,11 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
 celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
 celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
 
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-
 void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
 
 bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
 
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer);
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
 pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0568834..1051d77 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -26,22 +26,22 @@
 #if !defined(__APPLE__)
     #include <sys/epoll.h>
 #endif
+
 #include <assert.h>
-#include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <czmq.h>
-#include <celix_log_helper.h>
-#include "pubsub_zmq_topic_receiver.h"
-#include "pubsub_psa_zmq_constants.h"
-
 #include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
-#include <pubsub_utils.h>
-#include <celix_api.h>
-#include <celix_version.h>
 
+#include "pubsub_endpoint.h"
+#include "celix_log_helper.h"
+#include "pubsub_zmq_topic_receiver.h"
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_utils.h"
+#include "celix_api.h"
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
 #include "pubsub_interceptors_handler.h"
-
 #include "celix_utils_api.h"
 #include "pubsub_zmq_admin.h"
 
@@ -64,7 +64,7 @@
 struct pubsub_zmq_topic_receiver {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    const char *serializerType;
+    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
@@ -95,6 +95,7 @@ struct pubsub_zmq_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+        hash_map_t *msgFqns; //key = msg id, value = char* msgFqn
         bool allInitialized;
     } subscribers;
 };
@@ -118,7 +119,6 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
 static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
 static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
 static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 
 pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -126,14 +126,14 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
                                                             const char *scope,
                                                             const char *topic,
                                                             const celix_properties_t *topicProperties,
-                                                            const char* serializerType,
+                                                            pubsub_serializer_handler_t* serHandler,
                                                             void *admin,
                                                             long protocolSvcId,
                                                             pubsub_protocol_service_t *protocol) {
     pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serializerType = celix_utils_strdup(serializerType);
+    receiver->serializerHandler = serHandler;
     receiver->admin = admin;
     receiver->protocolSvcId = protocolSvcId;
     receiver->protocol = protocol;
@@ -207,6 +207,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
 
         receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL);
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
@@ -287,6 +288,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.msgFqns, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -312,7 +314,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
 
         free(receiver->scope);
         free(receiver->topic);
-        free((void*)receiver->serializerType);
     }
     free(receiver);
 }
@@ -325,7 +326,7 @@ const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver)
 }
 
 const char* pubsub_zmqTopicReceiver_serializerType(pubsub_zmq_topic_receiver_t *receiver) {
-    return receiver->serializerType;
+    return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
 }
 
 long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
@@ -446,7 +447,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     bool monitor = receiver->metricsEnabled;
-    psa_zmq_serializer_entry_t *msgSer = pubsub_zmqAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serializerType, message->header.msgId);
 
     //monitoring
     struct timespec beginSer;
@@ -454,60 +454,64 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     int updateReceiveCount = 0;
     int updateSerError = 0;
 
-    if (msgSer!= NULL) {
-        void *deserializedMsg = NULL;
-        const char *msgFqn = msgSer->fqn;
-        celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
-        bool validVersion = psa_zmq_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
-        celix_version_destroy(version);
-        if (validVersion) {
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &beginSer);
-            }
-            struct iovec deSerializeBuffer;
-            deSerializeBuffer.iov_base = message->payload.payload;
-            deSerializeBuffer.iov_len  = message->payload.length;
-            celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &endSer);
-            }
-            if (status == CELIX_SUCCESS) {
-                uint32_t msgId = message->header.msgId;
-                celix_properties_t *metadata = message->metadata.metadata;
-                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
-                bool release = true;
-                if (cont) {
-                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                    while (hashMapIterator_hasNext(&iter2)) {
-                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                        svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
-                        if (!release && hashMapIterator_hasNext(&iter2)) {
-                            //receive function has taken ownership and still more receive function to come ..
-                            //deserialize again for new message
-                            status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
-                            if (status != CELIX_SUCCESS) {
-                                L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                                break;
-                            }
-                            release = true;
+    char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId);
+    if (msgFqn == NULL) {
+        msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+        hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn);
+    }
+
+    void *deserializedMsg = NULL;
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+    if (validVersion) {
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &beginSer);
+        }
+        struct iovec deSerializeBuffer;
+        deSerializeBuffer.iov_base = message->payload.payload;
+        deSerializeBuffer.iov_len  = message->payload.length;
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &endSer);
+        }
+        if (status == CELIX_SUCCESS) {
+            uint32_t msgId = message->header.msgId;
+            celix_properties_t *metadata = message->metadata.metadata;
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
+            bool release = true;
+            if (cont) {
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
+                    if (!release && hashMapIterator_hasNext(&iter2)) {
+                        //receive function has taken ownership and still more receive function to come ..
+                        //deserialize again for new message
+                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                            break;
                         }
+                        release = true;
                     }
-                    if (release) {
-                        msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deserializedMsg);
-                    }
-                    updateReceiveCount += 1;
                 }
-            } else {
-                updateSerError += 1;
-                L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                if (release) {
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+                }
+                updateReceiveCount += 1;
             }
+        } else {
+            updateSerError += 1;
+            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
-        L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
+        L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+               msgFqn,
+               (int)message->header.msgMajorVersion,
+               (int)message->header.msgMinorVersion,
+               pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+               pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
-
-    pubsub_zmqAdmin_releaseSerializer(receiver->admin, msgSer);
 }
 
 static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
@@ -744,24 +748,3 @@ static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const
     ts->zmq_pub_cert = pub_cert;
 #endif
 }
-
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
-    bool check=false;
-
-    if (major == 0 && minor == 0) {
-        //no check
-        return true;
-    }
-
-    int versionMajor;
-    int versionMinor;
-    if (msgVersion!=NULL) {
-        version_getMajor(msgVersion, &versionMajor);
-        version_getMinor(msgVersion, &versionMinor);
-        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
-            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
-        }
-    }
-
-    return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 67ba6d4..a2f953b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -20,9 +20,10 @@
 #ifndef CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
 #define CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
 
-#include <pubsub_admin_metrics.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_admin_metrics.h"
+#include "pubsub_message_serialization_service.h"
 #include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
 
@@ -31,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        const char* serializerType,
+        pubsub_serializer_handler_t* serHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *protocol);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index f969f4c..9996c53 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -21,17 +21,18 @@
 #include <pubsub_protocol.h>
 #include <stdlib.h>
 #include <memory.h>
-#include <pubsub_constants.h>
-#include <pubsub/publisher.h>
-#include <utils.h>
-#include <zconf.h>
 #include <arpa/inet.h>
 #include <czmq.h>
-#include <celix_log_helper.h>
+#include <uuid/uuid.h>
+
+#include "celix_utils.h"
+#include "pubsub_constants.h"
+#include "pubsub/publisher.h"
+#include "celix_log_helper.h"
 #include "pubsub_zmq_topic_sender.h"
 #include "pubsub_psa_zmq_constants.h"
-#include <uuid/uuid.h>
-#include <celix_version.h>
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
 #include "celix_constants.h"
 #include "pubsub_interceptors_handler.h"
 #include "pubsub_zmq_admin.h"
@@ -51,7 +52,7 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    const char *serializerType;
+    pubsub_serializer_handler_t* serializationHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
@@ -85,8 +86,8 @@ struct pubsub_zmq_topic_sender {
 typedef struct psa_zmq_send_msg_entry {
     uint32_t type; //msg type id (hash of fqn)
     const char *fqn;
-    uint8_t major;
-    uint8_t minor;
+    uint16_t msgMajorVersion;
+    uint16_t msgMinorVersion;
     unsigned char originUUID[16];
     pubsub_protocol_service_t *protSer;
     unsigned int seqNr;
@@ -117,7 +118,8 @@ typedef struct psa_zmq_bounded_service_entry {
 } psa_zmq_bounded_service_entry_t;
 
 typedef struct psa_zmq_zerocopy_free_entry {
-    psa_zmq_serializer_entry_t *msgSer;
+    uint32_t msgId;
+    pubsub_serializer_handler_t *serHandler;
     struct iovec *serializedOutput;
     size_t serializedOutputLen;
 } psa_zmq_zerocopy_free_entry;
@@ -135,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char* serializerType,
+        pubsub_serializer_handler_t* serializationHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
@@ -146,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializerType = celix_utils_strdup(serializerType);
+    sender->serializationHandler = serializationHandler;
     sender->admin = admin;
     sender->protocolSvcId = protocolSvcId;
     sender->protocol = prot;
@@ -352,13 +354,12 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         }
         free(sender->topic);
         free(sender->url);
-        free((void*)sender->serializerType);
         free(sender);
     }
 }
 
 const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
-    return sender->serializerType;
+    return pubsub_serializerHandler_getSerializationType(sender->serializationHandler);
 }
 
 long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -383,10 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
 
 static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
     psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
-    int64_t rc = pubsub_zmqAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
-    if(rc >= 0) {
-        *msgTypeId = (unsigned int)rc;
-    }
+    *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType);
     return 0;
 }
 
@@ -485,7 +483,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
 
 static void psa_zmq_freeMsg(void *msg, void *hint) {
     psa_zmq_zerocopy_free_entry *entry = hint;
-    entry->msgSer->svc->freeSerializedMsg(entry->msgSer->svc->handle, entry->serializedOutput, entry->serializedOutputLen);
+    pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen);
     free(entry);
 }
 
@@ -500,14 +498,6 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     pubsub_zmq_topic_sender_t *sender = bound->parent;
     bool monitor = sender->metricsEnabled;
 
-    psa_zmq_serializer_entry_t *serializer = pubsub_zmqAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
-    if(serializer == NULL) {
-        pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
-        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        return CELIX_SERVICE_EXCEPTION;
-    }
-
     psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
 
     //metrics updates
@@ -519,15 +509,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     int serializationErrorUpdate = 0;
     int sendCountUpdate = 0;
 
-    if(entry == NULL) {
+    if (entry == NULL) {
         entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
         entry->protSer = sender->protocol;
         entry->type = msgTypeId;
-        entry->fqn = serializer->fqn;
-        celix_version_t* version = celix_version_createVersionFromString(serializer->version);
-        entry->major = (uint8_t)celix_version_getMajor(version);
-        entry->minor = (uint8_t)celix_version_getMinor(version);
-        celix_version_destroy(version);
+        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId);
+        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId);
+        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId);
         uuid_copy(entry->originUUID, sender->fwUUID);
         celixThreadMutex_create(&entry->metrics.mutex, NULL);
         hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
@@ -540,8 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     }
     size_t serializedOutputLen = 0;
     struct iovec *serializedOutput = NULL;
-    status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
+    status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
     if (monitor) {
         clock_gettime(CLOCK_REALTIME, &serializationEnd);
     }
@@ -554,7 +541,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
             usleep(500);
         }
 
-        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
         if (cont) {
 
             pubsub_protocol_message_t message;
@@ -576,8 +563,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
             message.header.msgId = msgTypeId;
             message.header.seqNr = entry->seqNr;
-            message.header.msgMajorVersion = 0;
-            message.header.msgMinorVersion = 0;
+            message.header.msgMajorVersion = entry->msgMajorVersion;
+            message.header.msgMinorVersion = entry->msgMinorVersion;
             message.header.payloadSize = payloadLength;
             message.header.metadataSize = entry->metadataBufferSize;
             message.header.payloadPartSize = payloadLength;
@@ -601,7 +588,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 zmq_msg_t msg4; // Footer
                 void *socket = zsock_resolve(sender->zmq.socket);
                 psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                freeMsgEntry->msgSer = serializer;
+                freeMsgEntry->serHandler = sender->serializationHandler;
+                freeMsgEntry->msgId = msgTypeId;
                 freeMsgEntry->serializedOutput = serializedOutput;
                 freeMsgEntry->serializedOutputLen = serializedOutputLen;
 
@@ -671,13 +659,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                 __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
             }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
 
             if (message.metadata.metadata) {
                 celix_properties_destroy(message.metadata.metadata);
             }
             if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+                pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen);
             }
 
             if (sendOk) {
@@ -691,15 +679,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         }
     } else {
         serializationErrorUpdate = 1;
-        L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+        L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
     }
 
-    pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
-
-    if (monitor && entry != NULL) {
+    if (monitor) {
         celixThreadMutex_lock(&entry->metrics.mutex);
 
-        long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
+        double n = (double)(entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed);
         double diff = celix_difftime(&serializationStart, &serializationEnd);
         double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
         entry->metrics.averageSerializationTimeInSeconds = average;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 744f653..c2c0d57 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -20,7 +20,7 @@
 #ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
 #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
 
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
 #include "celix_bundle_context.h"
 #include "pubsub_admin_metrics.h"
 #include "celix_log_helper.h"
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char* serializerType,
+        pubsub_serializer_handler_t* serializationHandler,
         void *admin,
         long protocolSvcId,
         pubsub_protocol_service_t *prot,
@@ -40,6 +40,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort);
+
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
 
 const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
new file mode 100644
index 0000000..a4ff06b
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+
+#include "hash_map.h"
+#include "version.h"
+#include "celix_bundle.h"
+#include "sys/uio.h"
+
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME      "pubsub_message_serialization_marker"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION   "1.0.0"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_RANGE     "[1,2)"
+
+/**
+ * @brief Service property (named "serialization.type") identifying the serialization type (e.g. json, avrobin, etc.)
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY                 "serialization.type"
+
+/**
+ * @brief Service property (named "serialization.backwards.compatible") identifying whether the serialization is
+ * backwards compatible (e.g. for json an extra - new - field can be safely ignored and is thus backwards compatible).
+ *
+ * Type is boolean. If the service property is not present, false will be used.
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE       "serialization.backwards.compatible"
+
+/**
+ * @brief Marker interface - interface with no methods - to indicate that a serialization.type is available.
+ *
+ * This marker interface is used to indicate that a serialization type is available without the need to rely on
+ * pubsub message serialization service per msg type.
+ * The service.ranking of this marker interface must be used to select a serialization type if no serialization type is
+ * configured. The service.ranking of individual pubsub_message_serialization_service is used to override serializers per
+ * type.
+ *
+ * The property serialization.type is mandatory.
+ */
+typedef struct pubsub_message_serialization_marker {
+    void* handle;
+} pubsub_message_serialization_marker_t;
+
+#endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
index 85c86c1..10555a3 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
@@ -20,16 +20,17 @@
 #include "gtest/gtest.h"
 
 #include <memory>
+#include <cstdarg>
 
-#include <celix_api.h>
+#include "celix_api.h"
 #include "pubsub_message_serialization_service.h"
 #include "dyn_message.h"
-#include <cstdarg>
-#include <pubsub_protocol.h>
-#include <pubsub_constants.h>
-#include <pubsub_matching.h>
-#include <pubsub/api.h>
-#include <pubsub_endpoint.h>
+#include "pubsub_protocol.h"
+#include "pubsub_constants.h"
+#include "pubsub_matching.h"
+#include "pubsub/api.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_message_serialization_marker.h"
 
 static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
     va_list ap;
@@ -54,70 +55,46 @@ public:
         bndId = celix_bundleContext_installBundle(ctx.get(), MATCHING_BUNDLE, true);
 
         dynMessage_logSetup(stdLog, NULL, 1);
-
-        msgSerSvc.handle = this;
-        msgSerSvc.serialize = [](void*, const void*, struct iovec**, size_t*) -> celix_status_t {
-            return CELIX_SUCCESS;
-        };
-        msgSerSvc.freeSerializedMsg = [](void*, struct iovec* , size_t) {
-        };
-        msgSerSvc.deserialize = [](void*, const struct iovec*, size_t, void**) -> celix_status_t {
-            return CELIX_SUCCESS;
-        };
-        msgSerSvc.freeDeserializedMsg = [](void*, void*) {
-        };
     }
 
     ~PubSubMatchingTestSuite() override {
         celix_bundleContext_uninstallBundle(ctx.get(), bndId);
     }
 
-    long registerSerSvc(const char* type, uint32_t msgId, const char* msgFqn, const char* msgVersion, long ranking) {
+    long registerMarkerSerSvc(const char* type) {
         auto* p = celix_properties_create();
-        celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, type);
-        celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, std::to_string(msgId).c_str());
-        celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, msgFqn);
-        celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
-        celix_properties_setLong(p, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+        celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, type);
         celix_service_registration_options_t opts{};
-        opts.svc = static_cast<void*>(&msgSerSvc);
+        opts.svc = static_cast<void*>(&serMarkerSvc);
         opts.properties = p;
-        opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION;
+        opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+        opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
         return celix_bundleContext_registerServiceWithOptions(ctx.get(), &opts);
     }
 
     std::shared_ptr<celix_framework_t> fw{};
     std::shared_ptr<celix_bundle_context_t> ctx{};
-    pubsub_message_serialization_service_t  msgSerSvc{};
+    pubsub_message_serialization_marker_t serMarkerSvc{};
     pubsub_protocol_service_t protocolSvc{};
     long bndId{};
 };
 
 TEST_F(PubSubMatchingTestSuite, MatchPublisherSimple) {
-    auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-
+    auto serMarkerId = registerMarkerSerSvc("fiets");
     long foundSvcId = -1;
-
     pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
-    EXPECT_EQ(foundSvcId, serId);
-
-    celix_bundleContext_unregisterService(ctx.get(), serId);
+    EXPECT_EQ(foundSvcId, serMarkerId);
+    celix_bundleContext_unregisterService(ctx.get(), serMarkerId);
 }
 
 TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
-    auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-    auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
-    auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
-    auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
-
+    auto serFietsId = registerMarkerSerSvc("fiets");
+    auto serFiets2Id = registerMarkerSerSvc("fiets");
+    auto serAutoId = registerMarkerSerSvc("auto");
+    auto serBelId = registerMarkerSerSvc("bel");
     long foundSvcId = -1;
-
     pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
-    EXPECT_EQ(foundSvcId, serFiets2Id);
-
+    EXPECT_EQ(foundSvcId, serFietsId); //older service are ranked higher
     celix_bundleContext_unregisterService(ctx.get(), serFietsId);
     celix_bundleContext_unregisterService(ctx.get(), serFiets2Id);
     celix_bundleContext_unregisterService(ctx.get(), serAutoId);
@@ -125,26 +102,23 @@ TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
 }
 
 TEST_F(PubSubMatchingTestSuite, MatchSubscriberSimple) {
-    auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+    auto serId = registerMarkerSerSvc("fiets");
 
     long foundSvcId = -1;
     auto* p = celix_properties_create();
     celix_properties_set(p, PUBSUB_SUBSCRIBER_SCOPE, "scope");
     celix_properties_set(p, PUBSUB_SUBSCRIBER_TOPIC, "fiets");
-
     pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
     EXPECT_EQ(foundSvcId, serId);
-
     celix_properties_destroy(p);
     celix_bundleContext_unregisterService(ctx.get(), serId);
 }
 
 TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
-    auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-    auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
-    auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
-    auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+    auto serFietsId = registerMarkerSerSvc("fiets");
+    auto serFiets2Id = registerMarkerSerSvc("fiets");
+    auto serAutoId = registerMarkerSerSvc("auto");
+    auto serBelId = registerMarkerSerSvc("bel");
 
     long foundSvcId = -1;
 
@@ -154,7 +128,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
 
     pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
 
-    EXPECT_EQ(foundSvcId, serFiets2Id);
+    EXPECT_EQ(foundSvcId, serFietsId);
 
     celix_properties_destroy(p);
     celix_bundleContext_unregisterService(ctx.get(), serFietsId);
@@ -164,7 +138,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
 }
 
 TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
-    auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+    auto serId = registerMarkerSerSvc("fiets");
 
     long foundSvcId = -1;
 
@@ -183,10 +157,10 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
 }
 
 TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
-    auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-    auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
-    auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
-    auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+    auto serFietsId = registerMarkerSerSvc("fiets");
+    auto serFiets2Id = registerMarkerSerSvc("fiets");
+    auto serAutoId = registerMarkerSerSvc("auto");
+    auto serBelId = registerMarkerSerSvc("bel");
 
     long foundSvcId = -1;
 
@@ -197,7 +171,7 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
 
     pubsub_utils_matchEndpoint(ctx.get(), logHelper, ep, "admin?", false, &foundSvcId, NULL);
 
-    EXPECT_EQ(foundSvcId, serFiets2Id);
+    EXPECT_EQ(foundSvcId, serFietsId);
 
     celix_properties_destroy(ep);
     celix_logHelper_destroy(logHelper);
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 2448c18..550c85f 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -20,12 +20,13 @@
 #include "gtest/gtest.h"
 
 #include <memory>
+#include <cstdarg>
 
 #include "celix_api.h"
 #include "pubsub_message_serialization_service.h"
 #include "pubsub_serializer_handler.h"
 #include "dyn_message.h"
-#include <cstdarg>
+#include "pubsub_message_serialization_marker.h"
 
 static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
     va_list ap;
@@ -98,6 +99,7 @@ public:
 TEST_F(PubSubSerializationHandlerTestSuite, CreateDestroy) {
     auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
     ASSERT_TRUE(handler != nullptr);
+    ASSERT_STREQ("json", pubsub_serializerHandler_getSerializationType(handler));
     pubsub_serializerHandler_destroy(handler);
 }
 
@@ -176,6 +178,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, MultipleVersions) {
     EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+    EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+    EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
 
     celix_bundleContext_unregisterService(ctx.get(), svcId1);
     celix_bundleContext_unregisterService(ctx.get(), svcId2);
@@ -194,6 +198,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, NoBackwardsCompatbile) {
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
     EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+    EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+    EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
 
     celix_bundleContext_unregisterService(ctx.get(), svcId1);
     pubsub_serializerHandler_destroy(handler);
@@ -261,4 +267,29 @@ TEST_F(PubSubSerializationHandlerTestSuite, BackwardsCompatibleCall) {
 
     celix_bundleContext_unregisterService(ctx.get(), svcId1);
     pubsub_serializerHandler_destroy(handler);
-}
\ No newline at end of file
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
+    auto* logHelper = celix_logHelper_create(ctx.get(), "test");
+    auto* marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), 1032 /*invalid*/, logHelper);
+    EXPECT_FALSE(marker); //non existing svc
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, NULL);
+    EXPECT_GE(svcId, 0);
+    marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+    EXPECT_FALSE(marker); //missing ser type service property
+    celix_bundleContext_unregisterService(ctx.get(), svcId);
+
+    auto* props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, "test");
+    svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, props);
+    EXPECT_GE(svcId, 0);
+    marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+    EXPECT_TRUE(marker);
+    EXPECT_STREQ("test", pubsub_serializerHandler_getSerializationType(marker));
+    celix_bundleContext_unregisterService(ctx.get(), svcId);
+    pubsub_serializerHandler_destroy(marker);
+
+    celix_logHelper_destroy(logHelper);
+}
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index 06c18de..0519891 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -23,6 +23,7 @@
 #include <stdint.h>
 #include <sys/uio.h>
 
+#include "celix_log_helper.h"
 #include "celix_api.h"
 
 #ifdef __cplusplus
@@ -33,8 +34,10 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
 
 
 /**
- * @brief Creates a handler which track pubsub_custom_msg_serialization_service services with a (serialization.type=<serializerType)) filter.
- * If multiple pubsub_message_serialization_service for the same msg fqn (targeted.msg.fqn property) the highest ranking service will be used.
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the provided serialization type.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
  *
  * The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
  * and that only one version for a message serialization is registered.
@@ -53,6 +56,29 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
 pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible);
 
 /**
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the serialization type of the
+ * marker service identified by the provided service.id.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
+ *
+ * The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
+ * and that only one version for a message serialization is registered.
+ * This means that all bundles in a single celix container (a single process) should all use the same version of messages.
+ *
+ * If backwards compatibility is enabled, serialized messages with a higher minor version than the version available in the serializer handler can still
+ * be deserialized. This could be supported for serialization like json.
+ * So when a json message of version 1.1.x with content {"field1":"value1", "field2":"value2"} is deserialized to a version 1.0 (which only has field1),
+ * the message can and will be deserialized
+ *
+ * @param ctx                               The bundle context.
+ * @param pubsubSerializerMarkerSvcId       The service.id of the pubsub_serialization_marker to use for deriving serializationType and backwardsCompatible.
+ * @param logHelper                         Optional log helper. If provided will be used to log issues with creating a serializer handler for the provided marker svc id.
+ * @return A newly created pubsub serializer handler, or NULL if the marker service cannot be found or has no serialization type property.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper);
+
+/**
  * @brief destroy the pubsub_serializer_handler and free the used memory.
  */
 void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler);
@@ -128,6 +154,28 @@ uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler,
  */
 size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler);
 
+
+/**
+ * @brief Get the serializer type for this handler.
+ *
+ * Valid as long as the handler exists.
+ */
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Returns the minor version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Returns the major version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
index c64e62e..034547f 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
@@ -22,22 +22,26 @@
 
 typedef struct pubsub_serializer_provider pubsub_serializer_provider_t; //opaque type
 
-
-/**
- * Creates a handler which track bundles and registers pubsub_custom_msg_serialization_service
- * for every descriptor file found for json serialization.
- *
- * Added properties:
- * serialization.type=json
- * targeted.msg.fqn=<descriptor fqn>
- * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
- * targeted.msg.version=<msg version in descriptor if present> (optional)
- * service.ranking=0
- *
- * For descriptor found multiple times (same fqn and version) only the first one is registered
- *
- */
-pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */);
+ /**
+  *
+  * Creates a handler which tracks bundles and registers pubsub_custom_msg_serialization_service
+  * for every descriptor file found for json serialization.
+  *
+  * Added properties:
+  * serialization.type=json
+  * targeted.msg.fqn=<descriptor fqn>
+  * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
+  * targeted.msg.version=<msg version in descriptor if present> (optional)
+  * service.ranking=0
+  *
+  * For descriptors found multiple times (same fqn and version) only the first one is registered
+  *
+  * @param ctx
+  * @param serializerType
+  * @param backwardsCompatible
+  * @return
+  */
+pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */, bool backwardsCompatible);
 
 void pubsub_providerHandler_destroy(pubsub_serializer_provider_t* handler);
 
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index b2a78df..6c90743 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -7,10 +7,10 @@
 
 #include "celix_bundle.h"
 
-#include <pubsub_endpoint.h>
-#include <pubsub_admin.h>
-#include <pubsub_protocol.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_protocol.h"
+#include "pubsub_admin.h"
+#include "pubsub_message_serialization_marker.h"
 
 
 struct ps_utils_serializer_selection_data {
@@ -36,22 +36,19 @@ typedef struct ps_utils_retrieve_topic_properties_data {
 static long getPSSerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
     long svcId = -1L;
 
+    celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+    opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+
     if (requested_serializer != NULL) {
         char filter[512];
-        int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
+        int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
         if (written > 512) {
             fprintf(stderr, "Cannot create serializer filter. need more than 512 char array\n");
         } else {
-            celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
-            opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
             opts.filter = filter;
             svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
         }
     } else {
-        celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
-        opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.ignoreServiceLanguage = true;
-
         //note findService will automatically return the highest ranking service id
         svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
     }
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index cb06107..7bfa5e0 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -22,9 +22,11 @@
 
 #include <string.h>
 
+#include "pubsub_message_serialization_marker.h"
 #include "celix_version.h"
 #include "pubsub_message_serialization_service.h"
 #include "celix_log_helper.h"
+#include "celix_constants.h"
 
 #define L_DEBUG(...) \
     celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
@@ -46,6 +48,7 @@ typedef struct pubsub_serialization_service_entry {
 
 struct pubsub_serializer_handler {
     celix_bundle_context_t* ctx;
+    char* serType;
     bool backwardCompatible;
     long serializationSvcTrackerId;
     celix_log_helper_t *logHelper;
@@ -115,6 +118,7 @@ static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgI
 pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
     pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
     handler->ctx = ctx;
+    handler->serType = celix_utils_strdup(serializerType);
     handler->backwardCompatible = backwardCompatible;
 
     handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
@@ -137,6 +141,51 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex
     return handler;
 }
 
+struct pubsub_serializerHandler_callback_data {
+    celix_bundle_context_t* ctx;
+    celix_log_helper_t* logHelper;
+    pubsub_serializer_handler_t* handler;
+};
+
+static void pubsub_serializerHandler_useMarkerSvcCallback(void *handle, void* svc __attribute__((unused)), const celix_properties_t* props) {
+    struct pubsub_serializerHandler_callback_data* data = handle;
+    const char* serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, NULL);
+    bool backwardsCompatible = celix_properties_getAsBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, false);
+    if (serType != NULL) {
+        data->handler = pubsub_serializerHandler_create(data->ctx, serType, backwardsCompatible);
+    } else if (data->logHelper != NULL) {
+        celix_logHelper_error(
+                data->logHelper,
+                "Cannot create serializer handler because service %s does not have a %s service property",
+                PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+                PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY);
+    }
+}
+
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper) {
+    struct pubsub_serializerHandler_callback_data data;
+    memset(&data, 0, sizeof(data));
+    data.ctx = ctx;
+    data.logHelper = logHelper;
+
+    char filter[32];
+    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, pubsubSerializerMarkerSvcId);
+    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+    opts.filter.filter = filter;
+    opts.callbackHandle = &data;
+    opts.useWithProperties = pubsub_serializerHandler_useMarkerSvcCallback;
+    bool called = celix_bundleContext_useServiceWithOptions(ctx, &opts);
+    if (!called && logHelper != NULL) {
+        celix_logHelper_error(
+                logHelper,
+                "Cannot find %s service for service id %li",
+                PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+                pubsubSerializerMarkerSvcId);
+    }
+    return data.handler;
+}
+
 
 void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
     if (handler != NULL) {
@@ -155,6 +204,7 @@ void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
         }
         hashMap_destroy(handler->serializationServices, false, false);
         celix_logHelper_destroy(handler->logHelper);
+        free(handler->serType);
         free(handler);
     }
 }
@@ -330,7 +380,6 @@ char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, u
     char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
     celixThreadRwlock_unlock(&handler->lock);
     return msgFqn;
-
 }
 
 uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn) {
@@ -347,6 +396,29 @@ uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler,
     celixThreadRwlock_unlock(&handler->lock);
     return result;
 }
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    int major = -1;
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+        major = celix_version_getMajor(entry->msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return major;
+}
+
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    int major = -1;
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+        major = celix_version_getMinor(entry->msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return major;
+}
 
 size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler) {
     size_t count = 0;
@@ -358,4 +430,8 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
     }
     celixThreadRwlock_unlock(&handler->lock);
     return count;
+}
+
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
+    return handler->serType;
 }
\ No newline at end of file