You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/04/11 19:24:30 UTC
[1/2] celix git commit: CELIX-408: Major refactoring of the pubsub
serializer design and usage. The udpmc and zmq examples seem to be working.
Combi and zmq multicast are not stable yet.
Repository: celix
Updated Branches:
refs/heads/feature/CELIX-408_runtime 97df926c2 -> 7efe4331a
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
deleted file mode 100644
index 1c6bbcd..0000000
--- a/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.c
- *
- * \date Nov 11, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <dirent.h>
-
-#include "utils.h"
-#include "dyn_message.h"
-
-#include "dyn_msg_utils.h"
-
-#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
-
-static char * getMsgDescriptionDir(bundle_pt bundle);
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
-
-
-unsigned int uintHash(const void * uintNum) {
- return *((unsigned int*)uintNum);
-}
-
-int uintEquals(const void * uintNum, const void * toCompare) {
- return ( (*((unsigned int*)uintNum)) == (*((unsigned int*)toCompare)) );
-}
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
-
- char *root = NULL;
- char *metaInfPath = NULL;
-
- root = getMsgDescriptionDir(bundle);
-
- if(root != NULL){
- asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
-
- addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
- addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
-
- free(metaInfPath);
- free(root);
- }
-}
-
-void emptyMsgTypesMap(hash_map_pt msgTypesMap)
-{
- hash_map_iterator_pt iter = hashMapIterator_create(msgTypesMap);
-
- while(hashMapIterator_hasNext(iter)){
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- dynMessage_destroy( ((dyn_message_type *) hashMapEntry_getValue(entry)) );
- }
- hashMap_clear(msgTypesMap, true, false);
- hashMapIterator_destroy(iter);
-}
-
-static char * getMsgDescriptionDir(bundle_pt bundle)
-{
- char *root = NULL;
-
- bool isSystemBundle = false;
- bundle_isSystemBundle(bundle, &isSystemBundle);
-
- if(isSystemBundle == true) {
- bundle_context_pt context;
- bundle_getContext(bundle, &context);
-
- const char *prop = NULL;
-
- bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
-
- if(prop != NULL) {
- root = strdup(prop);
- } else {
- root = getcwd(NULL, 0);
- }
- } else {
- bundle_getEntry(bundle, ".", &root);
- }
-
- return root;
-}
-
-
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
-{
- char path[128];
- struct dirent *entry = NULL;
- DIR *dir = opendir(root);
-
- if(dir) {
- entry = readdir(dir);
- }
-
- while (entry != NULL) {
-
- if (strstr(entry->d_name, ".descriptor") != NULL) {
-
- printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
- memset(path,0,128);
- snprintf(path, 128, "%s/%s", root, entry->d_name);
- FILE *stream = fopen(path,"r");
-
- if (stream != NULL){
- dyn_message_type* msgType = NULL;
-
- int rc = dynMessage_parse(stream, &msgType);
- if (rc == 0 && msgType!=NULL) {
-
- char* msgName = NULL;
- dynMessage_getName(msgType,&msgName);
-
- if(msgName!=NULL){
- unsigned int* msgId = malloc(sizeof(unsigned int));
- *msgId = utils_stringHash(msgName);
- hashMap_put(msgTypesMap,msgId,msgType);
- }
-
- }
- else{
- printf("DMU: cannot parse message from descriptor %s\n.",path);
- }
- fclose(stream);
- }else{
- printf("DMU: cannot open descriptor file %s\n.",path);
- }
-
- }
- entry = readdir(dir);
- }
-
- if(dir) {
- closedir(dir);
- }
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index 61b1cd9..a5798a4 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,6 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
SOURCES
private/src/ps_activator.c
private/src/pubsub_serializer_impl.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index f026300..5299808 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -34,32 +34,25 @@
#include "pubsub_serializer.h"
-struct _pubsub_message_type { /* _dyn_message_type */
- struct namvals_head header;
- struct namvals_head annotations;
- struct types_head types;
- dyn_type *msgType;
- version_pt msgVersion;
-};
-
-struct pubsub_serializer {
+typedef struct pubsub_serializer {
bundle_context_pt bundle_context;
log_helper_pt loghelper;
-};
-
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer);
+} pubsub_serializer_t;
-/* Start of serializer specific functions */
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+typedef struct pubsub_msg_serializer_impl {
+ pubsub_msg_serializer_t msgSerializer;
+ dyn_message_type* dynMsg;
+} pubsub_msg_serializer_impl_t;
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t* map);
+/* Start of serializer specific functions */
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg);
#endif /* PUBSUB_SERIALIZER_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index b83c26b..e0e23d4 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -32,8 +32,8 @@
#include "pubsub_serializer_impl.h"
struct activator {
- pubsub_serializer_pt serializer;
- pubsub_serializer_service_pt serializerService;
+ pubsub_serializer_t* serializer;
+ pubsub_serializer_service_t* serializerService;
service_registration_pt registration;
};
@@ -56,24 +56,16 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
celix_status_t status = CELIX_SUCCESS;
struct activator *activator = userData;
- pubsub_serializer_service_pt pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
+ pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
if (!pubsubSerializerSvc) {
status = CELIX_ENOMEM;
}
else{
- pubsubSerializerSvc->serializer = activator->serializer;
-
- pubsubSerializerSvc->serialize = pubsubSerializer_serialize;
- pubsubSerializerSvc->deserialize = pubsubSerializer_deserialize;
-
- pubsubSerializerSvc->fillMsgTypesMap = pubsubSerializer_fillMsgTypesMap;
- pubsubSerializerSvc->emptyMsgTypesMap = pubsubSerializer_emptyMsgTypesMap;
-
- pubsubSerializerSvc->getVersion = pubsubSerializer_getVersion;
- pubsubSerializerSvc->getName = pubsubSerializer_getName;
- pubsubSerializerSvc->freeMsg = pubsubSerializer_freeMsg;
+ pubsubSerializerSvc->handle = activator->serializer;
+ pubsubSerializerSvc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
+ pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
activator->serializerService = pubsubSerializerSvc;
status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 60d5c98..2dd8258 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -26,30 +26,28 @@
#include <stdio.h>
#include <stdlib.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
#include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <inttypes.h>
-#include "constants.h"
#include "utils.h"
#include "hash_map.h"
-#include "array_list.h"
#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
+
#include "log_helper.h"
-#include "log_service.h"
-#include "service_factory.h"
#include "json_serializer.h"
-#include "dyn_msg_utils.h"
#include "pubsub_serializer_impl.h"
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer) {
+#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
celix_status_t status = CELIX_SUCCESS;
*serializer = calloc(1, sizeof(**serializer));
@@ -70,7 +68,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
return status;
}
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
celix_status_t status = CELIX_SUCCESS;
logHelper_stop(serializer->loghelper);
@@ -81,63 +79,197 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
return status;
}
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
- celix_status_t status = CELIX_SUCCESS;
-
- dyn_type *type = NULL;
- dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) {
+ celix_status_t status = CELIX_SUCCESS;
+ pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
+ if (status == CELIX_SUCCESS) {
+ map->bundle = bundle;
+ map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
+ pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
+ } else {
+ logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+ status = CELIX_ENOMEM;
+ }
- char *jsonOutput = NULL;
- int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
- if (rc != 0){
- status = CELIX_BUNDLE_EXCEPTION;
- }
+ if (status == CELIX_SUCCESS) {
+ *out = map;
+ }
+ return status;
+}
- *output = (void *) jsonOutput;
- *outputLen = strlen(jsonOutput) + 1;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t* map) {
+ celix_status_t status = CELIX_SUCCESS;
+ if (map == NULL) {
+ return status;
+ }
- return status;
+ hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+ pubsub_msg_serializer_impl_t* impl = msgSer->handle;
+ dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version owned by dynType
+ free(impl); //also contains the service struct.
+ }
+ hashMap_destroy(map->serializers, false, false);
+ free(map);
+ return status;
}
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output){
- celix_status_t status = CELIX_SUCCESS;
- dyn_type *type = NULL;
- dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen) {
+ celix_status_t status = CELIX_SUCCESS;
- void *textOutput = NULL;
- int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+ char *jsonOutput = NULL;
+ dyn_type* dynType = NULL;
+ dynMessage_getMessageType(impl->dynMsg, &dynType);
+ int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
if (rc != 0){
status = CELIX_BUNDLE_EXCEPTION;
}
-
- *output = textOutput;
+ if (status == CELIX_SUCCESS) {
+ *out = jsonOutput;
+ *outLen = strlen(jsonOutput) + 1;
+ }
return status;
}
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap, bundle_pt bundle){
- fillMsgTypesMap(msgTypesMap, bundle);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out) {
+ celix_status_t status = CELIX_SUCCESS;
+ void *msg = NULL;
+ dyn_type* dynType = NULL;
+ dynMessage_getMessageType(impl->dynMsg, &dynType);
+ int rc = jsonSerializer_deserialize(dynType, input, &msg);
+ if (rc != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ if (status == CELIX_SUCCESS) {
+ *out = msg;
+ }
+ return status;
}
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap){
- emptyMsgTypesMap(msgTypesMap);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) {
+ dyn_type* dynType = NULL;
+ dynMessage_getMessageType(impl->dynMsg, &dynType);
+ if (dynType != NULL) {
+ dynType_free(dynType, msg);
+ }
}
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
- version_pt msgVersion = NULL;
- dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
- return msgVersion;
+
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
+ char* root = NULL;
+ char* metaInfPath = NULL;
+
+ root = pubsubSerializer_getMsgDescriptionDir(bundle);
+
+ if(root != NULL){
+ asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+
+ pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+ pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+
+ free(metaInfPath);
+ free(root);
+ }
}
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
- char *name = NULL;
- dynMessage_getName((dyn_message_type *) msgType, &name);
- return name;
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
+{
+ char *root = NULL;
+
+ bool isSystemBundle = false;
+ bundle_isSystemBundle(bundle, &isSystemBundle);
+
+ if(isSystemBundle == true) {
+ bundle_context_pt context;
+ bundle_getContext(bundle, &context);
+
+ const char *prop = NULL;
+
+ bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+ if(prop != NULL) {
+ root = strdup(prop);
+ } else {
+ root = getcwd(NULL, 0);
+ }
+ } else {
+ bundle_getEntry(bundle, ".", &root);
+ }
+
+ return root;
}
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg){
- dyn_type *type = NULL;
- dynMessage_getMessageType((dyn_message_type *) msgType, &type);
- dynType_free(type, msg);
+
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
+{
+ char path[128];
+ struct dirent *entry = NULL;
+ DIR *dir = opendir(root);
+
+ if(dir) {
+ entry = readdir(dir);
+ }
+
+ while (entry != NULL) {
+
+ if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+ printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+ memset(path,0,128);
+ snprintf(path, 128, "%s/%s", root, entry->d_name);
+ FILE *stream = fopen(path,"r");
+
+ if (stream != NULL){
+ dyn_message_type* msgType = NULL;
+
+ int rc = dynMessage_parse(stream, &msgType);
+ if (rc == 0 && msgType != NULL) {
+
+ char* msgName = NULL;
+ dynMessage_getName(msgType,&msgName);
+
+ version_pt msgVersion = NULL;
+ dynMessage_getVersion(msgType, &msgVersion);
+
+ unsigned int msgId = utils_stringHash(msgName);
+
+ pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl));
+ impl->dynMsg = msgType;
+ impl->msgSerializer.handle = impl;
+ impl->msgSerializer.msgId = msgId;
+ impl->msgSerializer.msgName = msgName;
+ impl->msgSerializer.msgVersion = msgVersion;
+ impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize;
+ impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize;
+ impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+ bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+ if (clash) {
+ printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+ } else if ( msgName != NULL && msgVersion != NULL && msgId != 0) {
+ hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
+ } else {
+ printf("Error adding creating msg serializer\n");
+ }
+
+ }
+ else{
+ printf("DMU: cannot parse message from descriptor %s\n.",path);
+ }
+ fclose(stream);
+ }else{
+ printf("DMU: cannot open descriptor file %s\n.",path);
+ }
+
+ }
+ entry = readdir(dir);
+ }
+
+ if(dir) {
+ closedir(dir);
+ }
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 3826a25..36ea422 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -328,7 +328,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_pt manager = handle;
- pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+ pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
celixThreadMutex_lock(&manager->serializerListLock);
@@ -360,7 +360,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_pt manager = handle;
- pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+ pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
celixThreadMutex_lock(&manager->serializerListLock);
@@ -377,7 +377,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, ser
if (arrayList_size(manager->serializerList) > 0){
//there is another serializer available, change the admin so it is using another serializer
- pubsub_serializer_service_pt replacing_serializer = (pubsub_serializer_service_pt) arrayList_get(manager->serializerList,0);
+ pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
for(j=0; j<arrayList_size(manager->psaList); j++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 808644c..03b15ba 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -5,4 +5,4 @@ version=1.0.0
:annotations
:types
:message
-{n seqnR}
\ No newline at end of file
+{n seqnR}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 6d957a0..266f73e 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -145,13 +145,14 @@ TEST_GROUP(PUBSUB_INT_GROUP)
//check if message are returned
msg_t initMsg;
initMsg.seqNr = 0;
+ int count = 0;
for (int i = 0; i < TRIES; ++i) {
pthread_mutex_lock(&g_act.mutex);
g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &initMsg);
pthread_mutex_unlock(&g_act.mutex);
usleep(TIMEOUT);
pthread_mutex_lock(&g_act.mutex);
- int count = g_act.count;
+ count = g_act.count;
pthread_mutex_unlock(&g_act.mutex);
if (count > 0) {
break;
@@ -159,6 +160,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
printf("No return message received, waiting for a while\n");
}
}
+ CHECK(count > 0);
}
void teardown() {
@@ -167,7 +169,10 @@ TEST_GROUP(PUBSUB_INT_GROUP)
};
TEST(PUBSUB_INT_GROUP, sendRecvTest) {
- g_act.count = 0;
+ pthread_mutex_lock(&g_act.mutex);
+ g_act.count = 0; //reset counter
+ pthread_mutex_unlock(&g_act.mutex);
+
constexpr int COUNT = 50;
msg_t msg;
for (int i = 0; i < COUNT; ++i) {
[2/2] celix git commit: CELIX-408: Major refactoring of the pubsub
serializer design and usage. The udpmc and zmq examples seem to be working.
Combi and zmq multicast are not stable yet.
Posted by pn...@apache.org.
CELIX-408: Major refactoring of the pubsub serializer design and usage. The udpmc and zmq examples seem to be working. Combi and zmq multicast are not stable yet.
- The serializer is refactored to minimize the callbacks needed directly to the services. The serializer is now a two-step approach:
the serializer service can be used to create a map of msg serializers,
and the msg serializers are serializer structs (with function ptrs) that each serialize a specific msg.
- Where feasible, replace _pt types with _t types. Removing the pointer from the typedef makes it possible to add const info and to use sizeof on the typedef.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7efe4331
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7efe4331
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7efe4331
Branch: refs/heads/feature/CELIX-408_runtime
Commit: 7efe4331aafdfea1df8bd181c136f20acaf52b49
Parents: 97df926
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Apr 11 21:19:10 2017 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Apr 11 21:19:10 2017 +0200
----------------------------------------------------------------------
cmake/cmake_celix/BundlePackaging.cmake | 2 +-
dfi/private/src/json_serializer.c | 8 +-
dfi/public/include/json_serializer.h | 4 +-
pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 1 -
.../private/include/pubsub_admin_impl.h | 6 +-
.../include/pubsub_publish_service_private.h | 8 +-
.../private/include/topic_subscription.h | 6 +-
.../private/src/pubsub_admin_impl.c | 4 +-
.../private/src/topic_publication.c | 173 +++++++-------
.../private/src/topic_subscription.c | 158 +++++++------
pubsub/pubsub_admin_zmq/CMakeLists.txt | 1 -
.../private/include/pubsub_admin_impl.h | 6 +-
.../include/pubsub_publish_service_private.h | 6 +-
.../private/include/topic_subscription.h | 6 +-
.../private/src/pubsub_admin_impl.c | 4 +-
.../private/src/topic_publication.c | 137 ++++++-----
.../private/src/topic_subscription.c | 184 ++++++---------
.../public/include/dyn_msg_utils.h | 39 ----
.../pubsub_common/public/include/pubsub_admin.h | 4 +-
.../public/include/pubsub_serializer.h | 47 ++--
pubsub/pubsub_common/public/src/dyn_msg_utils.c | 160 -------------
pubsub/pubsub_serializer_json/CMakeLists.txt | 1 -
.../private/include/pubsub_serializer_impl.h | 35 ++-
.../private/src/ps_activator.c | 20 +-
.../private/src/pubsub_serializer_impl.c | 234 +++++++++++++++----
.../private/src/pubsub_topology_manager.c | 6 +-
pubsub/test/msg_descriptors/msg.descriptor | 2 +-
pubsub/test/test/tst_activator.cpp | 9 +-
28 files changed, 582 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/cmake/cmake_celix/BundlePackaging.cmake
----------------------------------------------------------------------
diff --git a/cmake/cmake_celix/BundlePackaging.cmake b/cmake/cmake_celix/BundlePackaging.cmake
index ded1ca5..7eb42fa 100644
--- a/cmake/cmake_celix/BundlePackaging.cmake
+++ b/cmake/cmake_celix/BundlePackaging.cmake
@@ -319,7 +319,7 @@ function(bundle_libs)
if (ADD_TO_MANIFEST)
list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>")
endif()
- list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
+ list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
endif()
get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB")
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/private/src/json_serializer.c
----------------------------------------------------------------------
diff --git a/dfi/private/src/json_serializer.c b/dfi/private/src/json_serializer.c
index 0c06998..c1cd339 100644
--- a/dfi/private/src/json_serializer.c
+++ b/dfi/private/src/json_serializer.c
@@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, json_t *array, void *seqL
return status;
}
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) {
int status = OK;
json_t *root = NULL;
@@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
return status;
}
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) {
- return jsonSerializer_writeAny(type, input, out);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out) {
+ return jsonSerializer_writeAny(type, (void*)input /*TODO update static function to take const void**/, out);
}
-static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) {
+static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) {
int status = OK;
int descriptor = dynType_descriptorType(type);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/public/include/json_serializer.h
----------------------------------------------------------------------
diff --git a/dfi/public/include/json_serializer.h b/dfi/public/include/json_serializer.h
index c785b01..2f91f2b 100644
--- a/dfi/public/include/json_serializer.h
+++ b/dfi/public/include/json_serializer.h
@@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer);
int jsonSerializer_deserialize(dyn_type *type, const char *input, void **result);
int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void **result);
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output);
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out);
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out);
#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index ce32db0..1ac0c2d 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
private/src/topic_subscription.c
private/src/topic_publication.c
private/src/large_udp.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 9eddf15..89e6547 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -33,7 +33,7 @@
struct pubsub_admin {
- pubsub_serializer_service_pt serializerSvc;
+ pubsub_serializer_service_t* serializerSvc;
bundle_context_pt bundle_context;
log_helper_pt loghelper;
@@ -73,7 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
#endif /* PUBSUB_ADMIN_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 81690b8..57d942a 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg {
struct pubsub_msg_header header;
unsigned int payloadSize;
char payload[];
-} *pubsub_udp_msg_pt;
+} pubsub_udp_msg_t;
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index 36c902e..1535ae5 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
typedef struct topic_subscription* topic_subscription_pt;
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 6e9f4e8..ebfe3e6 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
return status;
}
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = serializerSvc;
@@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
return status;
}
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index aa3faf0..be0a433 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -37,7 +37,6 @@
#include "array_list.h"
#include "celixbool.h"
#include "service_registration.h"
-#include "dyn_msg_utils.h"
#include "utils.h"
#include "service_factory.h"
#include "version.h"
@@ -59,7 +58,7 @@ struct topic_publication {
hash_map_pt boundServices; //<bundle_pt,bound_service>
celix_thread_mutex_t tp_lock;
struct sockaddr_in destAddr;
- pubsub_serializer_service_pt serializerSvc;
+ pubsub_serializer_service_t* serializerSvc;
};
typedef struct publish_bundle_bound_service {
@@ -68,27 +67,27 @@ typedef struct publish_bundle_bound_service {
bundle_pt bundle;
char *scope;
char *topic;
- hash_map_pt msgTypes;
unsigned short getCount;
celix_thread_mutex_t mp_lock;
bool mp_send_in_progress;
array_list_pt mp_parts;
largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
+ pubsub_msg_serializer_map_t* map;
+} publish_bundle_bound_service_t;
typedef struct pubsub_msg{
pubsub_msg_header_pt header;
char* payload;
int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
static unsigned int rand_range(unsigned int min, unsigned int max);
static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
@@ -98,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){
char* ep = malloc(EP_ADDRESS_LEN);
memset(ep,0,EP_ADDRESS_LEN);
@@ -136,7 +135,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
while(hashMapIterator_hasNext(iter)){
- publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
pubsub_destroyPublishBundleBoundService(bound);
}
hashMapIterator_destroy(iter);
@@ -223,41 +222,49 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&(pub->tp_lock));
- pub->serializerSvc = serializerSvc;
+ //clear old serializer
+ if (pub->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+ while (hashMapIterator_hasNext(&iter)) {
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ bound->map = NULL;
+ }
+ }
- hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
- while(hashMapIterator_hasNext(bs_iter)){
- publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
- if (hashMap_size(boundSvc->msgTypes) == 0){
- pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
- }
+ //setup new serializer
+ pub->serializerSvc = serializerSvc;
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ bundle_pt bundle = hashMapEntry_getKey(entry);
+ publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
+ pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
}
- hashMapIterator_destroy(bs_iter);
celixThreadMutex_unlock(&(pub->tp_lock));
return status;
}
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&(pub->tp_lock));
-
- hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
- while(hashMapIterator_hasNext(bs_iter)){
- publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
- pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
- }
- hashMapIterator_destroy(bs_iter);
-
+ if (pub->serializerSvc == svc) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ bound->map = NULL;
+ }
+ }
pub->serializerSvc = NULL;
-
celixThreadMutex_unlock(&(pub->tp_lock));
return status;
@@ -275,18 +282,18 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
celixThreadMutex_lock(&(publish->tp_lock));
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
- if(bound==NULL){
- bound = pubsub_createPublishBundleBoundService(publish,bundle);
- if(bound!=NULL){
- hashMap_put(publish->boundServices,bundle,bound);
+ publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+ if (bound == NULL) {
+ bound = pubsub_createPublishBundleBoundService(publish, bundle);
+ if (bound != NULL) {
+ hashMap_put(publish->boundServices, bundle, bound);
}
}
- else{
+ else {
bound->getCount++;
}
- if(bound!=NULL){
+ if (bound != NULL) {
*service = bound->service;
}
@@ -301,17 +308,16 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
celixThreadMutex_lock(&(publish->tp_lock));
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
- if(bound!=NULL){
+ publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+ if (bound != NULL) {
bound->getCount--;
- if(bound->getCount==0){
+ if (bound->getCount == 0) {
pubsub_destroyPublishBundleBoundService(bound);
hashMap_remove(publish->boundServices,bundle);
}
-
}
- else{
+ else {
long bundleId = -1;
bundle_getBundleId(bundle,&bundleId);
printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
@@ -325,12 +331,11 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
return CELIX_SUCCESS;
}
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
const int iovec_len = 3; // header + size + payload
bool ret = true;
- pubsub_udp_msg_pt udpMsg;
- int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize;
+ int compiledMsgSize = sizeof(pubsub_udp_msg_t) + msg->payloadSize;
struct iovec msg_iovec[iovec_len];
msg_iovec[0].iov_base = msg->header;
@@ -348,51 +353,51 @@ static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt
ret = false;
}
- //free(udpMsg);
if(releaseCallback) {
releaseCallback->release(msg->payload, bound);
}
return ret;
-
}
static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
int status = 0;
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+ publish_bundle_bound_service_t* bound = handle;
celixThreadMutex_lock(&(bound->parent->tp_lock));
celixThreadMutex_lock(&(bound->mp_lock));
- //TODO //FIXME -> should use pointer to int as identifier, can be many pointers to int ....
- printf("TODO FIX usage of msg id's in the serializer hashmap. This seems wrongly based on pointers to uints!!!!\n");
- pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
- int major=0, minor=0;
+ pubsub_msg_serializer_t *msgSer = NULL;
+ if (bound->map != NULL) {
+ msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId);
+ }
- if (msgType != NULL && bound->parent->serializerSvc != NULL) {
+ if (bound->map == NULL) {
+ printf("TP: Serializer is not set!\n");
+ } else if (msgSer == NULL ){
+ printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+ }
- version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
+ int major=0, minor=0;
+ if (msgSer != NULL) {
pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
msg_hdr->type = msgTypeId;
- if (msgVersion != NULL){
- version_getMajor(msgVersion, &major);
- version_getMinor(msgVersion, &minor);
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
msg_hdr->major = major;
msg_hdr->minor = minor;
}
- void* serializedOutput = NULL;
- int serializedOutputLen = 0;
- bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
+ char* serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
- pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+ pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
msg->header = msg_hdr;
- msg->payload = (char *) serializedOutput;
+ msg->payload = serializedOutput;
msg->payloadSize = serializedOutputLen;
if(send_pubsub_msg(bound, msg,true, NULL) == false) {
@@ -403,11 +408,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
free(serializedOutput);
} else {
- if (bound->parent->serializerSvc == NULL) {
- printf("TP: Serializer is not set!\n");
- } else {
- printf("TP: Message %u not supported.\n",msgTypeId);
- }
+ printf("TP: Message %u not supported.\n",msgTypeId);
status=-1;
}
@@ -430,9 +431,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
}
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
- publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+ publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
if (bound != NULL) {
bound->service = calloc(1, sizeof(*bound->service));
@@ -445,7 +446,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->getCount = 1;
bound->mp_send_in_progress = false;
celixThreadMutex_create(&bound->mp_lock,NULL);
- bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
arrayList_create(&bound->mp_parts);
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -457,10 +457,10 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->service->send = pubsub_topicPublicationSend;
bound->service->sendMultipart = NULL; //Multipart not supported (jet) for UDP
- if (tp->serializerSvc != NULL){
- tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
+ //TODO check whether a lock on tp is needed here (e.g. does the caller already hold it?)
+ if (tp->serializerSvc != NULL) {
+ tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
}
-
}
else
{
@@ -474,30 +474,33 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
return bound;
}
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
celixThreadMutex_lock(&boundSvc->mp_lock);
- if(boundSvc->service != NULL){
+ if (boundSvc->service != NULL) {
free(boundSvc->service);
}
- if(boundSvc->msgTypes != NULL){
- if (boundSvc->parent->serializerSvc != NULL){
- boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
- }
- hashMap_destroy(boundSvc->msgTypes,false,false);
- }
+ //TODO check if lock on parent is needed, e.g. does the caller already lock?
+ if (boundSvc->map != NULL) {
+ if (boundSvc->parent->serializerSvc == NULL) {
+ printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n");
+ } else {
+ boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+ boundSvc->map = NULL;
+ }
+ }
- if(boundSvc->mp_parts!=NULL){
+ if (boundSvc->mp_parts!=NULL) {
arrayList_destroy(boundSvc->mp_parts);
}
- if(boundSvc->scope!=NULL){
+ if (boundSvc->scope!=NULL) {
free(boundSvc->scope);
}
- if(boundSvc->topic!=NULL){
+ if (boundSvc->topic!=NULL) {
free(boundSvc->topic);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 91bce9f..da23b21 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -49,7 +49,6 @@
#include "topic_subscription.h"
#include "subscriber.h"
#include "publisher.h"
-#include "dyn_msg_utils.h"
#include "pubsub_publish_service_private.h"
#include "large_udp.h"
@@ -68,11 +67,15 @@ struct topic_subscription{
celix_thread_mutex_t ts_lock;
bundle_context_pt context;
int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
- hash_map_pt servicesMap; // key = service, value = msg types map
+
+ //NOTE: using a service pointer as key can be dangerous, because a pointer value can be reused.
+ //Ensuring that a pointer is removed before a new (reused) pointer comes along is crucial!
+ hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+
hash_map_pt socketMap; // key = URL, value = listen-socket
unsigned int nrSubscribers;
largeUdp_pt largeUdpHandle;
- pubsub_serializer_service_pt serializerSvc;
+ pubsub_serializer_service_t* serializerSvc;
};
@@ -94,7 +97,7 @@ static void sigusr1_sighandler(int signo);
static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
celix_status_t status = CELIX_SUCCESS;
topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
@@ -115,7 +118,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
celixThreadMutex_create(&ts->ts_lock,NULL);
arrayList_create(&ts->sub_ep_list);
- ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -161,7 +164,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
serviceTracker_destroy(ts->tracker);
arrayList_clear(ts->sub_ep_list);
arrayList_destroy(ts->sub_ep_list);
- hashMap_destroy(ts->servicesMap,false,false);
+ hashMap_destroy(ts->msgSerializersMap,false,false);
hashMap_destroy(ts->socketMap,false,false);
largeUdp_destroy(ts->largeUdpHandle);
@@ -391,7 +394,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
return ts->nrSubscribers;
}
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
@@ -403,50 +406,39 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
return status;
}
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
-
- hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
- while(hashMapIterator_hasNext(svc_iter)){
- hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
- if (hashMap_size(msgTypes) > 0){
- ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+ if (ts->serializerSvc == serializerSvc) { //only act if the removed svc is the service currently in use
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
}
+ ts->serializerSvc = NULL;
}
- hashMapIterator_destroy(svc_iter);
-
- ts->serializerSvc = NULL;
-
celixThreadMutex_unlock(&ts->ts_lock);
return status;
}
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){
celix_status_t status = CELIX_SUCCESS;
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (!hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+ if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
bundle_pt bundle = NULL;
serviceReference_getBundle(reference, &bundle);
- if (ts->serializerSvc != NULL){
- ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
- }
-
- if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
- hashMap_destroy(msgTypes,false,false);
- printf("TS: Unsupported subscriber!\n");
- }
- else{
- hashMap_put(ts->servicesMap, service, msgTypes);
+ if (ts->serializerSvc != NULL) {
+ pubsub_msg_serializer_map_t* map = NULL;
+ ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+ if (map != NULL) {
+ hashMap_put(ts->msgSerializersMap, svc, map);
+ }
}
-
}
celixThreadMutex_unlock(&ts->ts_lock);
printf("TS: New subscriber registered.\n");
@@ -454,18 +446,16 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
}
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){
celix_status_t status = CELIX_SUCCESS;
topic_subscription_pt ts = handle;
- celixThreadMutex_lock(&ts->ts_lock);
- if (hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
- if(msgTypes!=NULL){
- if (ts->serializerSvc != NULL){
- ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
- }
- hashMap_destroy(msgTypes,false,false);
+
+ celixThreadMutex_lock(&ts->ts_lock);
+ if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
+ pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc);
+ if (ts->serializerSvc != NULL){
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
}
}
celixThreadMutex_unlock(&ts->ts_lock);
@@ -475,59 +465,51 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
}
-static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
+static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
- hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
- while (hashMapIterator_hasNext(iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap);
+ celixThreadMutex_lock(&sub->ts_lock);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
- hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
- pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type));
+ pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type);
- if (msgType == NULL) {
+ if (msgSer == NULL) {
printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
- }
- else if (sub->serializerSvc == NULL){
- printf("TS: No active serializer service found!\n");
- }
- else{
+ } else {
void *msgInst = NULL;
- char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
- version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
- bool validVersion = checkVersion(msgVersion,&msg->header);
-
+ bool validVersion = checkVersion(msgSer->msgVersion, &msg->header);
if(validVersion){
- celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) msg->payload, &msgInst);
-
+ celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst);
if (status == CELIX_SUCCESS) {
bool release = true;
pubsub_multipart_callbacks_t mp_callbacks;
- mp_callbacks.handle = sub;
+ mp_callbacks.handle = map;
mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
mp_callbacks.getMultipart = NULL;
- subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release);
- if(release){
- sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+ subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+ if (release) {
+ msgSer->freeMsg(msgSer->handle, msgInst);
}
}
else{
- printf("TS: Cannot deserialize msgType %s.\n",name);
+ printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
}
}
- else{
+ else {
int major=0,minor=0;
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,msg->header.major,msg->header.minor);
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+ msgSer->msgName, major, minor, msg->header.major, msg->header.minor);
}
-
}
}
- hashMapIterator_destroy(iter);
+ celixThreadMutex_unlock(&sub->ts_lock);
}
static void* udp_recv_thread_func(void * arg) {
@@ -555,7 +537,7 @@ static void* udp_recv_thread_func(void * arg) {
unsigned int size;
if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
// Handle data
- pubsub_udp_msg_pt udpMsg = NULL;
+ pubsub_udp_msg_t* udpMsg = NULL;
if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
printf("TS: ERROR largeUdp_read with index %d\n", index);
continue;
@@ -577,19 +559,19 @@ static void* udp_recv_thread_func(void * arg) {
}
-static void sigusr1_sighandler(int signo){
+static void sigusr1_sighandler(int signo) {
printf("TS: Topic subscription being shut down...\n");
return;
}
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
bool check=false;
int major=0,minor=0;
- if(msgVersion!=NULL){
+ if (msgVersion!=NULL) {
version_getMajor(msgVersion,&major);
version_getMinor(msgVersion,&minor);
- if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+ if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
@@ -597,8 +579,24 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
return check;
}
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
- *msgTypeId = utils_stringHash(msgType);
- return 0;
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) {
+ pubsub_msg_serializer_map_t* map = handle;
+ hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+ unsigned int msgTypeId = 0;
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+ if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
+ msgTypeId = msgSer->msgId;
+ break;
+ }
+ }
+
+ if (msgTypeId == 0) {
+ printf("Cannot find msg type id for msgType %s\n", msgType);
+ return -1;
+ } else {
+ *out = msgTypeId;
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 956830d..49eba87 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -52,7 +52,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
${ZMQ_CRYPTO_C}
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 2f81bff..3c36986 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -49,7 +49,7 @@
struct pubsub_admin {
- pubsub_serializer_service_pt serializerSvc;
+ pubsub_serializer_service_t* serializerSvc;
bundle_context_pt bundle_context;
log_helper_pt loghelper;
@@ -89,7 +89,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
#endif /* PUBSUB_ADMIN_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index b6b76c6..158dfe7 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,14 +34,14 @@
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c6fe93a..1fbbaaf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
typedef struct topic_subscription* topic_subscription_pt;
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -51,8 +51,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 09fcd8c..5c9a5d5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -666,7 +666,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
return status;
}
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = serializerSvc;
@@ -694,7 +694,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
return status;
}
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index bb8ff56..2e95874 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -47,7 +47,6 @@
#include "version.h"
#include "pubsub_common.h"
-#include "dyn_msg_utils.h"
#include "pubsub_utils.h"
#include "publisher.h"
@@ -70,7 +69,7 @@ struct topic_publication {
array_list_pt pub_ep_list; //List<pubsub_endpoint>
hash_map_pt boundServices; //<bundle_pt,bound_service>
celix_thread_mutex_t tp_lock;
- pubsub_serializer_service_pt serializerSvc;
+ pubsub_serializer_service_t* serializerSvc;
};
typedef struct publish_bundle_bound_service {
@@ -78,26 +77,26 @@ typedef struct publish_bundle_bound_service {
pubsub_publisher_pt service;
bundle_pt bundle;
char *topic;
- hash_map_pt msgTypes;
+ pubsub_msg_serializer_map_t* map;
unsigned short getCount;
celix_thread_mutex_t mp_lock;
bool mp_send_in_progress;
array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
+} publish_bundle_bound_service_t;
-typedef struct pubsub_msg{
+typedef struct pubsub_msg {
pubsub_msg_header_pt header;
char* payload;
int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
static unsigned int rand_range(unsigned int min, unsigned int max);
static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags);
@@ -105,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
celix_status_t status = CELIX_SUCCESS;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -235,7 +234,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
while(hashMapIterator_hasNext(iter)){
- publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
pubsub_destroyPublishBundleBoundService(bound);
}
hashMapIterator_destroy(iter);
@@ -332,43 +331,50 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&(pub->tp_lock));
- pub->serializerSvc = serializerSvc;
-
- hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
- while(hashMapIterator_hasNext(bs_iter)){
- publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
- if (hashMap_size(boundSvc->msgTypes) == 0){
- pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
+ //clearing previous serializer
+ if (pub->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ bound->map = NULL;
}
}
- hashMapIterator_destroy(bs_iter);
+
+ pub->serializerSvc = serializerSvc;
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ bundle_pt bundle = hashMapEntry_getKey(entry);
+ publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry);
+ pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map);
+ }
celixThreadMutex_unlock(&(pub->tp_lock));
return status;
}
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
celix_status_t status = CELIX_SUCCESS;
-
celixThreadMutex_lock(&(pub->tp_lock));
- hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
- while(hashMapIterator_hasNext(bs_iter)){
- publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
- pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
+ if (pub->serializerSvc == svc) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map);
+ boundSvc->map = NULL;
+ }
+ pub->serializerSvc = NULL;
}
- hashMapIterator_destroy(bs_iter);
-
- pub->serializerSvc = NULL;
celixThreadMutex_unlock(&(pub->tp_lock));
-
return status;
}
@@ -384,7 +390,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
celixThreadMutex_lock(&(publish->tp_lock));
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+ publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
if(bound==NULL){
bound = pubsub_createPublishBundleBoundService(publish,bundle);
if(bound!=NULL){
@@ -410,7 +416,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
celixThreadMutex_lock(&(publish->tp_lock));
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+ publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
if(bound!=NULL){
bound->getCount--;
@@ -434,7 +440,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
return CELIX_SUCCESS;
}
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){
bool ret = true;
@@ -474,7 +480,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
unsigned int i = 0;
unsigned int mp_num = arrayList_size(mp_msg_parts);
for(;i<mp_num;i++){
- ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+ ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
}
arrayList_clear(mp_msg_parts);
@@ -489,10 +495,8 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
}
static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
-
int status = 0;
-
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+ publish_bundle_bound_service_t* bound = handle;
celixThreadMutex_lock(&(bound->mp_lock));
if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
@@ -501,38 +505,33 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
return -3;
}
- pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
+ pubsub_msg_serializer_t* msgSer = NULL;
+ if (bound->map != NULL) {
+ msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
+ }
int major=0, minor=0;
- if (msgType != NULL && bound->parent->serializerSvc != NULL) {
-
- version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
-
+ if (msgSer != NULL) {
pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
msg_hdr->type = msgTypeId;
- if (msgVersion != NULL){
- version_getMajor(msgVersion, &major);
- version_getMinor(msgVersion, &minor);
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
msg_hdr->major = major;
msg_hdr->minor = minor;
}
- void* serializedOutput = NULL;
- int serializedOutputLen = 0;
- bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
-
- pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+ char* serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
+ pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
msg->header = msg_hdr;
msg->payload = (char *) serializedOutput;
msg->payloadSize = serializedOutputLen;
-
bool snd = true;
- switch(flags){
+ switch (flags) {
case PUBSUB_PUBLISHER_FIRST_MSG:
bound->mp_send_in_progress = true;
arrayList_add(bound->mp_parts,msg);
@@ -602,9 +601,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
}
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
- publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+ publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
if (bound != NULL) {
bound->service = calloc(1, sizeof(*bound->service));
@@ -617,7 +616,11 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->getCount = 1;
bound->mp_send_in_progress = false;
celixThreadMutex_create(&bound->mp_lock,NULL);
- bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
+
+ //TODO check if lock is needed. e.g. was the caller already locked?
+ if (tp->serializerSvc != NULL) {
+ tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
+ }
arrayList_create(&bound->mp_parts);
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -628,10 +631,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->service->send = pubsub_topicPublicationSend;
bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
- if (tp->serializerSvc != NULL){
- tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
- }
-
}
else
{
@@ -645,26 +644,24 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
return bound;
}
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
celixThreadMutex_lock(&boundSvc->mp_lock);
- if(boundSvc->service != NULL){
+ if (boundSvc->service != NULL) {
free(boundSvc->service);
}
- if(boundSvc->msgTypes != NULL){
- if (boundSvc->parent->serializerSvc != NULL){
- boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
- }
- hashMap_destroy(boundSvc->msgTypes,false,false);
+ if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
+ boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+ boundSvc->map = NULL;
}
- if(boundSvc->mp_parts!=NULL){
+ if (boundSvc->mp_parts!=NULL) {
arrayList_destroy(boundSvc->mp_parts);
}
- if(boundSvc->topic!=NULL){
+ if (boundSvc->topic!=NULL) {
free(boundSvc->topic);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 3de56af..7ef2c5d 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -46,7 +46,6 @@
#include "subscriber.h"
#include "publisher.h"
-#include "dyn_msg_utils.h"
#include "pubsub_utils.h"
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -58,8 +57,7 @@
#define POLL_TIMEOUT 250
#define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS"
-struct topic_subscription{
-
+struct topic_subscription {
zsock_t* zmq_socket;
zcert_t * zmq_cert;
zcert_t * zmq_pub_cert;
@@ -71,26 +69,25 @@ struct topic_subscription{
celix_thread_mutex_t ts_lock;
bundle_context_pt context;
- hash_map_pt servicesMap; // key = service, value = msg types map
+ hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t*
array_list_pt pendingConnections;
array_list_pt pendingDisconnections;
celix_thread_mutex_t pendingConnections_lock;
celix_thread_mutex_t pendingDisconnections_lock;
unsigned int nrSubscribers;
- pubsub_serializer_service_pt serializerSvc;
-
+ pubsub_serializer_service_t* serializerSvc;
};
-typedef struct complete_zmq_msg{
+typedef struct complete_zmq_msg {
zframe_t* header;
zframe_t* payload;
}* complete_zmq_msg_pt;
-typedef struct mp_handle{
- hash_map_pt svc_msg_db;
+typedef struct mp_handle {
+ pubsub_msg_serializer_map_t* map;
hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
+} mp_handle_t;
typedef struct msg_map_entry{
bool retain;
@@ -104,12 +101,12 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
static void sigusr1_sighandler(int signo);
static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_t* mp_handle);
static void connectPendingPublishers(topic_subscription_pt sub);
static void disconnectPendingPublishers(topic_subscription_pt sub);
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
celix_status_t status = CELIX_SUCCESS;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -223,7 +220,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
celixThreadMutex_create(&ts->socket_lock, NULL);
celixThreadMutex_create(&ts->ts_lock,NULL);
arrayList_create(&ts->sub_ep_list);
- ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
arrayList_create(&ts->pendingConnections);
arrayList_create(&ts->pendingDisconnections);
celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -269,7 +266,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
serviceTracker_destroy(ts->tracker);
arrayList_clear(ts->sub_ep_list);
arrayList_destroy(ts->sub_ep_list);
- hashMap_destroy(ts->servicesMap,false,false);
+ hashMap_destroy(ts->msgSerializers,false,false);
celixThreadMutex_lock(&ts->pendingConnections_lock);
arrayList_destroy(ts->pendingConnections);
@@ -429,7 +426,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
return ts->nrSubscribers;
}
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
@@ -441,22 +438,18 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
return status;
}
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
-
- hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
- while(hashMapIterator_hasNext(svc_iter)){
- hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
- if (hashMap_size(msgTypes) > 0){
- ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+ if (ts->serializerSvc == svc) {
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers);
+ while(hashMapIterator_hasNext(&iter)){
+ pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
}
}
- hashMapIterator_destroy(svc_iter);
-
ts->serializerSvc = NULL;
-
celixThreadMutex_unlock(&ts->ts_lock);
return status;
@@ -467,24 +460,19 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (!hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+ if (!hashMap_containsKey(ts->msgSerializers, service)) {
bundle_pt bundle = NULL;
serviceReference_getBundle(reference, &bundle);
- if (ts->serializerSvc != NULL){
- ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
- }
-
- if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
- hashMap_destroy(msgTypes,false,false);
- printf("TS: Unsupported subscriber!\n");
- }
- else{
- hashMap_put(ts->servicesMap, service, msgTypes);
+ if (ts->serializerSvc != NULL) {
+ pubsub_msg_serializer_map_t* map = NULL;
+ ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+ if (map != NULL) {
+ hashMap_put(ts->msgSerializers, service, map);
+ } else {
+ printf("TS: Cannot create msg serializer map\n");
+ }
}
-
}
celixThreadMutex_unlock(&ts->ts_lock);
printf("TS: New subscriber registered.\n");
@@ -497,14 +485,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
- if(msgTypes!=NULL){
- if (ts->serializerSvc != NULL){
- ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
- }
- hashMap_destroy(msgTypes,false,false);
- }
+ if (hashMap_containsKey(ts->msgSerializers, service)) {
+ pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
}
celixThreadMutex_unlock(&ts->ts_lock);
@@ -513,66 +496,55 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
}
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
- hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
- while (hashMapIterator_hasNext(iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
- hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
- pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type));
- if (msgType == NULL) {
- printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
- }
- else if (sub->serializerSvc == NULL){
- printf("TS: No active serializer found!\n");
- }
- else{
+ pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type);
+ if (msgSer == NULL) {
+ printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type);
+ } else {
void *msgInst = NULL;
- char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
- version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
- bool validVersion = checkVersion(msgVersion,first_msg_hdr);
-
+ bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr);
if(validVersion){
-
- celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+ celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst);
if (status == CELIX_SUCCESS) {
bool release = true;
- mp_handle_pt mp_handle = create_mp_handle(sub, msgTypes,msg_list);
+ mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list);
pubsub_multipart_callbacks_t mp_callbacks;
mp_callbacks.handle = mp_handle;
mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
mp_callbacks.getMultipart = pubsub_getMultipart;
- subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+ subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
- if(release){
- sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+ if (release) {
+ msgSer->freeMsg(msgSer->handle, msgInst);
}
- if(mp_handle!=NULL){
+ if (mp_handle!=NULL) {
destroy_mp_handle(mp_handle);
}
}
else{
- printf("TS: Cannot deserialize msgType %s.\n",name);
+ printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
}
- }
- else{
+ } else {
int major=0,minor=0;
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+ msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor);
}
-
}
}
- hashMapIterator_destroy(iter);
int i = 0;
for(;i<arrayList_size(msg_list);i++){
@@ -737,7 +709,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
return -1;
}
- mp_handle_pt mp_handle = (mp_handle_pt)handle;
+ mp_handle_t* mp_handle = handle;
msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
if(entry!=NULL){
entry->retain = retain;
@@ -753,63 +725,55 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
}
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
return NULL;
}
- mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
- mp_handle->svc_msg_db = svc_msg_db;
- mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL);
-
- int i=1; //We skip the first message, it will be handle differently
- for(;i<arrayList_size(rcv_msg_list);i++){
- complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+ mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
+ mp_handle->map = map;
+ mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
+ int i; //We skip the first message, it will be handled differently
+ for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
+ complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
- pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type));
- if (msgType != NULL && sub->serializerSvc != NULL) {
+ pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type));
+ if (msgSer != NULL) {
void *msgInst = NULL;
- version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
- bool validVersion = checkVersion(msgVersion,header);
-
+ bool validVersion = checkVersion(msgSer->msgVersion, header);
if(validVersion){
- celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+ //TODO make the getMultipart lazy?
+ celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst);
if(status == CELIX_SUCCESS){
- unsigned int* msgId = calloc(1,sizeof(unsigned int));
- *msgId = header->type;
msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
entry->msgInst = msgInst;
- hashMap_put(mp_handle->rcv_msg_map,msgId,entry);
+ hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry);
}
}
}
-
}
-
return mp_handle;
-
}
-static void destroy_mp_handle(mp_handle_pt mp_handle){
+static void destroy_mp_handle(mp_handle_t* mp_handle){
hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
while(hashMapIterator_hasNext(iter)){
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
+ unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry);
msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
- pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId);
- if(msgType!=NULL){
- if(!msgEntry->retain){
- free(msgEntry->msgInst);
+ pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
+ if (msgSer != NULL) {
+ if (!msgEntry->retain) {
+ msgSer->freeMsg(msgSer->handle, msgEntry->msgInst);
}
}
else{
- printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId);
+ printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId);
}
}
hashMapIterator_destroy(iter);
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/pubsub/pubsub_common/public/include/dyn_msg_utils.h
deleted file mode 100644
index 71085ab..0000000
--- a/pubsub/pubsub_common/public/include/dyn_msg_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.h
- *
- * \date Nov 11, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef DYN_MSG_UTILS_H_
-#define DYN_MSG_UTILS_H_
-
-#include "bundle.h"
-#include "hash_map.h"
-
-unsigned int uintHash(const void * uintNum);
-int uintEquals(const void * uintNum, const void * toCompare);
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-void emptyMsgTypesMap(hash_map_pt msgTypesMap);
-
-#endif /* DYN_MSG_UTILS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
index 52cb75c..f7ab7e0 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -56,8 +56,8 @@ struct pubsub_admin_service {
celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
- celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
- celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+ celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+ celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
};
http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index f2df075..e9f9f6c 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -24,33 +24,44 @@
* \copyright Apache License, Version 2.0
*/
-#ifndef PUBSUB_SERIALIZER_H_
-#define PUBSUB_SERIALIZER_H_
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
#include "service_reference.h"
#include "pubsub_common.h"
-typedef struct _pubsub_message_type pubsub_message_type;
-
-typedef struct pubsub_serializer *pubsub_serializer_pt;
-
-struct pubsub_serializer_service {
+/**
+ * There should be a pubsub_msg_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+typedef struct pubsub_msg_serializer {
+ void* handle;
+ unsigned int msgId;
+ const char* msgName;
+ version_pt msgVersion;
- pubsub_serializer_pt serializer;
+ celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen);
+ celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
- celix_status_t (*serialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
- celix_status_t (*deserialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+ void (*freeMsg)(void* handle, void* msg);
+} pubsub_msg_serializer_t;
- void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
- void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+typedef struct pubsub_msg_serializer_map {
+ bundle_pt bundle;
+ hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_msg_serializer_t*
+} pubsub_msg_serializer_map_t;
- version_pt (*getVersion)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
- char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
- void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+typedef struct pubsub_serializer_service {
+ void* handle;
-};
+ celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+ celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map);
-typedef struct pubsub_serializer_service *pubsub_serializer_service_pt;
+} pubsub_serializer_service_t;
-#endif /* PUBSUB_SERIALIZER_H_ */
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */