You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/12 08:39:17 UTC
celix git commit: CELIX-454: Refactors pubsub msg serializer so that
it uses the same handle approach as services to prevent confusion.
Repository: celix
Updated Branches:
refs/heads/feature/CELIX-454-pubsub-disc 2b8f92dfa -> 88a575f51
CELIX-454: Refactors pubsub msg serializer so that it uses the same handle approach as services to prevent confusion.
Also fixes a mem issues due to confusion howto call the msg serializer with a handle.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/88a575f5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/88a575f5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/88a575f5
Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: 88a575f5153d6051e420861e6808642f87068068
Parents: 2b8f92d
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Nov 12 09:38:12 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Nov 12 09:38:12 2018 +0100
----------------------------------------------------------------------
bundles/pubsub/CMakeLists.txt | 2 +-
.../src/pubsub_udpmc_topic_receiver.c | 4 +-
.../src/pubsub_udpmc_topic_sender.c | 2 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.c | 3 +-
.../src/pubsub_zmq_topic_receiver.c | 6 +-
.../src/pubsub_zmq_topic_sender.c | 2 +-
.../src/ps_json_serializer_activator.c | 8 +--
.../src/pubsub_serializer_impl.c | 76 +++++++++++++-------
.../src/pubsub_serializer_impl.h | 18 ++---
.../pubsub_spi/include/pubsub_serializer.h | 1 +
bundles/pubsub/test/CMakeLists.txt | 38 +++++-----
bundles/pubsub/test/meta_data/msg.descriptor | 2 +-
bundles/pubsub/test/meta_data/ping.properties | 1 -
bundles/pubsub/test/test/sut_activator.c | 7 +-
bundles/pubsub/test/test/tst_activator.cc | 12 +++-
15 files changed, 106 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..79f8168 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -39,7 +39,7 @@ if (PUBSUB)
if (ENABLE_TESTING)
option(BUILD_PUBSUB_TESTS "Enable Tests for PUBSUB" OFF)
endif()
- if (ENABLE_TESTING AND BUILD_PUBSUB_TESTS AND BUILD_PUBSUB_PSA_ZMQ)
+ if (ENABLE_TESTING AND BUILD_PUBSUB_TESTS)
add_subdirectory(test)
endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index a2f37d4..e1e5a42 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -392,7 +392,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
if(validVersion){
- celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+ celix_status_t status = msgSer->deserialize(msgSer->handle, (const void *)msg->payload, 0, &msgInst);
if (status == CELIX_SUCCESS) {
bool release = true;
@@ -400,7 +400,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
if(release){
- msgSer->freeMsg(msgSer,msgInst);
+ msgSer->freeMsg(msgSer->handle, msgInst);
}
}
else{
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index ddc6172..55bf96b 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -255,7 +255,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
void* serializedOutput = NULL;
size_t serializedOutputLen = 0;
- if (msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
+ if (msgSer->serialize(msgSer->handle,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
strncpy(msg_hdr->topic,entry->parent->topic,MAX_TOPIC_LEN-1);
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index f0f2b23..c5bfb99 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -274,6 +274,8 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
celixThreadMutex_lock(&psa->serializers.mutex);
psa_zmq_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
if (entry != NULL) {
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
@@ -305,7 +307,6 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
free(entry);
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
}
celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index bb190cc..6886109 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -33,7 +33,6 @@
#include "pubsub_zmq_common.h"
#include "../../pubsub_topology_manager/src/pubsub_topology_manager.h"
-//TODO see if block and wakeup (reset) also works
#define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -410,6 +409,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
}
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, const pubsub_zmq_msg_header_t *hdr, const byte* payload, size_t payloadSize) {
+ //NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
pubsub_subscriber_t *svc = entry->svc;
@@ -417,8 +417,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
void *deserializedMsg = NULL;
bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, hdr);
if (validVersion) {
- celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
- if(status == CELIX_SUCCESS) {
+ celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+ if (status == CELIX_SUCCESS) {
bool release = true;
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
if (release) {
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index ed40d9f..d87412e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -385,7 +385,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
void *serializedOutput = NULL;
size_t serializedOutputLen = 0;
- status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
+ status = msgSer->serialize(msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen);
if (status == CELIX_SUCCESS) {
zmsg_t *msg = zmsg_new();
//TODO revert to use zmq_msg_init_data (or something like that) for zero copy for the payload
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
index b74bab8..d59c752 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
@@ -24,8 +24,8 @@
#include "pubsub_serializer_impl.h"
typedef struct psjs_activator {
- pubsub_serializer_t* serializer;
- ;
+ pubsub_json_serializer_t* serializer;
+
pubsub_serializer_service_t serializerSvc;
long serializerSvcId;
} psjs_activator_t;
@@ -37,8 +37,8 @@ static int psjs_start(psjs_activator_t *act, celix_bundle_context_t *ctx) {
if (status == CELIX_SUCCESS) {
act->serializerSvc.handle = act->serializer;
- act->serializerSvc.createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
- act->serializerSvc.destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
+ act->serializerSvc.createSerializerMap = pubsubSerializer_createSerializerMap;
+ act->serializerSvc.destroySerializerMap = pubsubSerializer_destroySerializerMap;
/* Set serializer type */
celix_properties_t *props = celix_properties_create();
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 685d499..2569ee7 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -16,13 +16,6 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pubsub_serializer_impl.c
- *
- * \date Mar 24, 2017
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
#include <stdio.h>
#include <stdlib.h>
@@ -44,11 +37,31 @@
#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
#define MAX_PATH_LEN 1024
+struct pubsub_json_serializer {
+ bundle_context_pt bundle_context;
+ log_helper_pt loghelper;
+};
+
+
+/* Start of serializer specific functions */
+static celix_status_t pubsubMsgSerializer_serialize(void* handle, const void* msg, void** out, size_t *outLen);
+static celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out);
+static void pubsubMsgSerializer_freeMsg(void* handle, void *msg);
+
+
+typedef struct pubsub_json_msg_serializer_impl {
+ dyn_message_type *msgType;
+
+ unsigned int msgId;
+ const char* msgName;
+ version_pt msgVersion;
+} pubsub_json_msg_serializer_impl_t;
+
static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t** serializer) {
celix_status_t status = CELIX_SUCCESS;
*serializer = calloc(1, sizeof(**serializer));
@@ -69,7 +82,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
return status;
}
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
celix_status_t status = CELIX_SUCCESS;
logHelper_stop(serializer->loghelper);
@@ -80,8 +93,9 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
return status;
}
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap) {
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap) {
celix_status_t status = CELIX_SUCCESS;
+ pubsub_json_serializer_t *serializer = handle;
hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -98,8 +112,9 @@ celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* seriali
return status;
}
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, hash_map_pt serializerMap) {
+celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), hash_map_pt serializerMap) {
celix_status_t status = CELIX_SUCCESS;
+ //pubsub_json_serializer_t *serializer = handle;
if (serializerMap == NULL) {
return CELIX_ILLEGAL_ARGUMENT;
}
@@ -107,9 +122,11 @@ celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serial
hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
while (hashMapIterator_hasNext(&iter)) {
pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
- dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+ pubsub_json_msg_serializer_impl_t *impl = msgSerializer->handle;
+ dyn_message_type *dynMsg = impl->msgType;
dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType
free(msgSerializer); //also contains the service struct.
+ free(impl);
}
hashMap_destroy(serializerMap, false, false);
@@ -118,12 +135,14 @@ celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serial
}
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen) {
+celix_status_t pubsubMsgSerializer_serialize(void *handle, const void* msg, void** out, size_t *outLen) {
celix_status_t status = CELIX_SUCCESS;
+ pubsub_json_msg_serializer_impl_t *impl = handle;
+
char *jsonOutput = NULL;
dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+ dyn_message_type *dynMsg = impl->msgType;
dynMessage_getMessageType(dynMsg, &dynType);
if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){
@@ -138,12 +157,12 @@ celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSeriali
return status;
}
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out) {
-
+celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out) {
celix_status_t status = CELIX_SUCCESS;
+ pubsub_json_msg_serializer_impl_t *impl = handle;
void *msg = NULL;
dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+ dyn_message_type *dynMsg = impl->msgType;
dynMessage_getMessageType(dynMsg, &dynType);
if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) {
@@ -156,9 +175,10 @@ celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSeria
return status;
}
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg) {
+void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
+ pubsub_json_msg_serializer_impl_t *impl = handle;
dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+ dyn_message_type *dynMsg = impl->msgType;
dynMessage_getMessageType(dynMsg, &dynType);
if (dynType != NULL) {
dynType_free(dynType, msg);
@@ -246,12 +266,18 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
unsigned int msgId = utils_stringHash(msgName);
- pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(pubsub_msg_serializer_t));
+ pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(*msgSerializer));
+ pubsub_json_msg_serializer_impl_t *impl = calloc(1, sizeof(*impl));
+
+ impl->msgType = msgType;
+ impl->msgId = msgId;
+ impl->msgName = msgName;
+ impl->msgVersion = msgVersion;
- msgSerializer->handle = msgType;
- msgSerializer->msgId = msgId;
- msgSerializer->msgName = msgName;
- msgSerializer->msgVersion = msgVersion;
+ msgSerializer->handle = impl;
+ msgSerializer->msgId = impl->msgId;
+ msgSerializer->msgName = impl->msgName;
+ msgSerializer->msgVersion = impl->msgVersion;
msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
@@ -260,6 +286,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
if (clash){
printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
free(msgSerializer);
+ free(impl);
dynMessage_destroy(msgType);
}
else if (msgId != 0){
@@ -268,6 +295,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
}
else{
printf("Error creating msg serializer\n");
+ free(impl);
free(msgSerializer);
dynMessage_destroy(msgType);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index 75b72cb..9461438 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -29,20 +29,12 @@
#define PUBSUB_JSON_SERIALIZER_TYPE "json"
-typedef struct pubsub_serializer {
- bundle_context_pt bundle_context;
- log_helper_pt loghelper;
-} pubsub_serializer_t;
+typedef struct pubsub_json_serializer pubsub_json_serializer_t;
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t **serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer);
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap);
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, hash_map_pt serializerMap);
-
-/* Start of serializer specific functions */
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen);
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out);
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap);
#endif /* PUBSUB_SERIALIZER_JSON_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
index 9fa3340..d441c11 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -39,6 +39,7 @@
typedef struct pubsub_msg_serializer {
void* handle;
+
unsigned int msgId;
const char* msgName;
version_pt msgVersion;
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 1340448..941f245 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -51,25 +51,25 @@ celix_bundle_files(pubsub_tst
DESTINATION "META-INF/topics/sub"
)
-add_celix_container(pubsub_zmq_tests
- GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
- LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
- PROPERTIES
- LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
-)
-target_link_libraries(pubsub_zmq_tests PRIVATE Celix::framework Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
-target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
-celix_container_bundles(pubsub_zmq_tests
- Celix::pubsub_serializer_json
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
- pubsub_sut
- pubsub_tst
- Celix::shell
- Celix::shell_tui
-)
-add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
-SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
+if (BUILD_PUBSUB_PSA_ZMQ)
+ add_celix_container(pubsub_zmq_tests
+ GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_zmq
+ pubsub_sut
+ pubsub_tst
+ )
+ target_link_libraries(pubsub_zmq_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+ target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+ add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
+ SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
+endif ()
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/meta_data/msg.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/msg.descriptor b/bundles/pubsub/test/meta_data/msg.descriptor
index 0eb28cb..da774ae 100644
--- a/bundles/pubsub/test/meta_data/msg.descriptor
+++ b/bundles/pubsub/test/meta_data/msg.descriptor
@@ -6,4 +6,4 @@ version=1.0.0
classname=org.example.Msg
:types
:message
-{i seqnR}
+{i seqNr}
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/meta_data/ping.properties
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index 3a99403..09284fc 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-pubsub.admin=zmq
zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
index 58c6d48..c09359e 100644
--- a/bundles/pubsub/test/test/sut_activator.c
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -100,7 +100,6 @@ static void* sut_sendThread(void *data) {
while (running) {
pthread_mutex_lock(&act->mutex);
-
if (act->pubSvc != NULL) {
if (msgId == 0) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
@@ -113,10 +112,12 @@ static void* sut_sendThread(void *data) {
msg.seqNr += 1;
-
- usleep(10000);
}
+ pthread_mutex_unlock(&act->mutex);
+ usleep(10000);
+
+ pthread_mutex_lock(&act->mutex);
running = act->running;
pthread_mutex_unlock(&act->mutex);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/88a575f5/bundles/pubsub/test/test/tst_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/tst_activator.cc b/bundles/pubsub/test/test/tst_activator.cc
index cd19aaf..237c9a4 100644
--- a/bundles/pubsub/test/test/tst_activator.cc
+++ b/bundles/pubsub/test/test/tst_activator.cc
@@ -68,12 +68,20 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*msgTypeId*/, void * /*msg*/, bool *release) {
+static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*msgTypeId*/, void * voidMsg, bool */*release*/) {
struct activator *act = static_cast<struct activator *>(handle);
+
+ msg_t *msg = static_cast<msg_t*>(voidMsg);
+ static int prevSeqNr = 0;
+ int delta = msg->seqNr - prevSeqNr;
+ if (delta != 1) {
+ fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
+ }
+ prevSeqNr = msg->seqNr;
+
pthread_mutex_lock(&act->mutex);
act->count += 1;
pthread_mutex_unlock(&act->mutex);
- *release = true;
return CELIX_SUCCESS;
}