You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/04/11 19:24:30 UTC

[1/2] celix git commit: CELIX-408: Major refactoring of the pubsub serializer design and usage. Examples of udpmc and zmq seem to be working. Combi and zmq multicast are not stable yet.

Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-408_runtime 97df926c2 -> 7efe4331a


http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
deleted file mode 100644
index 1c6bbcd..0000000
--- a/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.c
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <dirent.h>
-
-#include "utils.h"
-#include "dyn_message.h"
-
-#include "dyn_msg_utils.h"
-
-#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
-
-static char * getMsgDescriptionDir(bundle_pt bundle);
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
-
-
-unsigned int uintHash(const void * uintNum) {
-	return *((unsigned int*)uintNum);
-}
-
-int uintEquals(const void * uintNum, const void * toCompare) {
-	return ( (*((unsigned int*)uintNum)) == (*((unsigned int*)toCompare)) );
-}
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
-
-	char *root = NULL;
-	char *metaInfPath = NULL;
-
-	root = getMsgDescriptionDir(bundle);
-
-	if(root != NULL){
-		asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
-
-		addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
-		addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
-
-		free(metaInfPath);
-		free(root);
-	}
-}
-
-void emptyMsgTypesMap(hash_map_pt msgTypesMap)
-{
-	hash_map_iterator_pt iter = hashMapIterator_create(msgTypesMap);
-
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		dynMessage_destroy( ((dyn_message_type *) hashMapEntry_getValue(entry)) );
-	}
-	hashMap_clear(msgTypesMap, true, false);
-	hashMapIterator_destroy(iter);
-}
-
-static char * getMsgDescriptionDir(bundle_pt bundle)
-{
-	char *root = NULL;
-
-	bool isSystemBundle = false;
-	bundle_isSystemBundle(bundle, &isSystemBundle);
-
-	if(isSystemBundle == true) {
-		bundle_context_pt context;
-		bundle_getContext(bundle, &context);
-
-		const char *prop = NULL;
-
-		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
-
-		if(prop != NULL) {
-			root = strdup(prop);
-		} else {
-			root = getcwd(NULL, 0);
-		}
-	} else {
-		bundle_getEntry(bundle, ".", &root);
-	}
-
-	return root;
-}
-
-
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
-{
-	char path[128];
-	struct dirent *entry = NULL;
-	DIR *dir = opendir(root);
-
-	if(dir) {
-		entry = readdir(dir);
-	}
-
-	while (entry != NULL) {
-
-		if (strstr(entry->d_name, ".descriptor") != NULL) {
-
-			printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
-			memset(path,0,128);
-			snprintf(path, 128, "%s/%s", root, entry->d_name);
-			FILE *stream = fopen(path,"r");
-
-			if (stream != NULL){
-				dyn_message_type* msgType = NULL;
-
-				int rc = dynMessage_parse(stream, &msgType);
-				if (rc == 0 && msgType!=NULL) {
-
-					char* msgName = NULL;
-					dynMessage_getName(msgType,&msgName);
-
-					if(msgName!=NULL){
-						unsigned int* msgId = malloc(sizeof(unsigned int));
-						*msgId = utils_stringHash(msgName);
-						hashMap_put(msgTypesMap,msgId,msgType);
-					}
-
-				}
-				else{
-					printf("DMU: cannot parse message from descriptor %s\n.",path);
-				}
-				fclose(stream);
-			}else{
-				printf("DMU: cannot open descriptor file %s\n.",path);
-			}
-
-		}
-		entry = readdir(dir);
-	}
-
-	if(dir) {
-		closedir(dir);
-	}
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index 61b1cd9..a5798a4 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,6 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
     SOURCES
     	private/src/ps_activator.c
     	private/src/pubsub_serializer_impl.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index f026300..5299808 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -34,32 +34,25 @@
 
 #include "pubsub_serializer.h"
 
-struct _pubsub_message_type {	/* _dyn_message_type */
-	struct namvals_head header;
-	struct namvals_head annotations;
-	struct types_head types;
-	dyn_type *msgType;
-	version_pt msgVersion;
-};
-
-struct pubsub_serializer {
+typedef struct pubsub_serializer {
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
-};
-
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer);
+} pubsub_serializer_t;
 
-/* Start of serializer specific functions */
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+typedef struct pubsub_msg_serializer_impl {
+    pubsub_msg_serializer_t msgSerializer;
+    dyn_message_type* dynMsg;
+} pubsub_msg_serializer_impl_t;
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t* map);
 
+/* Start of serializer specific functions */
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg);
 
 #endif /* PUBSUB_SERIALIZER_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index b83c26b..e0e23d4 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -32,8 +32,8 @@
 #include "pubsub_serializer_impl.h"
 
 struct activator {
-	pubsub_serializer_pt serializer;
-	pubsub_serializer_service_pt serializerService;
+	pubsub_serializer_t* serializer;
+	pubsub_serializer_service_t* serializerService;
 	service_registration_pt registration;
 };
 
@@ -56,24 +56,16 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
 	celix_status_t status = CELIX_SUCCESS;
 	struct activator *activator = userData;
-	pubsub_serializer_service_pt pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
+	pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
 
 	if (!pubsubSerializerSvc) {
 		status = CELIX_ENOMEM;
 	}
 	else{
-		pubsubSerializerSvc->serializer = activator->serializer;
-
-		pubsubSerializerSvc->serialize = pubsubSerializer_serialize;
-		pubsubSerializerSvc->deserialize = pubsubSerializer_deserialize;
-
-		pubsubSerializerSvc->fillMsgTypesMap = pubsubSerializer_fillMsgTypesMap;
-		pubsubSerializerSvc->emptyMsgTypesMap = pubsubSerializer_emptyMsgTypesMap;
-
-		pubsubSerializerSvc->getVersion = pubsubSerializer_getVersion;
-		pubsubSerializerSvc->getName = pubsubSerializer_getName;
-		pubsubSerializerSvc->freeMsg = pubsubSerializer_freeMsg;
+		pubsubSerializerSvc->handle = activator->serializer;
 
+		pubsubSerializerSvc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
+		pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
 		activator->serializerService = pubsubSerializerSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 60d5c98..2dd8258 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -26,30 +26,28 @@
 
 #include <stdio.h>
 #include <stdlib.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
 #include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <inttypes.h>
 
-#include "constants.h"
 #include "utils.h"
 #include "hash_map.h"
-#include "array_list.h"
 #include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
+
 #include "log_helper.h"
-#include "log_service.h"
-#include "service_factory.h"
 
 #include "json_serializer.h"
-#include "dyn_msg_utils.h"
 
 #include "pubsub_serializer_impl.h"
 
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer) {
+#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	*serializer = calloc(1, sizeof(**serializer));
@@ -70,7 +68,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
 	return status;
 }
 
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	logHelper_stop(serializer->loghelper);
@@ -81,63 +79,197 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
 	return status;
 }
 
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
-	celix_status_t status = CELIX_SUCCESS;
-
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
+    if (status == CELIX_SUCCESS) {
+        map->bundle = bundle;
+        map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
+        pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
+    } else {
+        logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+        status = CELIX_ENOMEM;
+    }
 
-	char *jsonOutput = NULL;
-	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
-	if (rc != 0){
-		status = CELIX_BUNDLE_EXCEPTION;
-	}
+    if (status == CELIX_SUCCESS) {
+        *out = map;
+    }
+    return status;
+}
 
-	*output = (void *) jsonOutput;
-	*outputLen = strlen(jsonOutput) + 1;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t* map) {
+    celix_status_t status = CELIX_SUCCESS;
+    if (map == NULL) {
+        return status;
+    }
 
-	return status;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        pubsub_msg_serializer_impl_t* impl = msgSer->handle;
+        dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version owned by dynType
+        free(impl); //also contains the service struct.
+    }
+    hashMap_destroy(map->serializers, false, false);
+    free(map);
+    return status;
 }
 
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output){
-	celix_status_t status = CELIX_SUCCESS;
 
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen) {
+    celix_status_t status = CELIX_SUCCESS;
 
-	void *textOutput = NULL;
-	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+    char *jsonOutput = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+	int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
 	if (rc != 0){
 		status = CELIX_BUNDLE_EXCEPTION;
 	}
-
-	*output = textOutput;
+    if (status == CELIX_SUCCESS) {
+        *out = jsonOutput;
+        *outLen = strlen(jsonOutput) + 1;
+    }
 
 	return status;
 }
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap, bundle_pt bundle){
-	fillMsgTypesMap(msgTypesMap, bundle);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out) {
+    celix_status_t status = CELIX_SUCCESS;
+    void *msg = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    int rc = jsonSerializer_deserialize(dynType, input, &msg);
+    if (rc != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+    if (status == CELIX_SUCCESS) {
+        *out = msg;
+    }
+	return status;
 }
 
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap){
-	emptyMsgTypesMap(msgTypesMap);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) {
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    if (dynType != NULL) {
+        dynType_free(dynType, msg);
+    }
 }
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
-	version_pt msgVersion = NULL;
-	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
-	return msgVersion;
+
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
+    char* root = NULL;
+    char* metaInfPath = NULL;
+
+    root = pubsubSerializer_getMsgDescriptionDir(bundle);
+
+    if(root != NULL){
+        asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+
+        pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+        pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+
+        free(metaInfPath);
+        free(root);
+    }
 }
 
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
-	char *name = NULL;
-	dynMessage_getName((dyn_message_type *) msgType, &name);
-	return name;
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
+{
+    char *root = NULL;
+
+    bool isSystemBundle = false;
+    bundle_isSystemBundle(bundle, &isSystemBundle);
+
+    if(isSystemBundle == true) {
+        bundle_context_pt context;
+        bundle_getContext(bundle, &context);
+
+        const char *prop = NULL;
+
+        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+        if(prop != NULL) {
+            root = strdup(prop);
+        } else {
+            root = getcwd(NULL, 0);
+        }
+    } else {
+        bundle_getEntry(bundle, ".", &root);
+    }
+
+    return root;
 }
 
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg){
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-	dynType_free(type, msg);
+
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
+{
+    char path[128];
+    struct dirent *entry = NULL;
+    DIR *dir = opendir(root);
+
+    if(dir) {
+        entry = readdir(dir);
+    }
+
+    while (entry != NULL) {
+
+        if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+            printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+            memset(path,0,128);
+            snprintf(path, 128, "%s/%s", root, entry->d_name);
+            FILE *stream = fopen(path,"r");
+
+            if (stream != NULL){
+                dyn_message_type* msgType = NULL;
+
+                int rc = dynMessage_parse(stream, &msgType);
+                if (rc == 0 && msgType != NULL) {
+
+                    char* msgName = NULL;
+                    dynMessage_getName(msgType,&msgName);
+
+                    version_pt msgVersion = NULL;
+                    dynMessage_getVersion(msgType, &msgVersion);
+
+                    unsigned int msgId = utils_stringHash(msgName);
+
+                    pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl));
+                    impl->dynMsg = msgType;
+                    impl->msgSerializer.handle = impl;
+                    impl->msgSerializer.msgId = msgId;
+                    impl->msgSerializer.msgName = msgName;
+                    impl->msgSerializer.msgVersion = msgVersion;
+                    impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize;
+                    impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize;
+                    impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+                    bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+                    if (clash) {
+                        printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+                    } else if ( msgName != NULL && msgVersion != NULL && msgId != 0) {
+                        hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
+                    } else {
+                        printf("Error adding creating msg serializer\n");
+                    }
+
+                }
+                else{
+                    printf("DMU: cannot parse message from descriptor %s\n.",path);
+                }
+                fclose(stream);
+            }else{
+                printf("DMU: cannot open descriptor file %s\n.",path);
+            }
+
+        }
+        entry = readdir(dir);
+    }
+
+    if(dir) {
+        closedir(dir);
+    }
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 3826a25..36ea422 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -328,7 +328,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -360,7 +360,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -377,7 +377,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
 
 	if (arrayList_size(manager->serializerList) > 0){
 		//there is another serializer available, change the admin so it is using another serializer
-		pubsub_serializer_service_pt replacing_serializer = (pubsub_serializer_service_pt) arrayList_get(manager->serializerList,0);
+		pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
 
 		for(j=0; j<arrayList_size(manager->psaList); j++){
 			pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 808644c..03b15ba 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -5,4 +5,4 @@ version=1.0.0
 :annotations
 :types
 :message
-{n seqnR}
\ No newline at end of file
+{n seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 6d957a0..266f73e 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -145,13 +145,14 @@ TEST_GROUP(PUBSUB_INT_GROUP)
         //check if message are returned
         msg_t initMsg;
         initMsg.seqNr = 0;
+        int count = 0;
         for (int i = 0; i < TRIES; ++i) {
             pthread_mutex_lock(&g_act.mutex);
             g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &initMsg);
             pthread_mutex_unlock(&g_act.mutex);
             usleep(TIMEOUT);
             pthread_mutex_lock(&g_act.mutex);
-            int count = g_act.count;
+            count = g_act.count;
             pthread_mutex_unlock(&g_act.mutex);
             if (count > 0) {
                 break;
@@ -159,6 +160,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
                 printf("No return message received, waiting for a while\n");
             }
         }
+        CHECK(count > 0);
 	}
 
 	void teardown() {
@@ -167,7 +169,10 @@ TEST_GROUP(PUBSUB_INT_GROUP)
 };
 
 TEST(PUBSUB_INT_GROUP, sendRecvTest) {
-    g_act.count = 0;
+    pthread_mutex_lock(&g_act.mutex);
+    g_act.count = 0; //reset counter
+    pthread_mutex_unlock(&g_act.mutex);
+
     constexpr int COUNT = 50;
     msg_t msg;
     for (int i = 0; i < COUNT; ++i) {


[2/2] celix git commit: CELIX-408: Major refactoring of the pubsub serializer design and usage. Examples of udpmc and zmq seem to be working. Combi and zmq multicast are not stable yet.

Posted by pn...@apache.org.
CELIX-408: Major refactoring of the pubsub serializer design and usage. Examples of udpmc and zmq seem to be working. Combi and zmq multicast are not stable yet.

- The serializer is refactored to minimize the needed callbacks directly to the services. Serialization is now a two-step approach:
  The serializer service can be used to create a map of msg serializers,
  and the msg serializers are serializer structs (with function ptrs) that each serialize a specific msg.
- Where feasible, replace _pt types with _t types. Removing the pointer from the typedef makes it possible to add const qualifiers and to use sizeof on the typedef.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7efe4331
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7efe4331
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7efe4331

Branch: refs/heads/feature/CELIX-408_runtime
Commit: 7efe4331aafdfea1df8bd181c136f20acaf52b49
Parents: 97df926
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Apr 11 21:19:10 2017 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Apr 11 21:19:10 2017 +0200

----------------------------------------------------------------------
 cmake/cmake_celix/BundlePackaging.cmake         |   2 +-
 dfi/private/src/json_serializer.c               |   8 +-
 dfi/public/include/json_serializer.h            |   4 +-
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   8 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 173 +++++++-------
 .../private/src/topic_subscription.c            | 158 +++++++------
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   6 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 137 ++++++-----
 .../private/src/topic_subscription.c            | 184 ++++++---------
 .../public/include/dyn_msg_utils.h              |  39 ----
 .../pubsub_common/public/include/pubsub_admin.h |   4 +-
 .../public/include/pubsub_serializer.h          |  47 ++--
 pubsub/pubsub_common/public/src/dyn_msg_utils.c | 160 -------------
 pubsub/pubsub_serializer_json/CMakeLists.txt    |   1 -
 .../private/include/pubsub_serializer_impl.h    |  35 ++-
 .../private/src/ps_activator.c                  |  20 +-
 .../private/src/pubsub_serializer_impl.c        | 234 +++++++++++++++----
 .../private/src/pubsub_topology_manager.c       |   6 +-
 pubsub/test/msg_descriptors/msg.descriptor      |   2 +-
 pubsub/test/test/tst_activator.cpp              |   9 +-
 28 files changed, 582 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/cmake/cmake_celix/BundlePackaging.cmake
----------------------------------------------------------------------
diff --git a/cmake/cmake_celix/BundlePackaging.cmake b/cmake/cmake_celix/BundlePackaging.cmake
index ded1ca5..7eb42fa 100644
--- a/cmake/cmake_celix/BundlePackaging.cmake
+++ b/cmake/cmake_celix/BundlePackaging.cmake
@@ -319,7 +319,7 @@ function(bundle_libs)
             if (ADD_TO_MANIFEST)
                 list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>")
             endif()
-                list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
+            list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
         endif()
 
         get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB")

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/private/src/json_serializer.c
----------------------------------------------------------------------
diff --git a/dfi/private/src/json_serializer.c b/dfi/private/src/json_serializer.c
index 0c06998..c1cd339 100644
--- a/dfi/private/src/json_serializer.c
+++ b/dfi/private/src/json_serializer.c
@@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, json_t *array, void *seqL
     return status;
 }
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) {
     int status = OK;
 
     json_t *root = NULL;
@@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
     return status;
 }
 
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) {
-    return jsonSerializer_writeAny(type, input, out);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out) {
+    return jsonSerializer_writeAny(type, (void*)input /*TODO update static function to take const void**/, out);
 }
 
-static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) {
+static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) {
     int status = OK;
 
     int descriptor = dynType_descriptorType(type);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/public/include/json_serializer.h
----------------------------------------------------------------------
diff --git a/dfi/public/include/json_serializer.h b/dfi/public/include/json_serializer.h
index c785b01..2f91f2b 100644
--- a/dfi/public/include/json_serializer.h
+++ b/dfi/public/include/json_serializer.h
@@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer);
 int jsonSerializer_deserialize(dyn_type *type, const char *input, void **result);
 int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void **result);
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output);
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out);
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out);
 
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index ce32db0..1ac0c2d 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
     	private/src/topic_subscription.c
     	private/src/topic_publication.c
     	private/src/large_udp.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 9eddf15..89e6547 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -33,7 +33,7 @@
 
 struct pubsub_admin {
 
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
@@ -73,7 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 81690b8..57d942a 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg {
     struct pubsub_msg_header header;
     unsigned int payloadSize;
     char payload[];
-} *pubsub_udp_msg_pt;
+} pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index 36c902e..1535ae5 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 6e9f4e8..ebfe3e6 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = serializerSvc;
 
@@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index aa3faf0..be0a433 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -37,7 +37,6 @@
 #include "array_list.h"
 #include "celixbool.h"
 #include "service_registration.h"
-#include "dyn_msg_utils.h"
 #include "utils.h"
 #include "service_factory.h"
 #include "version.h"
@@ -59,7 +58,7 @@ struct topic_publication {
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
 	struct sockaddr_in destAddr;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -68,27 +67,27 @@ typedef struct publish_bundle_bound_service {
 	bundle_pt bundle;
     char *scope;
 	char *topic;
-	hash_map_pt msgTypes;
 	unsigned short getCount;
 	celix_thread_mutex_t mp_lock;
 	bool mp_send_in_progress;
 	array_list_pt mp_parts;
 	largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
+	pubsub_msg_serializer_map_t* map;
+} publish_bundle_bound_service_t;
 
 typedef struct pubsub_msg{
 	pubsub_msg_header_pt header;
 	char* payload;
 	int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
 
@@ -98,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -136,7 +135,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+		publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
 		pubsub_destroyPublishBundleBoundService(bound);
 	}
 	hashMapIterator_destroy(iter);
@@ -223,41 +222,49 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	pub->serializerSvc = serializerSvc;
+    //clear old serializer
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
+        }
+    }
 
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		if (hashMap_size(boundSvc->msgTypes) == 0){
-			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
-		}
+    //setup new serializer
+	pub->serializerSvc = serializerSvc;
+	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+		bundle_pt bundle = hashMapEntry_getKey(entry);
+		publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
+		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
 	}
-	hashMapIterator_destroy(bs_iter);
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
-
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
-	}
-	hashMapIterator_destroy(bs_iter);
-
+    if (pub->serializerSvc == svc) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+            pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
+        }
+    }
 	pub->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
@@ -275,18 +282,18 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound==NULL){
-		bound = pubsub_createPublishBundleBoundService(publish,bundle);
-		if(bound!=NULL){
-			hashMap_put(publish->boundServices,bundle,bound);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+	if (bound == NULL) {
+		bound = pubsub_createPublishBundleBoundService(publish, bundle);
+		if (bound != NULL) {
+			hashMap_put(publish->boundServices, bundle, bound);
 		}
 	}
-	else{
+	else {
 		bound->getCount++;
 	}
 
-	if(bound!=NULL){
+	if (bound != NULL) {
 		*service = bound->service;
 	}
 
@@ -301,17 +308,16 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound!=NULL){
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+	if (bound != NULL) {
 
 		bound->getCount--;
-		if(bound->getCount==0){
+		if (bound->getCount == 0) {
 			pubsub_destroyPublishBundleBoundService(bound);
 			hashMap_remove(publish->boundServices,bundle);
 		}
-
 	}
-	else{
+	else {
 		long bundleId = -1;
 		bundle_getBundleId(bundle,&bundleId);
 		printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
@@ -325,12 +331,11 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 	return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
 	const int iovec_len = 3; // header + size + payload
 	bool ret = true;
-	pubsub_udp_msg_pt udpMsg;
 
-	int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize;
+	int compiledMsgSize = sizeof(pubsub_udp_msg_t) + msg->payloadSize;
 
 	struct iovec msg_iovec[iovec_len];
 	msg_iovec[0].iov_base = msg->header;
@@ -348,51 +353,51 @@ static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt
 	    ret = false;
 	}
 
-	//free(udpMsg);
 	if(releaseCallback) {
 	    releaseCallback->release(msg->payload, bound);
 	}
 	return ret;
-
 }
 
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
     int status = 0;
-    publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+    publish_bundle_bound_service_t* bound = handle;
 
     celixThreadMutex_lock(&(bound->parent->tp_lock));
     celixThreadMutex_lock(&(bound->mp_lock));
 
-    //TODO //FIXME -> should use pointer to int as identifier, can be many pointers to int ....
-    printf("TODO FIX usage of msg id's in the serializer hashmap. This seems wrongly based on pointers to uints!!!!\n");
-    pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
-    int major=0, minor=0;
+    pubsub_msg_serializer_t *msgSer = NULL;
+    if (bound->map != NULL) {
+        msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId);
+    }
 
-    if (msgType != NULL && bound->parent->serializerSvc != NULL) {
+    if (bound->map == NULL) {
+        printf("TP: Serializer is not set!\n");
+    } else if (msgSer == NULL ){
+        printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+    }
 
-    	version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
+    int major=0, minor=0;
 
+    if (msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
 		msg_hdr->type = msgTypeId;
-		if (msgVersion != NULL){
-			version_getMajor(msgVersion, &major);
-			version_getMinor(msgVersion, &minor);
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
 			msg_hdr->major = major;
 			msg_hdr->minor = minor;
 		}
 
-		void* serializedOutput = NULL;
-		int serializedOutputLen = 0;
-		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
+		char* serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+        msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
 
-		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+		pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
-		msg->payload = (char *) serializedOutput;
+		msg->payload = serializedOutput;
 		msg->payloadSize = serializedOutputLen;
 
 		if(send_pubsub_msg(bound, msg,true, NULL) == false) {
@@ -403,11 +408,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 		free(serializedOutput);
 
     } else {
-		if (bound->parent->serializerSvc == NULL) {
-			printf("TP: Serializer is not set!\n");
-		} else {
-			printf("TP: Message %u not supported.\n",msgTypeId);
-		}
+        printf("TP: Message %u not supported.\n",msgTypeId);
         status=-1;
     }
 
@@ -430,9 +431,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
 
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
 		bound->service = calloc(1, sizeof(*bound->service));
@@ -445,7 +446,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->getCount = 1;
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
-		bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -457,10 +457,10 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = NULL;  //Multipart not supported (jet) for UDP
 
-		if (tp->serializerSvc != NULL){
-			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
+        //TODO check if lock on tp is needed? (e.g. is lock already done by caller?)
+		if (tp->serializerSvc != NULL) {
+            tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
 		}
-
 	}
 	else
 	{
@@ -474,30 +474,33 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 	return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
 
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if(boundSvc->service != NULL){
+	if (boundSvc->service != NULL) {
 		free(boundSvc->service);
 	}
 
-	if(boundSvc->msgTypes != NULL){
-		if (boundSvc->parent->serializerSvc != NULL){
-			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
-		}
-		hashMap_destroy(boundSvc->msgTypes,false,false);
-	}
+    //TODO check if lock on parent is needed, e.g. does the caller already lock?
+    if (boundSvc->map != NULL) {
+        if (boundSvc->parent->serializerSvc == NULL) {
+            printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n");
+        } else {
+            boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+            boundSvc->map = NULL;
+        }
+    }
 
-	if(boundSvc->mp_parts!=NULL){
+	if (boundSvc->mp_parts!=NULL) {
 		arrayList_destroy(boundSvc->mp_parts);
 	}
 
-    if(boundSvc->scope!=NULL){
+    if (boundSvc->scope!=NULL) {
         free(boundSvc->scope);
     }
 
-    if(boundSvc->topic!=NULL){
+    if (boundSvc->topic!=NULL) {
 		free(boundSvc->topic);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 91bce9f..da23b21 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -49,7 +49,6 @@
 #include "topic_subscription.h"
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
@@ -68,11 +67,15 @@ struct topic_subscription{
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
-	hash_map_pt servicesMap; // key = service, value = msg types map
+
+    //NOTE: using a service ptr can be dangerous, because a pointer can be reused.
+    //Ensuring that pointers are removed before a new (reused) pointer comes along is crucial!
+	hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+
 	hash_map_pt socketMap; // key = URL, value = listen-socket
 	unsigned int nrSubscribers;
 	largeUdp_pt largeUdpHandle;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 };
 
@@ -94,7 +97,7 @@ static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
@@ -115,7 +118,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
 	ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
 	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -161,7 +164,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->servicesMap,false,false);
+	hashMap_destroy(ts->msgSerializersMap,false,false);
 
 	hashMap_destroy(ts->socketMap,false,false);
 	largeUdp_destroy(ts->largeUdpHandle);
@@ -391,7 +394,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
@@ -403,50 +406,39 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
-	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-	while(hashMapIterator_hasNext(svc_iter)){
-		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
-		if (hashMap_size(msgTypes) > 0){
-			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+	if (ts->serializerSvc == serializerSvc) { //only act if the removed svc is the service in use
+		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap);
+		while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
+		ts->serializerSvc = NULL;
 	}
-	hashMapIterator_destroy(svc_iter);
-
-	ts->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
 }
 
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+	if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
 
-		if (ts->serializerSvc != NULL){
-			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
-		}
-
-		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
-			hashMap_destroy(msgTypes,false,false);
-			printf("TS: Unsupported subscriber!\n");
-		}
-		else{
-			hashMap_put(ts->servicesMap, service, msgTypes);
+		if (ts->serializerSvc != NULL) {
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+            if (map != NULL) {
+                hashMap_put(ts->msgSerializersMap, svc, map);
+            }
 		}
-
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 	printf("TS: New subscriber registered.\n");
@@ -454,18 +446,16 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
-	celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-		if(msgTypes!=NULL){
-			if (ts->serializerSvc != NULL){
-				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-			}
-			hashMap_destroy(msgTypes,false,false);
+
+    celixThreadMutex_lock(&ts->ts_lock);
+	if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
+		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc);
+		if (ts->serializerSvc != NULL){
+			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
@@ -475,59 +465,51 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
+static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
 
-	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-	while (hashMapIterator_hasNext(iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap);
+	celixThreadMutex_lock(&sub->ts_lock);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-		pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type));
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type);
 
-		if (msgType == NULL) {
+		if (msgSer == NULL) {
 			printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
-		}
-		else if (sub->serializerSvc == NULL){
-			printf("TS: No active serializer service found!\n");
-		}
-		else{
+		} else {
 			void *msgInst = NULL;
-			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,&msg->header);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, &msg->header);
 			if(validVersion){
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) msg->payload, &msgInst);
-
+                celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst);
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 					pubsub_multipart_callbacks_t mp_callbacks;
-					mp_callbacks.handle = sub;
+					mp_callbacks.handle = map;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = NULL;
 
-					subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release);
-					if(release){
-						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+					subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+					if (release) {
+                        msgSer->freeMsg(msgSer->handle, msgInst);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n",name);
+					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
 				}
 
 			}
-			else{
+			else {
 				int major=0,minor=0;
-				version_getMajor(msgVersion,&major);
-				version_getMinor(msgVersion,&minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,msg->header.major,msg->header.minor);
+				version_getMajor(msgSer->msgVersion, &major);
+				version_getMinor(msgSer->msgVersion, &minor);
+				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                       msgSer->msgName, major, minor, msg->header.major, msg->header.minor);
 			}
-
 		}
 	}
-	hashMapIterator_destroy(iter);
+	celixThreadMutex_unlock(&sub->ts_lock);
 }
 
 static void* udp_recv_thread_func(void * arg) {
@@ -555,7 +537,7 @@ static void* udp_recv_thread_func(void * arg) {
             unsigned int size;
             if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
                 // Handle data
-                pubsub_udp_msg_pt udpMsg = NULL;
+                pubsub_udp_msg_t* udpMsg = NULL;
                 if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
                 	printf("TS: ERROR largeUdp_read with index %d\n", index);
                 	continue;
@@ -577,19 +559,19 @@ static void* udp_recv_thread_func(void * arg) {
 }
 
 
-static void sigusr1_sighandler(int signo){
+static void sigusr1_sighandler(int signo) {
 	printf("TS: Topic subscription being shut down...\n");
 	return;
 }
 
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
 	bool check=false;
 	int major=0,minor=0;
 
-	if(msgVersion!=NULL){
+	if (msgVersion!=NULL) {
 		version_getMajor(msgVersion,&major);
 		version_getMinor(msgVersion,&minor);
-		if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+		if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
 			check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
 		}
 	}
@@ -597,8 +579,24 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
 	return check;
 }
 
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = utils_stringHash(msgType);
-	return 0;
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) {
+    pubsub_msg_serializer_map_t* map = handle;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    unsigned int msgTypeId = 0;
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
+            msgTypeId = msgSer->msgId;
+            break;
+        }
+    }
+
+    if (msgTypeId == 0) {
+        printf("Cannot find msg type id for msgType %s\n", msgType);
+        return -1;
+    } else {
+        *out = msgTypeId;
+        return 0;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 956830d..49eba87 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -52,7 +52,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    	${ZMQ_CRYPTO_C}
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
-	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 	)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 2f81bff..3c36986 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -49,7 +49,7 @@
 
 struct pubsub_admin {
 
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
@@ -89,7 +89,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index b6b76c6..158dfe7 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,14 +34,14 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c6fe93a..1fbbaaf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -51,8 +51,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 09fcd8c..5c9a5d5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -666,7 +666,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = serializerSvc;
 
@@ -694,7 +694,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index bb8ff56..2e95874 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -47,7 +47,6 @@
 #include "version.h"
 
 #include "pubsub_common.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 #include "publisher.h"
 
@@ -70,7 +69,7 @@ struct topic_publication {
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -78,26 +77,26 @@ typedef struct publish_bundle_bound_service {
 	pubsub_publisher_pt service;
 	bundle_pt bundle;
 	char *topic;
-	hash_map_pt msgTypes;
+	pubsub_msg_serializer_map_t* map;
 	unsigned short getCount;
 	celix_thread_mutex_t mp_lock;
 	bool mp_send_in_progress;
 	array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
+} publish_bundle_bound_service_t;
 
-typedef struct pubsub_msg{
+typedef struct pubsub_msg {
 	pubsub_msg_header_pt header;
 	char* payload;
 	int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags);
@@ -105,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -235,7 +234,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+		publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
 		pubsub_destroyPublishBundleBoundService(bound);
 	}
 	hashMapIterator_destroy(iter);
@@ -332,43 +331,50 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	pub->serializerSvc = serializerSvc;
-
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		if (hashMap_size(boundSvc->msgTypes) == 0){
-			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
+	//destroy the serializer maps created by the previous serializer
+	if (pub->serializerSvc != NULL) {
+		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+		while (hashMapIterator_hasNext(&iter)) {
+			publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
 		}
 	}
-	hashMapIterator_destroy(bs_iter);
+
+	pub->serializerSvc = serializerSvc;
+	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+		bundle_pt bundle = hashMapEntry_getKey(entry);
+		publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry);
+		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map);
+	}
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
-
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
+	if (pub->serializerSvc == svc) {
+		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+		while (hashMapIterator_hasNext(&iter)) {
+			publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map);
+			boundSvc->map = NULL;
+		}
+		pub->serializerSvc = NULL;
 	}
-	hashMapIterator_destroy(bs_iter);
-
-	pub->serializerSvc = NULL;
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
-
 	return status;
 }
 
@@ -384,7 +390,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
 	if(bound==NULL){
 		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 		if(bound!=NULL){
@@ -410,7 +416,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
 	if(bound!=NULL){
 
 		bound->getCount--;
@@ -434,7 +440,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 	return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){
 
 	bool ret = true;
 
@@ -474,7 +480,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 	unsigned int i = 0;
 	unsigned int mp_num = arrayList_size(mp_msg_parts);
 	for(;i<mp_num;i++){
-		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
 	}
 	arrayList_clear(mp_msg_parts);
 
@@ -489,10 +495,8 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 }
 
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
-
 	int status = 0;
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+	publish_bundle_bound_service_t* bound = handle;
 
 	celixThreadMutex_lock(&(bound->mp_lock));
 	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
@@ -501,38 +505,33 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 		return -3;
 	}
 
-	pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
+	pubsub_msg_serializer_t* msgSer = NULL;
+	if (bound->map != NULL) {
+		msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
+	}
 	int major=0, minor=0;
 
-	if (msgType != NULL && bound->parent->serializerSvc != NULL) {
-
-		version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
-
+	if (msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
 		msg_hdr->type = msgTypeId;
-		if (msgVersion != NULL){
-			version_getMajor(msgVersion, &major);
-			version_getMinor(msgVersion, &minor);
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
 			msg_hdr->major = major;
 			msg_hdr->minor = minor;
 		}
 
-		void* serializedOutput = NULL;
-		int serializedOutputLen = 0;
-		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
-
-		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+		char* serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+		msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
+		pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
 		msg->payload = (char *) serializedOutput;
 		msg->payloadSize = serializedOutputLen;
-
 		bool snd = true;
 
-		switch(flags){
+		switch (flags) {
 		case PUBSUB_PUBLISHER_FIRST_MSG:
 			bound->mp_send_in_progress = true;
 			arrayList_add(bound->mp_parts,msg);
@@ -602,9 +601,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
 		bound->service = calloc(1, sizeof(*bound->service));
@@ -617,7 +616,11 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->getCount = 1;
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
-		bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
+
+		//TODO check if lock is needed. e.g. was the caller already locked?
+		if (tp->serializerSvc != NULL) {
+			tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
+		}
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -628,10 +631,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
 
-		if (tp->serializerSvc != NULL){
-			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
-		}
-
 	}
 	else
 	{
@@ -645,26 +644,24 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 	return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
 
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if(boundSvc->service != NULL){
+	if (boundSvc->service != NULL) {
 		free(boundSvc->service);
 	}
 
-	if(boundSvc->msgTypes != NULL){
-		if (boundSvc->parent->serializerSvc != NULL){
-			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
-		}
-		hashMap_destroy(boundSvc->msgTypes,false,false);
+	if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
+		boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+		boundSvc->map = NULL;
 	}
 
-	if(boundSvc->mp_parts!=NULL){
+	if (boundSvc->mp_parts!=NULL) {
 		arrayList_destroy(boundSvc->mp_parts);
 	}
 
-	if(boundSvc->topic!=NULL){
+	if (boundSvc->topic!=NULL) {
 		free(boundSvc->topic);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 3de56af..7ef2c5d 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -46,7 +46,6 @@
 
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -58,8 +57,7 @@
 #define POLL_TIMEOUT  	250
 #define ZMQ_POLL_TIMEOUT_MS_ENV 	"ZMQ_POLL_TIMEOUT_MS"
 
-struct topic_subscription{
-
+struct topic_subscription {
 	zsock_t* zmq_socket;
 	zcert_t * zmq_cert;
 	zcert_t * zmq_pub_cert;
@@ -71,26 +69,25 @@ struct topic_subscription{
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 
-	hash_map_pt servicesMap; // key = service, value = msg types map
+	hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t*
 	array_list_pt pendingConnections;
 	array_list_pt pendingDisconnections;
 
 	celix_thread_mutex_t pendingConnections_lock;
 	celix_thread_mutex_t pendingDisconnections_lock;
 	unsigned int nrSubscribers;
-	pubsub_serializer_service_pt serializerSvc;
-
+	pubsub_serializer_service_t* serializerSvc;
 };
 
-typedef struct complete_zmq_msg{
+typedef struct complete_zmq_msg {
 	zframe_t* header;
 	zframe_t* payload;
 }* complete_zmq_msg_pt;
 
-typedef struct mp_handle{
-	hash_map_pt svc_msg_db;
+typedef struct mp_handle {
+	pubsub_msg_serializer_map_t* map;
 	hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
+} mp_handle_t;
 
 typedef struct msg_map_entry{
 	bool retain;
@@ -104,12 +101,12 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_t* mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -223,7 +220,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	celixThreadMutex_create(&ts->socket_lock, NULL);
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
 	arrayList_create(&ts->pendingConnections);
 	arrayList_create(&ts->pendingDisconnections);
 	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -269,7 +266,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->servicesMap,false,false);
+	hashMap_destroy(ts->msgSerializers,false,false);
 
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_destroy(ts->pendingConnections);
@@ -429,7 +426,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
@@ -441,22 +438,18 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
-	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-	while(hashMapIterator_hasNext(svc_iter)){
-		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
-		if (hashMap_size(msgTypes) > 0){
-			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+	if (ts->serializerSvc == svc) {
+		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers);
+		while(hashMapIterator_hasNext(&iter)){
+			pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
 	}
-	hashMapIterator_destroy(svc_iter);
-
 	ts->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -467,24 +460,19 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+	if (!hashMap_containsKey(ts->msgSerializers, service)) {
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
 
-		if (ts->serializerSvc != NULL){
-			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
-		}
-
-		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
-			hashMap_destroy(msgTypes,false,false);
-			printf("TS: Unsupported subscriber!\n");
-		}
-		else{
-			hashMap_put(ts->servicesMap, service, msgTypes);
+		if (ts->serializerSvc != NULL) {
+			pubsub_msg_serializer_map_t* map = NULL;
+			ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+			if (map != NULL) {
+				hashMap_put(ts->msgSerializers, service, map);
+			} else {
+				printf("TS: Cannot create msg serializer map\n");
+			}
 		}
-
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 	printf("TS: New subscriber registered.\n");
@@ -497,14 +485,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-		if(msgTypes!=NULL){
-			if (ts->serializerSvc != NULL){
-				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-			}
-			hashMap_destroy(msgTypes,false,false);
-		}
+	if (hashMap_containsKey(ts->msgSerializers, service)) {
+		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service);
+        ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 
@@ -513,66 +496,55 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
 
 	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-	while (hashMapIterator_hasNext(iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-		pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type));
-		if (msgType == NULL) {
-			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
-		}
-		else if (sub->serializerSvc == NULL){
-			printf("TS: No active serializer found!\n");
-		}
-		else{
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type);
+		if (msgSer == NULL) {
+			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type);
+		} else {
 			void *msgInst = NULL;
-			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,first_msg_hdr);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr);
 			if(validVersion){
-
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst);
 
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 
-					mp_handle_pt mp_handle = create_mp_handle(sub, msgTypes,msg_list);
+					mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list);
 					pubsub_multipart_callbacks_t mp_callbacks;
 					mp_callbacks.handle = mp_handle;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = pubsub_getMultipart;
-					subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+					subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
-					if(release){
-						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+					if (release) {
+						msgSer->freeMsg(msgSer->handle, msgInst);
 					}
-					if(mp_handle!=NULL){
+					if (mp_handle!=NULL) {
 						destroy_mp_handle(mp_handle);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n",name);
+					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
 				}
 
-			}
-			else{
+			} else {
 				int major=0,minor=0;
-				version_getMajor(msgVersion,&major);
-				version_getMinor(msgVersion,&minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+				version_getMajor(msgSer->msgVersion, &major);
+				version_getMinor(msgSer->msgVersion, &minor);
+				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+					   msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor);
 			}
-
 		}
 	}
-	hashMapIterator_destroy(iter);
 
 	int i = 0;
 	for(;i<arrayList_size(msg_list);i++){
@@ -737,7 +709,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 		return -1;
 	}
 
-	mp_handle_pt mp_handle = (mp_handle_pt)handle;
+	mp_handle_t* mp_handle = handle;
 	msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
 	if(entry!=NULL){
 		entry->retain = retain;
@@ -753,63 +725,55 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 
 }
 
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
 
 	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
 		return NULL;
 	}
 
-	mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
-	mp_handle->svc_msg_db = svc_msg_db;
-	mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL);
-
-	int i=1; //We skip the first message, it will be handle differently
-	for(;i<arrayList_size(rcv_msg_list);i++){
-		complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+	mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
+	mp_handle->map = map;
+	mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
 
+	int i; //We skip the first message, it will be handled differently
+	for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
+		complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
 		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
 
-		pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type));
-		if (msgType != NULL && sub->serializerSvc != NULL) {
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type));
+		if (msgSer != NULL) {
 			void *msgInst = NULL;
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,header);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, header);
 			if(validVersion){
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+				//TODO make the getMultipart lazy?
+				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst);
 
 				if(status == CELIX_SUCCESS){
-					unsigned int* msgId = calloc(1,sizeof(unsigned int));
-					*msgId = header->type;
 					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
 					entry->msgInst = msgInst;
-					hashMap_put(mp_handle->rcv_msg_map,msgId,entry);
+					hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry);
 				}
 			}
 		}
-
 	}
-
 	return mp_handle;
-
 }
 
-static void destroy_mp_handle(mp_handle_pt mp_handle){
+static void destroy_mp_handle(mp_handle_t* mp_handle){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
 	while(hashMapIterator_hasNext(iter)){
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
+		unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry);
 		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-		pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId);
-		if(msgType!=NULL){
-			if(!msgEntry->retain){
-				free(msgEntry->msgInst);
+		pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
+		if (msgSer != NULL) {
+			if (!msgEntry->retain) {
+				msgSer->freeMsg(msgSer->handle, msgEntry->msgInst);
 			}
 		}
 		else{
-			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId);
+			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId);
 		}
 	}
 	hashMapIterator_destroy(iter);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/pubsub/pubsub_common/public/include/dyn_msg_utils.h
deleted file mode 100644
index 71085ab..0000000
--- a/pubsub/pubsub_common/public/include/dyn_msg_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.h
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef DYN_MSG_UTILS_H_
-#define DYN_MSG_UTILS_H_
-
-#include "bundle.h"
-#include "hash_map.h"
-
-unsigned int uintHash(const void * uintNum);
-int uintEquals(const void * uintNum, const void * toCompare);
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-void emptyMsgTypesMap(hash_map_pt msgTypesMap);
-
-#endif /* DYN_MSG_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
index 52cb75c..f7ab7e0 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -56,8 +56,8 @@ struct pubsub_admin_service {
 	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index f2df075..e9f9f6c 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -24,33 +24,44 @@
  *  \copyright	Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_H_
-#define PUBSUB_SERIALIZER_H_
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
 
 #include "service_reference.h"
 
 #include "pubsub_common.h"
 
-typedef struct _pubsub_message_type pubsub_message_type;
-
-typedef struct pubsub_serializer *pubsub_serializer_pt;
-
-struct pubsub_serializer_service {
+/**
+ * There should be a pubsub_msg_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+typedef struct pubsub_msg_serializer {
+    void* handle;
+    unsigned int msgId;
+    const char* msgName;
+    version_pt msgVersion;
 
-	pubsub_serializer_pt serializer;
+    celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen);
+    celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
 
-	celix_status_t (*serialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-	celix_status_t (*deserialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+    void (*freeMsg)(void* handle, void* msg);
+} pubsub_msg_serializer_t;
 
-	void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
-	void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+typedef struct pubsub_msg_serializer_map {
+    bundle_pt bundle;
+    hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_msg_serializer_t*
+} pubsub_msg_serializer_map_t;
 
-	version_pt (*getVersion)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-	char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-	void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+typedef struct pubsub_serializer_service {
+	void* handle;
 
-};
+	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+    celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map);
 
-typedef struct pubsub_serializer_service *pubsub_serializer_service_pt;
+} pubsub_serializer_service_t;
 
-#endif /* PUBSUB_SERIALIZER_H_ */
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */