You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/04/14 08:14:03 UTC

[2/8] celix git commit: CELIX-408: Major refactoring of the pubsub serializer design and usage. Example of udpmc and zmq seems to be working. Combi and zmq multicast are not stable yet.

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
deleted file mode 100644
index 1c6bbcd..0000000
--- a/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.c
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <dirent.h>
-
-#include "utils.h"
-#include "dyn_message.h"
-
-#include "dyn_msg_utils.h"
-
-#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
-
-static char * getMsgDescriptionDir(bundle_pt bundle);
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
-
-
-unsigned int uintHash(const void * uintNum) {
-	return *((unsigned int*)uintNum);
-}
-
-int uintEquals(const void * uintNum, const void * toCompare) {
-	return ( (*((unsigned int*)uintNum)) == (*((unsigned int*)toCompare)) );
-}
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
-
-	char *root = NULL;
-	char *metaInfPath = NULL;
-
-	root = getMsgDescriptionDir(bundle);
-
-	if(root != NULL){
-		asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
-
-		addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
-		addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
-
-		free(metaInfPath);
-		free(root);
-	}
-}
-
-void emptyMsgTypesMap(hash_map_pt msgTypesMap)
-{
-	hash_map_iterator_pt iter = hashMapIterator_create(msgTypesMap);
-
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		dynMessage_destroy( ((dyn_message_type *) hashMapEntry_getValue(entry)) );
-	}
-	hashMap_clear(msgTypesMap, true, false);
-	hashMapIterator_destroy(iter);
-}
-
-static char * getMsgDescriptionDir(bundle_pt bundle)
-{
-	char *root = NULL;
-
-	bool isSystemBundle = false;
-	bundle_isSystemBundle(bundle, &isSystemBundle);
-
-	if(isSystemBundle == true) {
-		bundle_context_pt context;
-		bundle_getContext(bundle, &context);
-
-		const char *prop = NULL;
-
-		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
-
-		if(prop != NULL) {
-			root = strdup(prop);
-		} else {
-			root = getcwd(NULL, 0);
-		}
-	} else {
-		bundle_getEntry(bundle, ".", &root);
-	}
-
-	return root;
-}
-
-
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
-{
-	char path[128];
-	struct dirent *entry = NULL;
-	DIR *dir = opendir(root);
-
-	if(dir) {
-		entry = readdir(dir);
-	}
-
-	while (entry != NULL) {
-
-		if (strstr(entry->d_name, ".descriptor") != NULL) {
-
-			printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
-			memset(path,0,128);
-			snprintf(path, 128, "%s/%s", root, entry->d_name);
-			FILE *stream = fopen(path,"r");
-
-			if (stream != NULL){
-				dyn_message_type* msgType = NULL;
-
-				int rc = dynMessage_parse(stream, &msgType);
-				if (rc == 0 && msgType!=NULL) {
-
-					char* msgName = NULL;
-					dynMessage_getName(msgType,&msgName);
-
-					if(msgName!=NULL){
-						unsigned int* msgId = malloc(sizeof(unsigned int));
-						*msgId = utils_stringHash(msgName);
-						hashMap_put(msgTypesMap,msgId,msgType);
-					}
-
-				}
-				else{
-					printf("DMU: cannot parse message from descriptor %s\n.",path);
-				}
-				fclose(stream);
-			}else{
-				printf("DMU: cannot open descriptor file %s\n.",path);
-			}
-
-		}
-		entry = readdir(dir);
-	}
-
-	if(dir) {
-		closedir(dir);
-	}
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index 61b1cd9..a5798a4 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,6 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
     SOURCES
     	private/src/ps_activator.c
     	private/src/pubsub_serializer_impl.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index f026300..5299808 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -34,32 +34,25 @@
 
 #include "pubsub_serializer.h"
 
-struct _pubsub_message_type {	/* _dyn_message_type */
-	struct namvals_head header;
-	struct namvals_head annotations;
-	struct types_head types;
-	dyn_type *msgType;
-	version_pt msgVersion;
-};
-
-struct pubsub_serializer {
+typedef struct pubsub_serializer {
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
-};
-
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer);
+} pubsub_serializer_t;
 
-/* Start of serializer specific functions */
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+typedef struct pubsub_msg_serializer_impl {
+    pubsub_msg_serializer_t msgSerializer;
+    dyn_message_type* dynMsg;
+} pubsub_msg_serializer_impl_t;
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t* map);
 
+/* Start of serializer specific functions */
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg);
 
 #endif /* PUBSUB_SERIALIZER_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index b83c26b..e0e23d4 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -32,8 +32,8 @@
 #include "pubsub_serializer_impl.h"
 
 struct activator {
-	pubsub_serializer_pt serializer;
-	pubsub_serializer_service_pt serializerService;
+	pubsub_serializer_t* serializer;
+	pubsub_serializer_service_t* serializerService;
 	service_registration_pt registration;
 };
 
@@ -56,24 +56,16 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
 	celix_status_t status = CELIX_SUCCESS;
 	struct activator *activator = userData;
-	pubsub_serializer_service_pt pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
+	pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
 
 	if (!pubsubSerializerSvc) {
 		status = CELIX_ENOMEM;
 	}
 	else{
-		pubsubSerializerSvc->serializer = activator->serializer;
-
-		pubsubSerializerSvc->serialize = pubsubSerializer_serialize;
-		pubsubSerializerSvc->deserialize = pubsubSerializer_deserialize;
-
-		pubsubSerializerSvc->fillMsgTypesMap = pubsubSerializer_fillMsgTypesMap;
-		pubsubSerializerSvc->emptyMsgTypesMap = pubsubSerializer_emptyMsgTypesMap;
-
-		pubsubSerializerSvc->getVersion = pubsubSerializer_getVersion;
-		pubsubSerializerSvc->getName = pubsubSerializer_getName;
-		pubsubSerializerSvc->freeMsg = pubsubSerializer_freeMsg;
+		pubsubSerializerSvc->handle = activator->serializer;
 
+		pubsubSerializerSvc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
+		pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
 		activator->serializerService = pubsubSerializerSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 60d5c98..2dd8258 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -26,30 +26,28 @@
 
 #include <stdio.h>
 #include <stdlib.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
 #include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <inttypes.h>
 
-#include "constants.h"
 #include "utils.h"
 #include "hash_map.h"
-#include "array_list.h"
 #include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
+
 #include "log_helper.h"
-#include "log_service.h"
-#include "service_factory.h"
 
 #include "json_serializer.h"
-#include "dyn_msg_utils.h"
 
 #include "pubsub_serializer_impl.h"
 
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer) {
+#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	*serializer = calloc(1, sizeof(**serializer));
@@ -70,7 +68,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
 	return status;
 }
 
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	logHelper_stop(serializer->loghelper);
@@ -81,63 +79,197 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
 	return status;
 }
 
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
-	celix_status_t status = CELIX_SUCCESS;
-
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
+    if (status == CELIX_SUCCESS) {
+        map->bundle = bundle;
+        map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
+        pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
+    } else {
+        logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+        status = CELIX_ENOMEM;
+    }
 
-	char *jsonOutput = NULL;
-	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
-	if (rc != 0){
-		status = CELIX_BUNDLE_EXCEPTION;
-	}
+    if (status == CELIX_SUCCESS) {
+        *out = map;
+    }
+    return status;
+}
 
-	*output = (void *) jsonOutput;
-	*outputLen = strlen(jsonOutput) + 1;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t* map) {
+    celix_status_t status = CELIX_SUCCESS;
+    if (map == NULL) {
+        return status;
+    }
 
-	return status;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        pubsub_msg_serializer_impl_t* impl = msgSer->handle;
+        dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version owned by dynType
+        free(impl); //also contains the service struct.
+    }
+    hashMap_destroy(map->serializers, false, false);
+    free(map);
+    return status;
 }
 
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output){
-	celix_status_t status = CELIX_SUCCESS;
 
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen) {
+    celix_status_t status = CELIX_SUCCESS;
 
-	void *textOutput = NULL;
-	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+    char *jsonOutput = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+	int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
 	if (rc != 0){
 		status = CELIX_BUNDLE_EXCEPTION;
 	}
-
-	*output = textOutput;
+    if (status == CELIX_SUCCESS) {
+        *out = jsonOutput;
+        *outLen = strlen(jsonOutput) + 1;
+    }
 
 	return status;
 }
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap, bundle_pt bundle){
-	fillMsgTypesMap(msgTypesMap, bundle);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out) {
+    celix_status_t status = CELIX_SUCCESS;
+    void *msg = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    int rc = jsonSerializer_deserialize(dynType, input, &msg);
+    if (rc != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+    if (status == CELIX_SUCCESS) {
+        *out = msg;
+    }
+	return status;
 }
 
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap){
-	emptyMsgTypesMap(msgTypesMap);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) {
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    if (dynType != NULL) {
+        dynType_free(dynType, msg);
+    }
 }
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
-	version_pt msgVersion = NULL;
-	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
-	return msgVersion;
+
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
+    char* root = NULL;
+    char* metaInfPath = NULL;
+
+    root = pubsubSerializer_getMsgDescriptionDir(bundle);
+
+    if(root != NULL){
+        asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+
+        pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+        pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+
+        free(metaInfPath);
+        free(root);
+    }
 }
 
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
-	char *name = NULL;
-	dynMessage_getName((dyn_message_type *) msgType, &name);
-	return name;
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
+{
+    char *root = NULL;
+
+    bool isSystemBundle = false;
+    bundle_isSystemBundle(bundle, &isSystemBundle);
+
+    if(isSystemBundle == true) {
+        bundle_context_pt context;
+        bundle_getContext(bundle, &context);
+
+        const char *prop = NULL;
+
+        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+        if(prop != NULL) {
+            root = strdup(prop);
+        } else {
+            root = getcwd(NULL, 0);
+        }
+    } else {
+        bundle_getEntry(bundle, ".", &root);
+    }
+
+    return root;
 }
 
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg){
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-	dynType_free(type, msg);
+
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
+{
+    char path[128];
+    struct dirent *entry = NULL;
+    DIR *dir = opendir(root);
+
+    if(dir) {
+        entry = readdir(dir);
+    }
+
+    while (entry != NULL) {
+
+        if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+            printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+            memset(path,0,128);
+            snprintf(path, 128, "%s/%s", root, entry->d_name);
+            FILE *stream = fopen(path,"r");
+
+            if (stream != NULL){
+                dyn_message_type* msgType = NULL;
+
+                int rc = dynMessage_parse(stream, &msgType);
+                if (rc == 0 && msgType != NULL) {
+
+                    char* msgName = NULL;
+                    dynMessage_getName(msgType,&msgName);
+
+                    version_pt msgVersion = NULL;
+                    dynMessage_getVersion(msgType, &msgVersion);
+
+                    unsigned int msgId = utils_stringHash(msgName);
+
+                    pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl));
+                    impl->dynMsg = msgType;
+                    impl->msgSerializer.handle = impl;
+                    impl->msgSerializer.msgId = msgId;
+                    impl->msgSerializer.msgName = msgName;
+                    impl->msgSerializer.msgVersion = msgVersion;
+                    impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize;
+                    impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize;
+                    impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+                    bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+                    if (clash) {
+                        printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+                    } else if ( msgName != NULL && msgVersion != NULL && msgId != 0) {
+                        hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
+                    } else {
+                        printf("Error adding creating msg serializer\n");
+                    }
+
+                }
+                else{
+                    printf("DMU: cannot parse message from descriptor %s\n.",path);
+                }
+                fclose(stream);
+            }else{
+                printf("DMU: cannot open descriptor file %s\n.",path);
+            }
+
+        }
+        entry = readdir(dir);
+    }
+
+    if(dir) {
+        closedir(dir);
+    }
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 3826a25..36ea422 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -328,7 +328,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -360,7 +360,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -377,7 +377,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
 
 	if (arrayList_size(manager->serializerList) > 0){
 		//there is another serializer available, change the admin so it is using another serializer
-		pubsub_serializer_service_pt replacing_serializer = (pubsub_serializer_service_pt) arrayList_get(manager->serializerList,0);
+		pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
 
 		for(j=0; j<arrayList_size(manager->psaList); j++){
 			pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 808644c..03b15ba 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -5,4 +5,4 @@ version=1.0.0
 :annotations
 :types
 :message
-{n seqnR}
\ No newline at end of file
+{n seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 6d957a0..266f73e 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -145,13 +145,14 @@ TEST_GROUP(PUBSUB_INT_GROUP)
         //check if message are returned
         msg_t initMsg;
         initMsg.seqNr = 0;
+        int count = 0;
         for (int i = 0; i < TRIES; ++i) {
             pthread_mutex_lock(&g_act.mutex);
             g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &initMsg);
             pthread_mutex_unlock(&g_act.mutex);
             usleep(TIMEOUT);
             pthread_mutex_lock(&g_act.mutex);
-            int count = g_act.count;
+            count = g_act.count;
             pthread_mutex_unlock(&g_act.mutex);
             if (count > 0) {
                 break;
@@ -159,6 +160,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
                 printf("No return message received, waiting for a while\n");
             }
         }
+        CHECK(count > 0);
 	}
 
 	void teardown() {
@@ -167,7 +169,10 @@ TEST_GROUP(PUBSUB_INT_GROUP)
 };
 
 TEST(PUBSUB_INT_GROUP, sendRecvTest) {
-    g_act.count = 0;
+    pthread_mutex_lock(&g_act.mutex);
+    g_act.count = 0; //reset counter
+    pthread_mutex_unlock(&g_act.mutex);
+
     constexpr int COUNT = 50;
     msg_t msg;
     for (int i = 0; i < COUNT; ++i) {