You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/12 08:39:17 UTC

celix git commit: CELIX-454: Refactors pubsub msg serializer so that it uses the same handle approach as services to prevent confusion.

Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-454-pubsub-disc 2b8f92dfa -> 88a575f51


CELIX-454: Refactors pubsub msg serializer so that it uses the same handle approach as services to prevent confusion.

Also fixes a mem issues due to confusion howto call the msg serializer with a handle.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/88a575f5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/88a575f5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/88a575f5

Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: 88a575f5153d6051e420861e6808642f87068068
Parents: 2b8f92d
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Nov 12 09:38:12 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Nov 12 09:38:12 2018 +0100

----------------------------------------------------------------------
 bundles/pubsub/CMakeLists.txt                   |  2 +-
 .../src/pubsub_udpmc_topic_receiver.c           |  4 +-
 .../src/pubsub_udpmc_topic_sender.c             |  2 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.c     |  3 +-
 .../src/pubsub_zmq_topic_receiver.c             |  6 +-
 .../src/pubsub_zmq_topic_sender.c               |  2 +-
 .../src/ps_json_serializer_activator.c          |  8 +--
 .../src/pubsub_serializer_impl.c                | 76 +++++++++++++-------
 .../src/pubsub_serializer_impl.h                | 18 ++---
 .../pubsub_spi/include/pubsub_serializer.h      |  1 +
 bundles/pubsub/test/CMakeLists.txt              | 38 +++++-----
 bundles/pubsub/test/meta_data/msg.descriptor    |  2 +-
 bundles/pubsub/test/meta_data/ping.properties   |  1 -
 bundles/pubsub/test/test/sut_activator.c        |  7 +-
 bundles/pubsub/test/test/tst_activator.cc       | 12 +++-
 15 files changed, 106 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..79f8168 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -39,7 +39,7 @@ if (PUBSUB)
 	if (ENABLE_TESTING)
 		option(BUILD_PUBSUB_TESTS "Enable Tests for PUBSUB" OFF)
 	endif()
-	if (ENABLE_TESTING AND BUILD_PUBSUB_TESTS AND BUILD_PUBSUB_PSA_ZMQ)
+	if (ENABLE_TESTING AND BUILD_PUBSUB_TESTS)
 		add_subdirectory(test)
 	endif()
 

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index a2f37d4..e1e5a42 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -392,7 +392,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
 
             if(validVersion){
 
-                celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+                celix_status_t status = msgSer->deserialize(msgSer->handle, (const void *)msg->payload, 0, &msgInst);
 
                 if (status == CELIX_SUCCESS) {
                     bool release = true;
@@ -400,7 +400,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
                     svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
 
                     if(release){
-                        msgSer->freeMsg(msgSer,msgInst);
+                        msgSer->freeMsg(msgSer->handle, msgInst);
                     }
                 }
                 else{

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index ddc6172..55bf96b 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -255,7 +255,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
 
         void* serializedOutput = NULL;
         size_t serializedOutputLen = 0;
-        if (msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
+        if (msgSer->serialize(msgSer->handle,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
 
             pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
             strncpy(msg_hdr->topic,entry->parent->topic,MAX_TOPIC_LEN-1);

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index f0f2b23..c5bfb99 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -274,6 +274,8 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
 
     celixThreadMutex_lock(&psa->serializers.mutex);
     psa_zmq_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
     if (entry != NULL) {
         celixThreadMutex_lock(&psa->topicSenders.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
@@ -305,7 +307,6 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
 
         free(entry);
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
 }
 
 celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index bb190cc..6886109 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -33,7 +33,6 @@
 #include "pubsub_zmq_common.h"
 #include "../../pubsub_topology_manager/src/pubsub_topology_manager.h"
 
-//TODO see if block and wakeup (reset) also works
 #define PSA_ZMQ_RECV_TIMEOUT 1000
 
 
@@ -410,6 +409,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 }
 
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, const pubsub_zmq_msg_header_t *hdr, const byte* payload, size_t payloadSize) {
+    //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
     pubsub_subscriber_t *svc = entry->svc;
 
@@ -417,8 +417,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
         void *deserializedMsg = NULL;
         bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, hdr);
         if (validVersion) {
-            celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
-            if(status == CELIX_SUCCESS) {
+            celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+            if (status == CELIX_SUCCESS) {
                 bool release = true;
                 svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
                 if (release) {

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index ed40d9f..d87412e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -385,7 +385,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
         void *serializedOutput = NULL;
         size_t serializedOutputLen = 0;
-        status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
+        status = msgSer->serialize(msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen);
         if (status == CELIX_SUCCESS) {
             zmsg_t *msg = zmsg_new();
             //TODO revert to use zmq_msg_init_data (or something like that) for zero copy for the payload

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
index b74bab8..d59c752 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
@@ -24,8 +24,8 @@
 #include "pubsub_serializer_impl.h"
 
 typedef struct psjs_activator {
-	pubsub_serializer_t* serializer;
-	;
+	pubsub_json_serializer_t* serializer;
+
 	pubsub_serializer_service_t serializerSvc;
 	long serializerSvcId;
 } psjs_activator_t;
@@ -37,8 +37,8 @@ static int psjs_start(psjs_activator_t *act, celix_bundle_context_t *ctx) {
 	if (status == CELIX_SUCCESS) {
 		act->serializerSvc.handle = act->serializer;
 
-		act->serializerSvc.createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
-		act->serializerSvc.destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
+		act->serializerSvc.createSerializerMap = pubsubSerializer_createSerializerMap;
+		act->serializerSvc.destroySerializerMap = pubsubSerializer_destroySerializerMap;
 
 		/* Set serializer type */
 		celix_properties_t *props = celix_properties_create();

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 685d499..2569ee7 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -16,13 +16,6 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_serializer_impl.c
- *
- *  \date       Mar 24, 2017
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -44,11 +37,31 @@
 #define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
 #define MAX_PATH_LEN    1024
 
+struct pubsub_json_serializer {
+	bundle_context_pt bundle_context;
+	log_helper_pt loghelper;
+};
+
+
+/* Start of serializer specific functions */
+static celix_status_t pubsubMsgSerializer_serialize(void* handle, const void* msg, void** out, size_t *outLen);
+static celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out);
+static void pubsubMsgSerializer_freeMsg(void* handle, void *msg);
+
+
+typedef struct pubsub_json_msg_serializer_impl {
+	dyn_message_type *msgType;
+
+	unsigned int msgId;
+	const char* msgName;
+	version_pt msgVersion;
+} pubsub_json_msg_serializer_impl_t;
+
 static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
 static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
 static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
 
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t** serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	*serializer = calloc(1, sizeof(**serializer));
@@ -69,7 +82,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
 	return status;
 }
 
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	logHelper_stop(serializer->loghelper);
@@ -80,8 +93,9 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
 	return status;
 }
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap) {
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap) {
 	celix_status_t status = CELIX_SUCCESS;
+	pubsub_json_serializer_t *serializer = handle;
 
 	hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
 
@@ -98,8 +112,9 @@ celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* seriali
 	return status;
 }
 
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, hash_map_pt serializerMap) {
+celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), hash_map_pt serializerMap) {
 	celix_status_t status = CELIX_SUCCESS;
+	//pubsub_json_serializer_t *serializer = handle;
 	if (serializerMap == NULL) {
 		return CELIX_ILLEGAL_ARGUMENT;
 	}
@@ -107,9 +122,11 @@ celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serial
 	hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
 	while (hashMapIterator_hasNext(&iter)) {
 		pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
-		dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+		pubsub_json_msg_serializer_impl_t *impl = msgSerializer->handle;
+		dyn_message_type *dynMsg = impl->msgType;
 		dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType
 		free(msgSerializer); //also contains the service struct.
+		free(impl);
 	}
 
 	hashMap_destroy(serializerMap, false, false);
@@ -118,12 +135,14 @@ celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serial
 }
 
 
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen) {
+celix_status_t pubsubMsgSerializer_serialize(void *handle, const void* msg, void** out, size_t *outLen) {
 	celix_status_t status = CELIX_SUCCESS;
 
+	pubsub_json_msg_serializer_impl_t *impl = handle;
+
 	char *jsonOutput = NULL;
 	dyn_type* dynType = NULL;
-	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dyn_message_type *dynMsg = impl->msgType;
 	dynMessage_getMessageType(dynMsg, &dynType);
 
 	if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){
@@ -138,12 +157,12 @@ celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSeriali
 	return status;
 }
 
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out) {
-
+celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out) {
 	celix_status_t status = CELIX_SUCCESS;
+	pubsub_json_msg_serializer_impl_t *impl = handle;
 	void *msg = NULL;
 	dyn_type* dynType = NULL;
-	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dyn_message_type *dynMsg = impl->msgType;
 	dynMessage_getMessageType(dynMsg, &dynType);
 
 	if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) {
@@ -156,9 +175,10 @@ celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSeria
 	return status;
 }
 
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg) {
+void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
+	pubsub_json_msg_serializer_impl_t *impl = handle;
 	dyn_type* dynType = NULL;
-	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dyn_message_type *dynMsg = impl->msgType;
 	dynMessage_getMessageType(dynMsg, &dynType);
 	if (dynType != NULL) {
 		dynType_free(dynType, msg);
@@ -246,12 +266,18 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
 
 						unsigned int msgId = utils_stringHash(msgName);
 
-						pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(pubsub_msg_serializer_t));
+						pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(*msgSerializer));
+						pubsub_json_msg_serializer_impl_t *impl = calloc(1, sizeof(*impl));
+
+						impl->msgType = msgType;
+						impl->msgId = msgId;
+						impl->msgName = msgName;
+						impl->msgVersion = msgVersion;
 
-						msgSerializer->handle = msgType;
-						msgSerializer->msgId = msgId;
-						msgSerializer->msgName = msgName;
-						msgSerializer->msgVersion = msgVersion;
+						msgSerializer->handle = impl;
+						msgSerializer->msgId = impl->msgId;
+						msgSerializer->msgName = impl->msgName;
+						msgSerializer->msgVersion = impl->msgVersion;
 						msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
 						msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
 						msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
@@ -260,6 +286,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
 						if (clash){
 							printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
 							free(msgSerializer);
+							free(impl);
 							dynMessage_destroy(msgType);
 						}
 						else if (msgId != 0){
@@ -268,6 +295,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
 						}
 						else{
 							printf("Error creating msg serializer\n");
+							free(impl);
 							free(msgSerializer);
 							dynMessage_destroy(msgType);
 						}

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index 75b72cb..9461438 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -29,20 +29,12 @@
 
 #define PUBSUB_JSON_SERIALIZER_TYPE	"json"
 
-typedef struct pubsub_serializer {
-	bundle_context_pt bundle_context;
-	log_helper_pt loghelper;
-} pubsub_serializer_t;
+typedef struct pubsub_json_serializer pubsub_json_serializer_t;
 
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t **serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer);
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap);
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, hash_map_pt serializerMap);
-
-/* Start of serializer specific functions */
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen);
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out);
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap);
 
 #endif /* PUBSUB_SERIALIZER_JSON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
index 9fa3340..d441c11 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -39,6 +39,7 @@
 
 typedef struct pubsub_msg_serializer {
 	void* handle;
+
 	unsigned int msgId;
 	const char* msgName;
 	version_pt msgVersion;

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 1340448..941f245 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -51,25 +51,25 @@ celix_bundle_files(pubsub_tst
     DESTINATION "META-INF/topics/sub"
 )
 
-add_celix_container(pubsub_zmq_tests
-        GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
-        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
-        PROPERTIES
-            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
-)
-target_link_libraries(pubsub_zmq_tests PRIVATE Celix::framework Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
-target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
-celix_container_bundles(pubsub_zmq_tests
-        Celix::pubsub_serializer_json
-        Celix::pubsub_topology_manager
-        Celix::pubsub_admin_zmq
-        pubsub_sut
-        pubsub_tst
-        Celix::shell
-        Celix::shell_tui
-)
 
 
-add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
-SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
+if (BUILD_PUBSUB_PSA_ZMQ)
+    add_celix_container(pubsub_zmq_tests
+            GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            DIR ${CMAKE_CURRENT_BINARY_DIR}
+            PROPERTIES
+                LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            BUNDLES
+                Celix::pubsub_serializer_json
+                Celix::pubsub_topology_manager
+                Celix::pubsub_admin_zmq
+                pubsub_sut
+                pubsub_tst
+    )
+    target_link_libraries(pubsub_zmq_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+    target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+    add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
+    SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
+endif ()
 

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/meta_data/msg.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/msg.descriptor b/bundles/pubsub/test/meta_data/msg.descriptor
index 0eb28cb..da774ae 100644
--- a/bundles/pubsub/test/meta_data/msg.descriptor
+++ b/bundles/pubsub/test/meta_data/msg.descriptor
@@ -6,4 +6,4 @@ version=1.0.0
 classname=org.example.Msg
 :types
 :message
-{i seqnR}
+{i seqNr}

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/meta_data/ping.properties
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index 3a99403..09284fc 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -14,7 +14,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-pubsub.admin=zmq
 zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
 zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
 

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
index 58c6d48..c09359e 100644
--- a/bundles/pubsub/test/test/sut_activator.c
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -100,7 +100,6 @@ static void* sut_sendThread(void *data) {
 
 	while (running) {
 		pthread_mutex_lock(&act->mutex);
-
 		if (act->pubSvc != NULL) {
 		    if (msgId == 0) {
 		        act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
@@ -113,10 +112,12 @@ static void* sut_sendThread(void *data) {
 
 		    msg.seqNr += 1;
 
-
-            usleep(10000);
         }
+        pthread_mutex_unlock(&act->mutex);
 
+		usleep(10000);
+
+		pthread_mutex_lock(&act->mutex);
 		running = act->running;
 		pthread_mutex_unlock(&act->mutex);
 	}

http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/test/tst_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/tst_activator.cc b/bundles/pubsub/test/test/tst_activator.cc
index cd19aaf..237c9a4 100644
--- a/bundles/pubsub/test/test/tst_activator.cc
+++ b/bundles/pubsub/test/test/tst_activator.cc
@@ -68,12 +68,20 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
 CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
 
 
-static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*msgTypeId*/, void * /*msg*/, bool *release) {
+static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*msgTypeId*/, void * voidMsg, bool */*release*/) {
     struct activator *act = static_cast<struct activator *>(handle);
+
+    msg_t *msg = static_cast<msg_t*>(voidMsg);
+    static int prevSeqNr = 0;
+    int delta = msg->seqNr - prevSeqNr;
+    if (delta != 1) {
+        fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
+    }
+    prevSeqNr = msg->seqNr;
+
     pthread_mutex_lock(&act->mutex);
     act->count += 1;
     pthread_mutex_unlock(&act->mutex);
-    *release = true;
     return CELIX_SUCCESS;
 }