You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/24 13:49:00 UTC
[celix] branch feature/use_ser_hander_in_psa updated: Updates
pubsub zmq v2 to use pubsub_serializer_handler_t
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/use_ser_hander_in_psa by this push:
new e840b83 Updates pubsub zmq v2 to use pubsub_serializer_handler_t
e840b83 is described below
commit e840b83a3a20960a70c7a9bdd29596a316964d6c
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon May 24 15:48:33 2021 +0200
Updates pubsub zmq v2 to use pubsub_serializer_handler_t
---
.../pubsub/pubsub_admin_zmq/v2/src/psa_activator.c | 11 -
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 239 ++++++---------------
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h | 13 --
.../v2/src/pubsub_zmq_topic_receiver.c | 153 ++++++-------
.../v2/src/pubsub_zmq_topic_receiver.h | 7 +-
.../v2/src/pubsub_zmq_topic_sender.c | 80 +++----
.../v2/src/pubsub_zmq_topic_sender.h | 5 +-
.../include/pubsub_message_serialization_marker.h | 60 ++++++
.../gtest/src/PubSubMatchingTestSuite.cpp | 94 +++-----
.../src/PubSubSerializationHandlerTestSuite.cc | 35 ++-
.../include/pubsub_serializer_handler.h | 52 ++++-
.../include/pubsub_serializer_provider.h | 36 ++--
bundles/pubsub/pubsub_utils/src/pubsub_matching.c | 19 +-
.../pubsub_utils/src/pubsub_serializer_handler.c | 78 ++++++-
14 files changed, 454 insertions(+), 428 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index 014401e..a6d1c2d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -59,17 +59,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 5d396e5..e30403e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -23,15 +23,17 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <ifaddrs.h>
-#include <pubsub_endpoint.h>
#include <czmq.h>
-#include <pubsub_serializer.h>
-#include <pubsub_protocol.h>
#include <ip_utils.h>
-#include <pubsub_matching.h>
-#include <pubsub_utils.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
+#include "pubsub_matching.h"
+#include "pubsub_utils.h"
+#include "pubsub_message_serialization_service.h"
+#include "pubsub_message_serialization_marker.h"
#include "pubsub_zmq_admin.h"
#include "pubsub_psa_zmq_constants.h"
#include "pubsub_zmq_topic_sender.h"
@@ -88,6 +90,10 @@ struct pubsub_zmq_admin {
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
+ } serializationHandlers;
};
typedef struct psa_zmq_protocol_entry {
@@ -99,6 +105,7 @@ typedef struct psa_zmq_protocol_entry {
static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId);
static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -197,6 +204,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -248,6 +258,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->protocols.mutex);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -263,6 +281,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
if (psa->zmq_auth != NULL) {
zactor_destroy(&psa->zmq_auth);
}
@@ -272,92 +293,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
free(psa);
}
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_ZMQ] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_ZMQ] Ignored serializer type %s msgId %li fqn %s\n", serType, msgId, msgFqn);
- return;
- }
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- }
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_zmq_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_zmqTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_zmqTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_zmq_admin_t *psa = handle;
@@ -462,20 +397,21 @@ celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix
return status;
}
-static void pubsub_zmqAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
-
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
@@ -485,17 +421,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
}
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
@@ -503,14 +428,14 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle,
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle,
protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, protType, NULL);
+ pubsub_zmqTopicSender_serializerType(sender), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
//if configured use a static discover url
@@ -546,10 +471,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
}
celixThreadMutex_unlock(&psa->protocols.mutex);
- if (sender != NULL && newEndpoint != NULL) {
- //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
- }
-
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
}
@@ -572,7 +493,6 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
pubsub_zmq_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(mapKey);
- //TODO disconnect endpoints to sender. note is this needed for a zmq topic sender?
pubsub_zmqTopicSender_destroy(sender);
} else {
celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -585,21 +505,15 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
-
celix_properties_t *newEndpoint = NULL;
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -608,7 +522,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, handle, protocolSvcId, protEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, handle, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
}
@@ -616,7 +530,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ pubsub_zmqTopicReceiver_serializerType(receiver), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -856,46 +771,6 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE
return status;
}
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_zmq_admin_t *psa = handle;
- psa_zmq_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_zmq_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_zmq_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- return id;
-}
-
pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
pubsub_zmq_admin_t *psa = handle;
pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
@@ -924,6 +799,20 @@ pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
return result;
}
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) {
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
+}
+
#ifndef ANDROID
static celix_status_t zmq_getIpAddress(const char* interface, char** ip) {
celix_status_t status = CELIX_BUNDLE_EXCEPTION;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index f99ee08..475e464 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -27,12 +27,6 @@
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
-typedef struct psa_zmq_serializer_entry {
- const char *fqn;
- const char *version;
- pubsub_message_serialization_service_t *svc;
-} psa_zmq_serializer_entry_t;
-
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
@@ -49,18 +43,11 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer);
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0568834..1051d77 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -26,22 +26,22 @@
#if !defined(__APPLE__)
#include <sys/epoll.h>
#endif
+
#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
-#include "pubsub_zmq_topic_receiver.h"
-#include "pubsub_psa_zmq_constants.h"
-
#include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
-#include <pubsub_utils.h>
-#include <celix_api.h>
-#include <celix_version.h>
+#include "pubsub_endpoint.h"
+#include "celix_log_helper.h"
+#include "pubsub_zmq_topic_receiver.h"
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_utils.h"
+#include "celix_api.h"
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "pubsub_interceptors_handler.h"
-
#include "celix_utils_api.h"
#include "pubsub_zmq_admin.h"
@@ -64,7 +64,7 @@
struct pubsub_zmq_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
@@ -95,6 +95,7 @@ struct pubsub_zmq_topic_receiver {
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ hash_map_t *msgFqns; //key = msg id, value = char* msgFqn
bool allInitialized;
} subscribers;
};
@@ -118,7 +119,6 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -126,14 +126,14 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serializerType = celix_utils_strdup(serializerType);
+ receiver->serializerHandler = serHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
@@ -207,6 +207,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
@@ -287,6 +288,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
+ hashMap_destroy(receiver->subscribers.msgFqns, false, true);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -312,7 +314,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
free(receiver->scope);
free(receiver->topic);
- free((void*)receiver->serializerType);
}
free(receiver);
}
@@ -325,7 +326,7 @@ const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver)
}
const char* pubsub_zmqTopicReceiver_serializerType(pubsub_zmq_topic_receiver_t *receiver) {
- return receiver->serializerType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
@@ -446,7 +447,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
bool monitor = receiver->metricsEnabled;
- psa_zmq_serializer_entry_t *msgSer = pubsub_zmqAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serializerType, message->header.msgId);
//monitoring
struct timespec beginSer;
@@ -454,60 +454,64 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
int updateReceiveCount = 0;
int updateSerError = 0;
- if (msgSer!= NULL) {
- void *deserializedMsg = NULL;
- const char *msgFqn = msgSer->fqn;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_zmq_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
- if (validVersion) {
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &beginSer);
- }
- struct iovec deSerializeBuffer;
- deSerializeBuffer.iov_base = message->payload.payload;
- deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &endSer);
- }
- if (status == CELIX_SUCCESS) {
- uint32_t msgId = message->header.msgId;
- celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
- bool release = true;
- if (cont) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter2)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
- }
- release = true;
+ char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId);
+ if (msgFqn == NULL) {
+ msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+ hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn);
+ }
+
+ void *deserializedMsg = NULL;
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+ if (validVersion) {
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &beginSer);
+ }
+ struct iovec deSerializeBuffer;
+ deSerializeBuffer.iov_base = message->payload.payload;
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &endSer);
+ }
+ if (status == CELIX_SUCCESS) {
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
+ bool release = true;
+ if (cont) {
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
+ if (!release && hashMapIterator_hasNext(&iter2)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
}
+ release = true;
}
- if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deserializedMsg);
- }
- updateReceiveCount += 1;
}
- } else {
- updateSerError += 1;
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ if (release) {
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+ }
+ updateReceiveCount += 1;
}
+ } else {
+ updateSerError += 1;
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+ msgFqn,
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
-
- pubsub_zmqAdmin_releaseSerializer(receiver->admin, msgSer);
}
static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
@@ -744,24 +748,3 @@ static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const
ts->zmq_pub_cert = pub_cert;
#endif
}
-
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check=false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 67ba6d4..a2f953b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -20,9 +20,10 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_admin_metrics.h"
+#include "pubsub_message_serialization_service.h"
#include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
@@ -31,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index f969f4c..9996c53 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -21,17 +21,18 @@
#include <pubsub_protocol.h>
#include <stdlib.h>
#include <memory.h>
-#include <pubsub_constants.h>
-#include <pubsub/publisher.h>
-#include <utils.h>
-#include <zconf.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
+#include <uuid/uuid.h>
+
+#include "celix_utils.h"
+#include "pubsub_constants.h"
+#include "pubsub/publisher.h"
+#include "celix_log_helper.h"
#include "pubsub_zmq_topic_sender.h"
#include "pubsub_psa_zmq_constants.h"
-#include <uuid/uuid.h>
-#include <celix_version.h>
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "celix_constants.h"
#include "pubsub_interceptors_handler.h"
#include "pubsub_zmq_admin.h"
@@ -51,7 +52,7 @@
struct pubsub_zmq_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializationHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
@@ -85,8 +86,8 @@ struct pubsub_zmq_topic_sender {
typedef struct psa_zmq_send_msg_entry {
uint32_t type; //msg type id (hash of fqn)
const char *fqn;
- uint8_t major;
- uint8_t minor;
+ uint16_t msgMajorVersion;
+ uint16_t msgMinorVersion;
unsigned char originUUID[16];
pubsub_protocol_service_t *protSer;
unsigned int seqNr;
@@ -117,7 +118,8 @@ typedef struct psa_zmq_bounded_service_entry {
} psa_zmq_bounded_service_entry_t;
typedef struct psa_zmq_zerocopy_free_entry {
- psa_zmq_serializer_entry_t *msgSer;
+ uint32_t msgId;
+ pubsub_serializer_handler_t *serHandler;
struct iovec *serializedOutput;
size_t serializedOutputLen;
} psa_zmq_zerocopy_free_entry;
@@ -135,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializationHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -146,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializationHandler = serializationHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = prot;
@@ -352,13 +354,12 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
}
free(sender->topic);
free(sender->url);
- free((void*)sender->serializerType);
free(sender);
}
}
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializationHandler);
}
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -383,10 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_zmqAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
+ *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType);
return 0;
}
@@ -485,7 +483,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
static void psa_zmq_freeMsg(void *msg, void *hint) {
psa_zmq_zerocopy_free_entry *entry = hint;
- entry->msgSer->svc->freeSerializedMsg(entry->msgSer->svc->handle, entry->serializedOutput, entry->serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen);
free(entry);
}
@@ -500,14 +498,6 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
pubsub_zmq_topic_sender_t *sender = bound->parent;
bool monitor = sender->metricsEnabled;
- psa_zmq_serializer_entry_t *serializer = pubsub_zmqAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
- }
-
psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
//metrics updates
@@ -519,15 +509,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
int serializationErrorUpdate = 0;
int sendCountUpdate = 0;
- if(entry == NULL) {
+ if (entry == NULL) {
entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
entry->protSer = sender->protocol;
entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
+ entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId);
+ entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId);
+ entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId);
uuid_copy(entry->originUUID, sender->fwUUID);
celixThreadMutex_create(&entry->metrics.mutex, NULL);
hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
@@ -540,8 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
size_t serializedOutputLen = 0;
struct iovec *serializedOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
+ status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
if (monitor) {
clock_gettime(CLOCK_REALTIME, &serializationEnd);
}
@@ -554,7 +541,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
usleep(500);
}
- bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
if (cont) {
pubsub_protocol_message_t message;
@@ -576,8 +563,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
message.header.msgId = msgTypeId;
message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = 0;
- message.header.msgMinorVersion = 0;
+ message.header.msgMajorVersion = entry->msgMajorVersion;
+ message.header.msgMinorVersion = entry->msgMinorVersion;
message.header.payloadSize = payloadLength;
message.header.metadataSize = entry->metadataBufferSize;
message.header.payloadPartSize = payloadLength;
@@ -601,7 +588,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
zmq_msg_t msg4; // Footer
void *socket = zsock_resolve(sender->zmq.socket);
psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
- freeMsgEntry->msgSer = serializer;
+ freeMsgEntry->serHandler = sender->serializationHandler;
+ freeMsgEntry->msgId = msgTypeId;
freeMsgEntry->serializedOutput = serializedOutput;
freeMsgEntry->serializedOutputLen = serializedOutputLen;
@@ -671,13 +659,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
__atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
}
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
if (message.metadata.metadata) {
celix_properties_destroy(message.metadata.metadata);
}
if (!bound->parent->zeroCopyEnabled && serializedOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen);
}
if (sendOk) {
@@ -691,15 +679,13 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
} else {
serializationErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
-
- if (monitor && entry != NULL) {
+ if (monitor) {
celixThreadMutex_lock(&entry->metrics.mutex);
- long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
+ double n = (double)(entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed);
double diff = celix_difftime(&serializationStart, &serializationEnd);
double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
entry->metrics.averageSerializationTimeInSeconds = average;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 744f653..c2c0d57 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -20,7 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
#include "celix_log_helper.h"
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializationHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -40,6 +40,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort);
+
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
new file mode 100644
index 0000000..a4ff06b
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+
+#include "hash_map.h"
+#include "version.h"
+#include "celix_bundle.h"
+#include "sys/uio.h"
+
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME "pubsub_message_serialization_marker"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION "1.0.0"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_RANGE "[1,2)"
+
+/**
+ * @brief Service property (named "serialization.type") identifying the serialization type (e.g json, avrobin, etc)
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY "serialization.type"
+
+/**
+ * @brief Service property (named "serialization.backwards.compatible") identifying whether the serialization is
+ * backwards compatible (i.e. for json a extra - new - field can be safely ignored and is thus backwards compatible.
+ *
+ * Type if boolean. If service property is not present, false will be used.
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE "serialization.backwards.compatible"
+
+/**
+ * @brief Marker interface - interface with no methods - to indicate that a serialization.type is available.
+ *
+ * This marker interface is used to indicate that a serialization type is available without the need to rely on
+ * pubsub message serialization service per msg type.
+ * The service.ranking of this marker interface must be used to select a serialization type if no serialization type is
+ * configured. The service.ranking of individual pubsub_messge_serization_service is used to override serializers per
+ * type.
+ *
+ * The properties serialization.type is mandatory
+ */
+typedef struct pubsub_message_serialization_marker {
+ void* handle;
+} pubsub_message_serialization_marker_t;
+
+#endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
index 85c86c1..10555a3 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
@@ -20,16 +20,17 @@
#include "gtest/gtest.h"
#include <memory>
+#include <cstdarg>
-#include <celix_api.h>
+#include "celix_api.h"
#include "pubsub_message_serialization_service.h"
#include "dyn_message.h"
-#include <cstdarg>
-#include <pubsub_protocol.h>
-#include <pubsub_constants.h>
-#include <pubsub_matching.h>
-#include <pubsub/api.h>
-#include <pubsub_endpoint.h>
+#include "pubsub_protocol.h"
+#include "pubsub_constants.h"
+#include "pubsub_matching.h"
+#include "pubsub/api.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -54,70 +55,46 @@ public:
bndId = celix_bundleContext_installBundle(ctx.get(), MATCHING_BUNDLE, true);
dynMessage_logSetup(stdLog, NULL, 1);
-
- msgSerSvc.handle = this;
- msgSerSvc.serialize = [](void*, const void*, struct iovec**, size_t*) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeSerializedMsg = [](void*, struct iovec* , size_t) {
- };
- msgSerSvc.deserialize = [](void*, const struct iovec*, size_t, void**) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeDeserializedMsg = [](void*, void*) {
- };
}
~PubSubMatchingTestSuite() override {
celix_bundleContext_uninstallBundle(ctx.get(), bndId);
}
- long registerSerSvc(const char* type, uint32_t msgId, const char* msgFqn, const char* msgVersion, long ranking) {
+ long registerMarkerSerSvc(const char* type) {
auto* p = celix_properties_create();
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, type);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, std::to_string(msgId).c_str());
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, msgFqn);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
- celix_properties_setLong(p, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+ celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, type);
celix_service_registration_options_t opts{};
- opts.svc = static_cast<void*>(&msgSerSvc);
+ opts.svc = static_cast<void*>(&serMarkerSvc);
opts.properties = p;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
return celix_bundleContext_registerServiceWithOptions(ctx.get(), &opts);
}
std::shared_ptr<celix_framework_t> fw{};
std::shared_ptr<celix_bundle_context_t> ctx{};
- pubsub_message_serialization_service_t msgSerSvc{};
+ pubsub_message_serialization_marker_t serMarkerSvc{};
pubsub_protocol_service_t protocolSvc{};
long bndId{};
};
TEST_F(PubSubMatchingTestSuite, MatchPublisherSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-
+ auto serMarkerId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serId);
-
- celix_bundleContext_unregisterService(ctx.get(), serId);
+ EXPECT_EQ(foundSvcId, serMarkerId);
+ celix_bundleContext_unregisterService(ctx.get(), serMarkerId);
}
TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
-
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serFiets2Id);
-
+ EXPECT_EQ(foundSvcId, serFietsId); //older service are ranked higher
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
celix_bundleContext_unregisterService(ctx.get(), serFiets2Id);
celix_bundleContext_unregisterService(ctx.get(), serAutoId);
@@ -125,26 +102,23 @@ TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
auto* p = celix_properties_create();
celix_properties_set(p, PUBSUB_SUBSCRIBER_SCOPE, "scope");
celix_properties_set(p, PUBSUB_SUBSCRIBER_TOPIC, "fiets");
-
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
EXPECT_EQ(foundSvcId, serId);
-
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serId);
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -154,7 +128,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
@@ -164,7 +138,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
@@ -183,10 +157,10 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -197,7 +171,7 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
pubsub_utils_matchEndpoint(ctx.get(), logHelper, ep, "admin?", false, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(ep);
celix_logHelper_destroy(logHelper);
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 2448c18..550c85f 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -20,12 +20,13 @@
#include "gtest/gtest.h"
#include <memory>
+#include <cstdarg>
#include "celix_api.h"
#include "pubsub_message_serialization_service.h"
#include "pubsub_serializer_handler.h"
#include "dyn_message.h"
-#include <cstdarg>
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -98,6 +99,7 @@ public:
TEST_F(PubSubSerializationHandlerTestSuite, CreateDestroy) {
auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
ASSERT_TRUE(handler != nullptr);
+ ASSERT_STREQ("json", pubsub_serializerHandler_getSerializationType(handler));
pubsub_serializerHandler_destroy(handler);
}
@@ -176,6 +178,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, MultipleVersions) {
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
celix_bundleContext_unregisterService(ctx.get(), svcId2);
@@ -194,6 +198,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, NoBackwardsCompatbile) {
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
@@ -261,4 +267,29 @@ TEST_F(PubSubSerializationHandlerTestSuite, BackwardsCompatibleCall) {
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
-}
\ No newline at end of file
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
+ auto* logHelper = celix_logHelper_create(ctx.get(), "test");
+ auto* marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), 1032 /*invalid*/, logHelper);
+ EXPECT_FALSE(marker); //non existing svc
+
+ pubsub_message_serialization_marker_t markerSvc;
+ long svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, NULL);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_FALSE(marker); //missing ser type service property
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+
+ auto* props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, "test");
+ svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, props);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_TRUE(marker);
+ EXPECT_STREQ("test", pubsub_serializerHandler_getSerializationType(marker));
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+ pubsub_serializerHandler_destroy(marker);
+
+ celix_logHelper_destroy(logHelper);
+}
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index 06c18de..0519891 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -23,6 +23,7 @@
#include <stdint.h>
#include <sys/uio.h>
+#include "celix_log_helper.h"
#include "celix_api.h"
#ifdef __cplusplus
@@ -33,8 +34,10 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
/**
- * @brief Creates a handler which track pubsub_custom_msg_serialization_service services with a (serialization.type=<serializerType)) filter.
- * If multiple pubsub_message_serialization_service for the same msg fqn (targeted.msg.fqn property) the highest ranking service will be used.
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the provided serialization type.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
*
* The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
* and that only one version for a message serialization is registered.
@@ -53,6 +56,29 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible);
/**
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the serialization type of the provided
+ * marker service.id
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
+ *
+ * The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
+ * and that only one version for a message serialization is registered.
+ * This means that all bundles in a single celix container (a single process) should all use the same version of messages.
+ *
+ * If backwards compatibility is supported, when serialized message with a higher minor version when available in the serializer handler are used to
+ * deserialize. This could be supported for serialization like json.
+ * So when a json message of version 1.1.x with content {"field1":"value1", "field2":"value2"} is deserialized to a version 1.0 (which only has field1),
+ * the message can and will be deserialized
+ *
+ * @param ctx The bundle contest.
+ * @param pubsubSerializerMarkerSvcId The service.id of the pubsub_serialization_marker to use for deferring serializationType and backwardsCompatible.
+ * @param logHelper Optional log helper. If provided will be used to log issues whit creating a serializer handler for the provided marker svc id.
+ * @return A newly created pubsub serializer handler.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper);
+
+/**
* @brief destroy the pubsub_serializer_handler and free the used memory.
*/
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler);
@@ -128,6 +154,28 @@ uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler,
*/
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Get the serializer type for this hanlder.
+ *
+ * Valid as long as the handler exists.
+ */
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Returns the major version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Returns the minor version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
#ifdef __cplusplus
}
#endif
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
index c64e62e..034547f 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
@@ -22,22 +22,26 @@
typedef struct pubsub_serializer_provider pubsub_serializer_provider_t; //opaque type
-
-/**
- * Creates a handler which track bundles and registers pubsub_custom_msg_serialization_service
- * for every descriptor file found for json serialization.
- *
- * Added properties:
- * serialization.type=json
- * targeted.msg.fqn=<descriptor fqn>
- * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
- * targeted.msg.version=<msg version in descriptor if present> (optional)
- * service.ranking=0
- *
- * For descriptor found multiple times (same fqn and version) only the first one is registered
- *
- */
-pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */);
+ /**
+ *
+ * Creates a handler which track bundles and registers pubsub_custom_msg_serialization_service
+ * for every descriptor file found for json serialization.
+ *
+ * Added properties:
+ * serialization.type=json
+ * targeted.msg.fqn=<descriptor fqn>
+ * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
+ * targeted.msg.version=<msg version in descriptor if present> (optional)
+ * service.ranking=0
+ *
+ * For descriptor found multiple times (same fqn and version) only the first one is registered
+ *
+ * @param ctx
+ * @param serializerType
+ * @param backwardsCompatible
+ * @return
+ */
+pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */, bool backwardsCompatible);
void pubsub_providerHandler_destroy(pubsub_serializer_provider_t* handler);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index b2a78df..6c90743 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -7,10 +7,10 @@
#include "celix_bundle.h"
-#include <pubsub_endpoint.h>
-#include <pubsub_admin.h>
-#include <pubsub_protocol.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_protocol.h"
+#include "pubsub_admin.h"
+#include "pubsub_message_serialization_marker.h"
struct ps_utils_serializer_selection_data {
@@ -36,22 +36,19 @@ typedef struct ps_utils_retrieve_topic_properties_data {
static long getPSSerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
long svcId = -1L;
+ celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+
if (requested_serializer != NULL) {
char filter[512];
- int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
+ int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
if (written > 512) {
fprintf(stderr, "Cannot create serializer filter. need more than 512 char array\n");
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter = filter;
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.ignoreServiceLanguage = true;
-
//note findService will automatically return the highest ranking service id
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index cb06107..7bfa5e0 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -22,9 +22,11 @@
#include <string.h>
+#include "pubsub_message_serialization_marker.h"
#include "celix_version.h"
#include "pubsub_message_serialization_service.h"
#include "celix_log_helper.h"
+#include "celix_constants.h"
#define L_DEBUG(...) \
celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
@@ -46,6 +48,7 @@ typedef struct pubsub_serialization_service_entry {
struct pubsub_serializer_handler {
celix_bundle_context_t* ctx;
+ char* serType;
bool backwardCompatible;
long serializationSvcTrackerId;
celix_log_helper_t *logHelper;
@@ -115,6 +118,7 @@ static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgI
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
handler->ctx = ctx;
+ handler->serType = celix_utils_strdup(serializerType);
handler->backwardCompatible = backwardCompatible;
handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
@@ -137,6 +141,51 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex
return handler;
}
+struct pubsub_serializerHandler_callback_data {
+ celix_bundle_context_t* ctx;
+ celix_log_helper_t* logHelper;
+ pubsub_serializer_handler_t* handler;
+};
+
+static void pubsub_serializerHandler_useMarkerSvcCallback(void *handle, void* svc __attribute__((unused)), const celix_properties_t* props) {
+ struct pubsub_serializerHandler_callback_data* data = handle;
+ const char* serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, NULL);
+ bool backwardsCompatible = celix_properties_getAsBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, false);
+ if (serType != NULL) {
+ data->handler = pubsub_serializerHandler_create(data->ctx, serType, backwardsCompatible);
+ } else if (data->logHelper != NULL) {
+ celix_logHelper_error(
+ data->logHelper,
+ "Cannot create serializer handler because service %s does not have a %s service property",
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY);
+ }
+}
+
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper) {
+ struct pubsub_serializerHandler_callback_data data;
+ memset(&data, 0, sizeof(data));
+ data.ctx = ctx;
+ data.logHelper = logHelper;
+
+ char filter[32];
+ snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, pubsubSerializerMarkerSvcId);
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.filter.filter = filter;
+ opts.callbackHandle = &data;
+ opts.useWithProperties = pubsub_serializerHandler_useMarkerSvcCallback;
+ bool called = celix_bundleContext_useServiceWithOptions(ctx, &opts);
+ if (!called && logHelper != NULL) {
+ celix_logHelper_error(
+ logHelper,
+ "Cannot find %s service for service id %li",
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+ pubsubSerializerMarkerSvcId);
+ }
+ return data.handler;
+}
+
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
if (handler != NULL) {
@@ -155,6 +204,7 @@ void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
}
hashMap_destroy(handler->serializationServices, false, false);
celix_logHelper_destroy(handler->logHelper);
+ free(handler->serType);
free(handler);
}
}
@@ -330,7 +380,6 @@ char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, u
char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
celixThreadRwlock_unlock(&handler->lock);
return msgFqn;
-
}
uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn) {
@@ -347,6 +396,29 @@ uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler,
celixThreadRwlock_unlock(&handler->lock);
return result;
}
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+ celixThreadRwlock_readLock(&handler->lock);
+ int major = -1;
+ celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+ if (entries != NULL) {
+ pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+ major = celix_version_getMajor(entry->msgVersion);
+ }
+ celixThreadRwlock_unlock(&handler->lock);
+ return major;
+}
+
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+ celixThreadRwlock_readLock(&handler->lock);
+ int major = -1;
+ celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+ if (entries != NULL) {
+ pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+ major = celix_version_getMinor(entry->msgVersion);
+ }
+ celixThreadRwlock_unlock(&handler->lock);
+ return major;
+}
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler) {
size_t count = 0;
@@ -358,4 +430,8 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
}
celixThreadRwlock_unlock(&handler->lock);
return count;
+}
+
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
+ return handler->serType;
}
\ No newline at end of file