You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/04 15:05:46 UTC
[celix] branch master updated: Feature/use ser hander in psa (#345)
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push:
new 68300f3 Feature/use ser hander in psa (#345)
68300f3 is described below
commit 68300f308957187174ce6e4eeb5dc080bf1ed808
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 4 17:05:39 2021 +0200
Feature/use ser hander in psa (#345)
* Updates pubsub zmq v2 to use pubsub_serializer_handler_t
* Fixes the bswap_16 marcro used in OSX
---
.../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c | 15 -
.../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c | 247 +++--------
.../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h | 4 -
.../v2/src/pubsub_tcp_topic_receiver.c | 70 +--
.../v2/src/pubsub_tcp_topic_receiver.h | 9 +-
.../v2/src/pubsub_tcp_topic_sender.c | 190 +++-----
.../v2/src/pubsub_tcp_topic_sender.h | 9 +-
.../pubsub_admin_websocket/v2/src/psa_activator.c | 11 -
.../v2/src/pubsub_websocket_admin.c | 259 +++--------
.../v2/src/pubsub_websocket_topic_receiver.c | 52 +--
.../v2/src/pubsub_websocket_topic_receiver.h | 5 +-
.../v2/src/pubsub_websocket_topic_sender.c | 103 ++---
.../v2/src/pubsub_websocket_topic_sender.h | 3 +-
.../pubsub/pubsub_admin_zmq/v2/src/psa_activator.c | 25 --
.../v2/src/pubsub_psa_zmq_constants.h | 4 -
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 279 +++---------
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h | 15 -
.../v2/src/pubsub_zmq_topic_receiver.c | 161 +++----
.../v2/src/pubsub_zmq_topic_receiver.h | 9 +-
.../v2/src/pubsub_zmq_topic_sender.c | 489 +++++++--------------
.../v2/src/pubsub_zmq_topic_sender.h | 9 +-
.../pubsub_serializer_avrobin/gtest/CMakeLists.txt | 2 +-
.../src/pubsub_avrobin_serialization_provider.c | 2 +-
.../pubsub_serializer_json/gtest/CMakeLists.txt | 2 +-
.../src/pubsub_json_serialization_provider.c | 2 +-
.../include/pubsub_message_serialization_marker.h | 60 +++
.../include/pubsub_message_serialization_service.h | 19 +-
bundles/pubsub/pubsub_utils/CMakeLists.txt | 3 +-
bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt | 2 +-
.../gtest/src/PubSubMatchingTestSuite.cpp | 96 ++--
.../src/PubSubSerializationHandlerTestSuite.cc | 59 ++-
.../src/PubSubSerializationProviderTestSuite.cc | 9 +-
.../include/pubsub_serialization_provider.h | 15 +-
.../include/pubsub_serializer_handler.h | 103 ++++-
.../include/pubsub_serializer_provider.h | 50 ---
bundles/pubsub/pubsub_utils/src/pubsub_matching.c | 19 +-
.../src/pubsub_serialization_provider.c | 28 +-
.../pubsub_utils/src/pubsub_serializer_handler.c | 194 ++++++--
cmake/cmake_celix/Generic.cmake | 1 -
libs/utils/include/celix_byteswap.h | 2 +-
40 files changed, 1007 insertions(+), 1629 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
index ec9badb..ec3f853 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
@@ -20,7 +20,6 @@
#include <stdlib.h>
#include "celix_api.h"
-#include "pubsub_serializer.h"
#include "pubsub_protocol.h"
#include "celix_log_helper.h"
@@ -34,7 +33,6 @@ typedef struct psa_tcp_activator {
pubsub_tcp_admin_t *admin;
- long serializersTrackerId;
long protocolsTrackerId;
pubsub_admin_service_t adminService;
@@ -50,7 +48,6 @@ typedef struct psa_tcp_activator {
int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
- act->serializersTrackerId = -1L;
act->protocolsTrackerId = -1L;
act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2");
@@ -58,17 +55,6 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -132,7 +118,6 @@ int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
- celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_tcpAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
index 66c92ef..f5f200f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
@@ -55,11 +55,6 @@ struct pubsub_tcp_admin {
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
} protocols;
@@ -79,6 +74,11 @@ struct pubsub_tcp_admin {
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
+ } serializationHandlers;
+
pubsub_tcp_endPointStore_t endpointStore;
};
@@ -101,11 +101,6 @@ static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoi
return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
}
-static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -120,11 +115,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -137,6 +130,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -177,13 +173,13 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
- free(entry);
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
@@ -205,8 +201,9 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
@@ -215,101 +212,6 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
free(psa);
}
-void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
- return;
- }
- L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType);
- }
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- L_INFO("[PSA_TCP_V2] entry added");
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_tcp_admin_t *psa = handle;
@@ -421,30 +323,40 @@ pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *
return status;
}
+static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) {
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
+}
+
celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId,
long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
@@ -453,14 +365,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId,
protEntry->svc);
}
if (sender != NULL) {
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+ pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
@@ -525,21 +438,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_tcp_admin_t *psa = handle;
- celix_properties_t *newEndpoint = NULL;
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
+ celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
@@ -547,7 +454,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+ handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
@@ -556,7 +464,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -795,53 +703,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
return status;
}
-
-
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_tcp_admin_t *psa = handle;
- psa_tcp_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_tcp_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_tcp_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- } else {
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
- return id;
-}
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
return NULL;
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
index 2440fbb..513a934 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
@@ -82,10 +82,6 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer);
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle);
#endif //CELIX_PUBSUB_TCP_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 853e49d..a49ff23 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -56,7 +56,7 @@ struct pubsub_tcp_topic_receiver {
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- char *serType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
size_t timeout;
bool isPassive;
@@ -104,13 +104,12 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -119,7 +118,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serType = celix_utils_strdup(serType);
+ receiver->serializerHandler = serializerHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
@@ -269,7 +268,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
-
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -299,7 +297,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
free(receiver->scope);
}
free(receiver->topic);
- free(receiver->serType);
}
free(receiver);
}
@@ -313,7 +310,7 @@ const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver)
}
const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
- return receiver->serType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
@@ -460,47 +457,43 @@ static inline void
processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
//NOTE receiver->subscribers.mutex locked
- psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId);
- if(msgSer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
- L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length);
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+ if (msgFqn == NULL) {
+ L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
return;
}
void *deSerializedMsg = NULL;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
// When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
if (message->payload.payload == deSerializedMsg) {
*releaseMsg = true;
}
- const char *msgType = msgSer->fqn;
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+ svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
- msgType,
+ msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
break;
@@ -509,19 +502,25 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
}
if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
if (message->metadata.metadata) {
celix_properties_destroy(message->metadata.metadata);
}
}
} else {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType,
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ } else {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+ msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
-
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
}
static void
@@ -664,24 +663,3 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check = false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index a7de405..35c14c6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -20,10 +20,11 @@
#ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
-#include <pubsub_protocol.h>
+#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
@@ -31,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -52,6 +53,4 @@ bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 847d538..2c8daf4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -57,16 +57,17 @@ struct pubsub_tcp_topic_sender {
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
char *scope;
char *topic;
char *url;
- char *serializerType;
bool isStatic;
bool isPassive;
bool verbose;
unsigned long send_delay;
+ int seqNr; //atomic
struct {
long svcId;
@@ -79,24 +80,10 @@ struct pubsub_tcp_topic_sender {
} boundedServices;
};
-typedef struct psa_tcp_send_msg_entry {
- uint32_t type; //msg type id (hash of fqn)
- const char *fqn;
- uint8_t major;
- uint8_t minor;
- unsigned char originUUID[16];
-// pubsub_msg_serializer_t *msgSer;
- pubsub_protocol_service_t *protSer;
- struct iovec *serializedIoVecOutput;
- size_t serializedIoVecOutputLen;
- unsigned int seqNr;
-} psa_tcp_send_msg_entry_t;
-
typedef struct psa_tcp_bounded_service_entry {
pubsub_tcp_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
int getCount;
} psa_tcp_bounded_service_entry_t;
@@ -118,7 +105,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -127,7 +114,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = protocol;
@@ -189,7 +176,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
- // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+ // When passiveKey is specified, enable receive event for full-duplex connection using key.
// Because the topic receiver is already started, enable the receive event.
pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
@@ -273,18 +260,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- if (msgEntry->serializedIoVecOutput)
- free(msgEntry->serializedIoVecOutput);
- msgEntry->serializedIoVecOutput = NULL;
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
- free(entry);
- }
+ free(entry);
}
hashMap_destroy(sender->boundedServices.map, false, false);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -301,7 +277,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
}
free(sender->topic);
free(sender->url);
- free(sender->serializerType);
free(sender);
}
}
@@ -319,7 +294,7 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
}
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
@@ -337,15 +312,6 @@ bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
return sender->isPassive;
}
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
- psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
- return 0;
-}
-
static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
pubsub_tcp_topic_sender_t *sender = handle;
@@ -360,7 +326,6 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
entry->service.send = psa_tcp_topicPublicationSend;
@@ -384,16 +349,6 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void *) bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- if (msgEntry->serializedIoVecOutput)
- free(msgEntry->serializedIoVecOutput);
- msgEntry->serializedIoVecOutput = NULL;
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -402,91 +357,75 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
static int
psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SUCCESS;
psa_tcp_bounded_service_entry_t *bound = handle;
pubsub_tcp_topic_sender_t *sender = bound->parent;
-
- psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
+ const char* msgFqn;
+ int majorVersion;
+ int minorVersion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
+
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
- psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
-
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
- uuid_copy(entry->originUUID, sender->fwUUID);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
}
- delay_first_send_for_late_joiners(sender);
-
size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
struct iovec *serializedIoVecOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
- entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
+ sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
+ }
- bool cont = false;
- if (status == CELIX_SUCCESS) /*ser ok*/ {
- cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+ delay_first_send_for_late_joiners(sender);
+
+ pubsub_protocol_message_t message;
+ message.metadata.metadata = NULL;
+ message.payload.payload = NULL;
+ message.payload.length = 0;
+ if (serializedIoVecOutput) {
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
}
- if (cont) {
- pubsub_protocol_message_t message;
- message.metadata.metadata = NULL;
- message.payload.payload = NULL;
- message.payload.length = 0;
- if (serializedIoVecOutput) {
- message.payload.payload = serializedIoVecOutput->iov_base;
- message.payload.length = serializedIoVecOutput->iov_len;
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = (uint16_t)majorVersion;
+ message.header.msgMinorVersion = (uint16_t)minorVersion;
+ message.header.payloadSize = 0;
+ message.header.payloadPartSize = 0;
+ message.header.payloadOffset = 0;
+ message.header.metadataSize = 0;
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ }
+ bool sendOk = true;
+ {
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+ if (rc < 0) {
+ status = -1;
+ sendOk = false;
}
- message.header.msgId = msgTypeId;
- message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = entry->major;
- message.header.msgMinorVersion = entry->minor;
- message.header.payloadSize = 0;
- message.header.payloadPartSize = 0;
- message.header.payloadOffset = 0;
- message.header.metadataSize = 0;
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
}
- entry->seqNr++;
- bool sendOk = true;
- {
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
- if (rc < 0) {
- status = -1;
- sendOk = false;
- }
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (serializedIoVecOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen);
- serializedIoVecOutput = NULL;
- }
- }
-
- if (!sendOk) {
- L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
+ if (serializedIoVecOutput) {
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ serializedIoVecOutput = NULL;
}
- } else {
- L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn,
- sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
+ if (!sendOk) {
+ L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
+ }
return status;
}
@@ -502,4 +441,11 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
usleep(sender->send_delay * 1000);
firstSend = false;
}
+}
+
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+ psa_tcp_bounded_service_entry_t* entry = handle;
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+ *msgTypeId = (unsigned int)msgId;
+ return 0;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index dfb5014..57b13a6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -24,6 +24,7 @@
#include "pubsub_admin_metrics.h"
#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
@@ -32,7 +33,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -46,12 +47,6 @@ const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 159d8ed..33cc86f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -53,17 +53,6 @@ int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t *
act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers (only json)
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 8950fda..2a103ee 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -18,18 +18,18 @@
*/
#include <memory.h>
-#include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
#include <ip_utils.h>
-#include <pubsub_message_serialization_service.h>
-#include <pubsub_matching.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_matching.h"
#include "pubsub_utils.h"
#include "pubsub_websocket_admin.h"
#include "pubsub_psa_websocket_constants.h"
#include "pubsub_websocket_topic_sender.h"
#include "pubsub_websocket_topic_receiver.h"
#include "pubsub_websocket_common.h"
+#include "pubsub_serializer_handler.h"
#define L_DEBUG(...) \
celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -52,11 +52,6 @@ struct pubsub_websocket_admin {
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
} topicSenders;
@@ -71,17 +66,15 @@ struct pubsub_websocket_admin {
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
+ } serializationHandlers;
};
static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-
-static void pubsub_websocketAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -93,9 +86,6 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -105,6 +95,9 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c
celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -138,13 +131,13 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
- free(entry);
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,112 +148,12 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
free(psa);
}
-void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_websocket_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
- return;
- }
- L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", psa->serializers.map, serType);
- }
- psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- L_INFO("[PSA_WEBSOCKET_V2] entry added");
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_websocket_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- if(serType == NULL || msgId == -1) {
- L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", serType, msgId);
- return;
- }
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
- if (receiver != NULL && strncmp(serType, pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
@@ -297,38 +190,49 @@ celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const
return status;
}
+static pubsub_serializer_handler_t* pubsub_websocketAdmin_getSerializationHandler(pubsub_websocket_admin_t* psa, long msgSerializationMarkerSvcId) {
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
+}
+
+
celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_websocket_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_websocketAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serType, psa);
+ sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, handler, psa);
if (sender != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, NULL, NULL);
+ pubsub_serializerHandler_getSerializationType(handler), NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -382,29 +286,25 @@ celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_websocket_admin_t *psa = handle;
-
celix_properties_t *newEndpoint = NULL;
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_websocketAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+ pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
+
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
- receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, psa);
+ receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, psa);
if (receiver != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ pubsub_serializerHandler_getSerializationType(handler), NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -578,57 +478,11 @@ celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
return status;
}
-psa_websocket_serializer_entry_t* pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_websocket_admin_t *psa = handle;
- psa_websocket_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_websocketAdmin_releaseSerializer(void *handle, psa_websocket_serializer_entry_t* serializer) {
- pubsub_websocket_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_websocket_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- } else {
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
- return id;
-}
-
bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
pubsub_websocket_admin_t *psa = handle;
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -642,11 +496,9 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine
fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -677,7 +529,6 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine
celix_arrayList_destroy(unconnected);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
return true;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index c93f078..ea997e9 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -68,10 +68,11 @@ struct pubsub_websocket_topic_receiver {
void *admin;
char *scope;
char *topic;
- char *serType;
char scopeAndTopicFilter[5];
char *uri;
+ pubsub_serializer_handler_t* serializerHandler;
+
celix_websocket_service_t sockSvc;
long svcId;
@@ -131,12 +132,12 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin) {
pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serType = celix_utils_strdup(serType);
+ receiver->serializerHandler = serializerHandler;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
receiver->admin = admin;
@@ -309,7 +310,6 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
free(receiver->uri);
free(receiver->scope);
free(receiver->topic);
- free(receiver->serType);
}
free(receiver);
}
@@ -325,7 +325,7 @@ const char* pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t
}
const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver) {
- return receiver->serType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
@@ -451,58 +451,52 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
//NOTE receiver->subscribers.mutex locked
- int64_t msgTypeId = pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, receiver->serType, hdr->id);
-
- if(msgTypeId < 0) {
- L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- return;
- }
-
- psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, msgTypeId);
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id);
- if(serializer == NULL) {
- pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
- L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ if (msgId == 0) {
+ L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
return;
}
void *deSerializedMsg = NULL;
-
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- bool validVersion = psa_websocket_checkVersion(version, hdr);
- celix_version_destroy(version);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = (void *)payload;
deSerializeBuffer.iov_len = payloadSize;
- celix_status_t status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
-
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
if (status == CELIX_SUCCESS) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
bool release = true;
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, serializer->fqn, msgTypeId, deSerializedMsg, NULL, &release);
+ svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
- status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
break;
}
release = true;
}
}
if (release) {
- serializer->svc->freeDeserializedMsg(serializer->svc->handle, deSerializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg);
}
} else {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+ hdr->id,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)hdr->major,
+ (int)hdr->minor,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId));
}
-
- pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
}
static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 55d5255..f5edda5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -20,8 +20,9 @@
#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t;
@@ -30,7 +31,7 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin);
void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver);
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 98a1ad7..adc5ffe 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -52,10 +52,13 @@ struct pubsub_websocket_topic_sender {
void *admin;
char *scope;
char *topic;
- char *serType;
char scopeAndTopicFilter[5];
char *uri;
+ pubsub_serializer_handler_t* serializerHandler;
+
+ int seqNr; //atomic
+
celix_websocket_service_t websockSvc;
long websockSvcId;
struct mg_connection *sockConnection;
@@ -71,16 +74,10 @@ struct pubsub_websocket_topic_sender {
} boundedServices;
};
-typedef struct psa_websocket_send_msg_entry {
- pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
- uint32_t type; //msg type id (hash of fqn)
-} psa_websocket_send_msg_entry_t;
-
typedef struct psa_websocket_bounded_service_entry {
pubsub_websocket_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
int getCount;
} psa_websocket_bounded_service_entry_t;
@@ -99,18 +96,12 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin) {
pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serType = celix_utils_strdup(serType);
-
- if(sender->serType == NULL) {
- L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
- free(sender);
- return NULL;
- }
+ sender->serializerHandler = serializerHandler;
psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
sender->uri = psa_websocket_createURI(scope, topic);
@@ -174,17 +165,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- free(msgEntry);
-
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
- free(entry);
- }
+ free(entry);
}
hashMap_destroy(sender->boundedServices.map, false, false);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -198,7 +179,6 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
}
free(sender->topic);
free(sender->uri);
- free(sender->serType);
free(sender);
}
}
@@ -216,16 +196,17 @@ const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
}
const char* pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t *sender) {
- return sender->serType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
psa_websocket_bounded_service_entry_t *entry = (psa_websocket_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+ if (msgId != 0) {
+ *msgTypeId = msgId;
+ return 0;
}
- return 0;
+ return -1;
}
static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
@@ -241,7 +222,6 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
entry->service.send = psa_websocket_topicPublicationSend;
@@ -264,60 +244,39 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SERVICE_EXCEPTION;
psa_websocket_bounded_service_entry_t *bound = handle;
pubsub_websocket_topic_sender_t *sender = bound->parent;
- psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, sender->serType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
- }
-
- psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
-
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
- entry->type = msgTypeId;
- entry->header.id = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->header.major = (uint8_t)celix_version_getMajor(version);
- entry->header.minor = (uint8_t)celix_version_getMinor(version);
- entry->header.seqNr = 0;
- celix_version_destroy(version);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ const char* msgFqn;
+ int majorVersion;
+ int minorVersion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
+
if (sender->sockConnection != NULL) {
delay_first_send_for_late_joiners(sender);
size_t serializedOutputLen = 0;
struct iovec* serializedOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
if (status == CELIX_SUCCESS /*ser ok*/) {
json_error_t jsError;
json_t *jsMsg = json_object();
- json_object_set_new_nocheck(jsMsg, "id", json_string(entry->header.id));
- json_object_set_new_nocheck(jsMsg, "major", json_integer(entry->header.major));
- json_object_set_new_nocheck(jsMsg, "minor", json_integer(entry->header.minor));
- uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, __ATOMIC_RELAXED);
+ json_object_set_new_nocheck(jsMsg, "id", json_string(msgFqn));
+ json_object_set_new_nocheck(jsMsg, "major", json_integer(majorVersion));
+ json_object_set_new_nocheck(jsMsg, "minor", json_integer(minorVersion));
+ uint32_t seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
json_t *jsData;
@@ -338,17 +297,15 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
}
json_decref(jsMsg); //Decrease ref count means freeing the object
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
} else {
- L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s",
- entry->header.id, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %u for scope/topic %s/%s",
+ msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
} else { // when (sender->sockConnection == NULL) we dont have a client, but we do have a valid entry
status = CELIX_SUCCESS; // Not an error, just nothing to do
}
- pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-
return status;
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 8f8cebf..6b42500 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -22,6 +22,7 @@
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
@@ -30,7 +31,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin);
void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index 014401e..7aaee4d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -41,9 +41,6 @@ typedef struct psa_zmq_activator {
pubsub_admin_service_t adminService;
long adminSvcId;
- pubsub_admin_metrics_service_t adminMetricsService;
- long adminMetricsSvcId;
-
celix_shell_command_t cmdSvc;
long cmdSvcId;
} psa_zmq_activator_t;
@@ -59,17 +56,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -101,16 +87,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
}
- if (status == CELIX_SUCCESS) {
- act->adminMetricsService.handle = act->admin;
- act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
- act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
- }
-
//register shell command service
{
act->cmdSvc.handle = act->admin;
@@ -128,7 +104,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_zmqAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
index c50006a..7f1d891 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
@@ -35,10 +35,6 @@
#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
-
-#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
-#define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
-
#define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
#define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 5d396e5..f842f01 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -23,15 +23,17 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <ifaddrs.h>
-#include <pubsub_endpoint.h>
#include <czmq.h>
-#include <pubsub_serializer.h>
-#include <pubsub_protocol.h>
#include <ip_utils.h>
-#include <pubsub_matching.h>
-#include <pubsub_utils.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
+#include "pubsub_matching.h"
+#include "pubsub_utils.h"
+#include "pubsub_message_serialization_service.h"
+#include "pubsub_message_serialization_marker.h"
#include "pubsub_zmq_admin.h"
#include "pubsub_psa_zmq_constants.h"
#include "pubsub_zmq_topic_sender.h"
@@ -64,11 +66,6 @@ struct pubsub_zmq_admin {
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
} protocols;
@@ -88,6 +85,10 @@ struct pubsub_zmq_admin {
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
+ } serializationHandlers;
};
typedef struct psa_zmq_protocol_entry {
@@ -99,6 +100,7 @@ typedef struct psa_zmq_protocol_entry {
static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId);
static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -182,9 +184,6 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -197,6 +196,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -231,15 +233,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_t *entry = hashMapIterator_nextValue(&iter);
- hashMap_destroy(entry, false, true);
- }
- hashMap_clear(psa->serializers.map, false, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -248,6 +241,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->protocols.mutex);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -257,12 +258,12 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
-
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
if (psa->zmq_auth != NULL) {
zactor_destroy(&psa->zmq_auth);
}
@@ -272,92 +273,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
free(psa);
}
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_ZMQ] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_ZMQ] Ignored serializer type %s msgId %li fqn %s\n", serType, msgId, msgFqn);
- return;
- }
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- }
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_zmq_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_zmqTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_zmqTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_zmq_admin_t *psa = handle;
@@ -462,20 +377,21 @@ celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix
return status;
}
-static void pubsub_zmqAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
-
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
@@ -485,17 +401,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
}
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
@@ -503,14 +408,14 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle,
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle,
protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, protType, NULL);
+ pubsub_zmqTopicSender_serializerType(sender), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
//if configured use a static discover url
@@ -546,10 +451,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
}
celixThreadMutex_unlock(&psa->protocols.mutex);
- if (sender != NULL && newEndpoint != NULL) {
- //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
- }
-
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
}
@@ -572,7 +473,6 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
pubsub_zmq_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(mapKey);
- //TODO disconnect endpoints to sender. note is this needed for a zmq topic sender?
pubsub_zmqTopicSender_destroy(sender);
} else {
celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -585,21 +485,15 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
-
celix_properties_t *newEndpoint = NULL;
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -608,7 +502,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, handle, protocolSvcId, protEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, handle, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
}
@@ -616,7 +510,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ pubsub_zmqTopicReceiver_serializerType(receiver), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -856,72 +751,18 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE
return status;
}
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_zmq_admin_t *psa = handle;
- psa_zmq_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_zmq_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_zmq_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) {
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
}
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- return id;
-}
-
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
- pubsub_zmq_admin_t *psa = handle;
- pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_ZMQ_ADMIN_TYPE);
- result->senders = celix_arrayList_create();
- result->receivers = celix_arrayList_create();
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- pubsub_admin_sender_metrics_t *metrics = pubsub_zmqTopicSender_metrics(sender);
- celix_arrayList_add(result->senders, metrics);
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_admin_receiver_metrics_t *metrics = pubsub_zmqTopicReceiver_metrics(receiver);
- celix_arrayList_add(result->receivers, metrics);
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
- return result;
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
}
#ifndef ANDROID
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index f99ee08..6a8ba97 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -27,12 +27,6 @@
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
-typedef struct psa_zmq_serializer_entry {
- const char *fqn;
- const char *version;
- pubsub_message_serialization_service_t *svc;
-} psa_zmq_serializer_entry_t;
-
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
@@ -49,19 +43,10 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer);
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
-
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0568834..90e9510 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -26,22 +26,22 @@
#if !defined(__APPLE__)
#include <sys/epoll.h>
#endif
+
#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
-#include "pubsub_zmq_topic_receiver.h"
-#include "pubsub_psa_zmq_constants.h"
-
#include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
-#include <pubsub_utils.h>
-#include <celix_api.h>
-#include <celix_version.h>
+#include "pubsub_endpoint.h"
+#include "celix_log_helper.h"
+#include "pubsub_zmq_topic_receiver.h"
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_utils.h"
+#include "celix_api.h"
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "pubsub_interceptors_handler.h"
-
#include "celix_utils_api.h"
#include "pubsub_zmq_admin.h"
@@ -64,13 +64,12 @@
struct pubsub_zmq_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- bool metricsEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -118,7 +117,6 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -126,20 +124,19 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serializerType = celix_utils_strdup(serializerType);
+ receiver->serializerHandler = serHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
@@ -312,7 +309,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
free(receiver->scope);
free(receiver->topic);
- free((void*)receiver->serializerType);
}
free(receiver);
}
@@ -325,7 +321,7 @@ const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver)
}
const char* pubsub_zmqTopicReceiver_serializerType(pubsub_zmq_topic_receiver_t *receiver) {
- return receiver->serializerType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
@@ -444,70 +440,57 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
}
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
- //NOTE receiver->subscribers.mutex locked
- bool monitor = receiver->metricsEnabled;
- psa_zmq_serializer_entry_t *msgSer = pubsub_zmqAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serializerType, message->header.msgId);
-
- //monitoring
- struct timespec beginSer;
- struct timespec endSer;
- int updateReceiveCount = 0;
- int updateSerError = 0;
-
- if (msgSer!= NULL) {
- void *deserializedMsg = NULL;
- const char *msgFqn = msgSer->fqn;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_zmq_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
- if (validVersion) {
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &beginSer);
- }
- struct iovec deSerializeBuffer;
- deSerializeBuffer.iov_base = message->payload.payload;
- deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &endSer);
- }
- if (status == CELIX_SUCCESS) {
- uint32_t msgId = message->header.msgId;
- celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
- bool release = true;
- if (cont) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter2)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
- }
- release = true;
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+ if (msgFqn == NULL) {
+ L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+ return;
+ }
+
+ void *deserializedMsg = NULL;
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+ if (validVersion) {
+ struct iovec deSerializeBuffer;
+ deSerializeBuffer.iov_base = message->payload.payload;
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status == CELIX_SUCCESS) {
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
+ bool release = true;
+ if (cont) {
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
+ if (!release && hashMapIterator_hasNext(&iter2)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
}
+ release = true;
}
- if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deserializedMsg);
- }
- updateReceiveCount += 1;
}
- } else {
- updateSerError += 1;
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ if (release) {
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+ }
}
+ } else {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+ msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
-
- pubsub_zmqAdmin_releaseSerializer(receiver->admin, msgSer);
}
static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
@@ -614,13 +597,6 @@ static void* psa_zmq_recvThread(void * data) {
return NULL;
}
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
- pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
- return result;
-}
-
static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -744,24 +720,3 @@ static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const
ts->zmq_pub_cert = pub_cert;
#endif
}
-
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check=false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 67ba6d4..f47ce37 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -20,9 +20,10 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_admin_metrics.h"
+#include "pubsub_message_serialization_service.h"
#include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
@@ -31,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
@@ -48,7 +49,5 @@ void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, co
void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index f969f4c..7d9e750 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -21,17 +21,18 @@
#include <pubsub_protocol.h>
#include <stdlib.h>
#include <memory.h>
-#include <pubsub_constants.h>
-#include <pubsub/publisher.h>
-#include <utils.h>
-#include <zconf.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
+#include <uuid/uuid.h>
+
+#include "celix_utils.h"
+#include "pubsub_constants.h"
+#include "pubsub/publisher.h"
+#include "celix_log_helper.h"
#include "pubsub_zmq_topic_sender.h"
#include "pubsub_psa_zmq_constants.h"
-#include <uuid/uuid.h>
-#include <celix_version.h>
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "celix_constants.h"
#include "pubsub_interceptors_handler.h"
#include "pubsub_zmq_admin.h"
@@ -51,12 +52,11 @@
struct pubsub_zmq_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
- bool metricsEnabled;
bool zeroCopyEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -66,6 +66,18 @@ struct pubsub_zmq_topic_sender {
char *url;
bool isStatic;
+ long seqNr; //atomic
+
+ struct {
+ bool dataLock; //atomic, protects below and protect zmq internal data
+ void *headerBuffer;
+ size_t headerBufferSize;
+ void *metadataBuffer;
+ size_t metadataBufferSize;
+ void *footerBuffer;
+ size_t footerBufferSize;
+ } zmqBuffers;
+
struct {
zsock_t *socket;
zcert_t *cert;
@@ -82,42 +94,16 @@ struct pubsub_zmq_topic_sender {
} boundedServices;
};
-typedef struct psa_zmq_send_msg_entry {
- uint32_t type; //msg type id (hash of fqn)
- const char *fqn;
- uint8_t major;
- uint8_t minor;
- unsigned char originUUID[16];
- pubsub_protocol_service_t *protSer;
- unsigned int seqNr;
- void *headerBuffer;
- size_t headerBufferSize;
- void *metadataBuffer;
- size_t metadataBufferSize;
- void *footerBuffer;
- size_t footerBufferSize;
- bool dataLocked; // protected ZMQ functions and seqNr
- struct {
- celix_thread_mutex_t mutex; //protects entries in struct
- unsigned long nrOfMessagesSend;
- unsigned long nrOfMessagesSendFailed;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageSend;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- } metrics;
-} psa_zmq_send_msg_entry_t;
-
typedef struct psa_zmq_bounded_service_entry {
pubsub_zmq_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_zmq_send_msg_entry_t
int getCount;
} psa_zmq_bounded_service_entry_t;
typedef struct psa_zmq_zerocopy_free_entry {
- psa_zmq_serializer_entry_t *msgSer;
+ uint32_t msgId;
+ pubsub_serializer_handler_t *serHandler;
struct iovec *serializedOutput;
size_t serializedOutputLen;
} psa_zmq_zerocopy_free_entry;
@@ -135,7 +121,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -146,7 +132,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = prot;
@@ -154,7 +140,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
@@ -305,20 +290,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
return sender;
}
-static void pubsub_zmqTopicSender_destroyEntry(psa_zmq_send_msg_entry_t *msgEntry) {
- celixThreadMutex_destroy(&msgEntry->metrics.mutex);
- if(msgEntry->headerBuffer != NULL) {
- free(msgEntry->headerBuffer);
- }
- if(msgEntry->metadataBuffer != NULL) {
- free(msgEntry->metadataBuffer);
- }
- if(msgEntry->footerBuffer != NULL) {
- free(msgEntry->footerBuffer);
- }
- free(msgEntry);
-}
-
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
if (sender != NULL) {
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
@@ -326,21 +297,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
zsock_destroy(&sender->zmq.socket);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
- free(entry);
- }
- }
- hashMap_destroy(sender->boundedServices.map, false, false);
+ hashMap_destroy(sender->boundedServices.map, false, true);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
@@ -352,13 +309,15 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
}
free(sender->topic);
free(sender->url);
- free((void*)sender->serializerType);
+ free(sender->zmqBuffers.headerBuffer);
+ free(sender->zmqBuffers.metadataBuffer);
+ free(sender->zmqBuffers.footerBuffer);
free(sender);
}
}
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -383,10 +342,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_zmqAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
+ *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
return 0;
}
@@ -403,7 +359,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
entry->service.send = psa_zmq_topicPublicationSend;
@@ -426,297 +381,175 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
- pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- size_t count = 0;
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- hashMapIterator_nextValue(&iter2);
- count += 1;
- }
- }
-
- result->msgMetrics = calloc(count, sizeof(*result));
-
- iter = hashMapIterator_construct(sender->boundedServices.map);
- int i = 0;
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
- celixThreadMutex_lock(&mEntry->metrics.mutex);
- result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
- result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
- result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
- result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
- result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
- result->msgMetrics[i].bndId = entry->bndId;
- result->msgMetrics[i].typeId = mEntry->type;
- snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->fqn);
- i += 1;
- celixThreadMutex_unlock(&mEntry->metrics.mutex);
- }
- }
-
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
- result->nrOfmsgMetrics = (int)count;
- return result;
-}
-
static void psa_zmq_freeMsg(void *msg, void *hint) {
psa_zmq_zerocopy_free_entry *entry = hint;
- entry->msgSer->svc->freeSerializedMsg(entry->msgSer->svc->handle, entry->serializedOutput, entry->serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen);
free(entry);
}
-static void psa_zmq_unlockData(void *unused __attribute__((unused)), void *hint) {
- psa_zmq_send_msg_entry_t *entry = hint;
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
-}
-
static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SUCCESS;
psa_zmq_bounded_service_entry_t *bound = handle;
pubsub_zmq_topic_sender_t *sender = bound->parent;
- bool monitor = sender->metricsEnabled;
-
- psa_zmq_serializer_entry_t *serializer = pubsub_zmqAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
- if(serializer == NULL) {
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
+ const char* msgFqn;
+ int majorVersion;
+ int minorversion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
- psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
-
- //metrics updates
- struct timespec sendTime = { 0, 0 };
- struct timespec serializationStart;
- struct timespec serializationEnd;
- //int unknownMessageCountUpdate = 0;
- int sendErrorUpdate = 0;
- int serializationErrorUpdate = 0;
- int sendCountUpdate = 0;
-
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
- uuid_copy(entry->originUUID, sender->fwUUID);
- celixThreadMutex_create(&entry->metrics.mutex, NULL);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
}
- delay_first_send_for_late_joiners(sender);
-
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationStart);
- }
- size_t serializedOutputLen = 0;
- struct iovec *serializedOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
+ size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
+ struct iovec *serializedIoVecOutput = NULL;
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationEnd);
+ if (status != CELIX_SUCCESS /*serialization not ok*/) {
+ L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
}
- if (status == CELIX_SUCCESS /*ser ok*/) {
- // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
- bool expected = false;
- while(!__atomic_compare_exchange_n(&entry->dataLocked, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
- expected = false;
- usleep(500);
- }
-
- bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
- if (cont) {
-
- pubsub_protocol_message_t message;
- message.payload.payload = serializedOutput->iov_base;
- message.payload.length = serializedOutput->iov_len;
-
- void *payloadData = NULL;
- size_t payloadLength = 0;
- entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
-
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
- entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
- } else {
- message.metadata.metadata = NULL;
- }
-
- entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
-
- message.header.msgId = msgTypeId;
- message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = 0;
- message.header.msgMinorVersion = 0;
- message.header.payloadSize = payloadLength;
- message.header.metadataSize = entry->metadataBufferSize;
- message.header.payloadPartSize = payloadLength;
- message.header.payloadOffset = 0;
- message.header.isLastSegment = 1;
- message.header.convertEndianess = 0;
-
- // increase seqNr
- entry->seqNr++;
-
- entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
-
- errno = 0;
- bool sendOk;
-
- if (bound->parent->zeroCopyEnabled) {
-
- zmq_msg_t msg1; // Header
- zmq_msg_t msg2; // Payload
- zmq_msg_t msg3; // Metadata
- zmq_msg_t msg4; // Footer
- void *socket = zsock_resolve(sender->zmq.socket);
- psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
- freeMsgEntry->msgSer = serializer;
- freeMsgEntry->serializedOutput = serializedOutput;
- freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
- zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
- //send header
- int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
- if (rc == -1) {
- L_WARN("Error sending header msg. %s", strerror(errno));
- zmq_msg_close(&msg1);
- }
-
- //send Payload
- if (rc > 0) {
- int flag = ((entry->metadataBufferSize > 0) || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
- rc = zmq_msg_send(&msg2, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending payload msg. %s", strerror(errno));
- zmq_msg_close(&msg2);
- }
- }
+ delay_first_send_for_late_joiners(sender);
- //send MetaData
- if (rc > 0 && entry->metadataBufferSize > 0) {
- int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg3, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending metadata msg. %s", strerror(errno));
- zmq_msg_close(&msg3);
- }
- }
+ // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
+ // Also protect sender->zmqBuffers (header, meta and footer)
+ bool expected = false;
+ while(!__atomic_compare_exchange_n(&sender->zmqBuffers.dataLock, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
+ expected = false;
+ usleep(5);
+ }
- //send Footer
- if (rc > 0 && entry->footerBufferSize > 0) {
- zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg4, socket, 0);
- if (rc == -1) {
- L_WARN("Error sending footer msg. %s", strerror(errno));
- zmq_msg_close(&msg4);
- }
- }
+ pubsub_protocol_message_t message;
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
- sendOk = rc > 0;
- } else {
- //no zero copy
- zmsg_t *msg = zmsg_new();
- zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
- zmsg_addmem(msg, payloadData, payloadLength);
- if (entry->metadataBufferSize > 0) {
- zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
- }
- if (entry->footerBufferSize > 0) {
- zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
- }
- int rc = zmsg_send(&msg, sender->zmq.socket);
- sendOk = rc == 0;
+ void *payloadData = NULL;
+ size_t payloadLength = 0;
+ sender->protocol->encodePayload(sender->protocol->handle, &message, &payloadData, &payloadLength);
- if (!sendOk) {
- zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
- }
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ sender->protocol->encodeMetadata(sender->protocol->handle, &message, &sender->zmqBuffers.metadataBuffer, &sender->zmqBuffers.metadataBufferSize);
+ } else {
+ message.metadata.metadata = NULL;
+ }
- // Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData != message.payload.payload)) {
- free(payloadData);
- }
+ sender->protocol->encodeFooter(sender->protocol->handle, &message, &sender->zmqBuffers.footerBuffer, &sender->zmqBuffers.footerBufferSize);
+
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = majorVersion;
+ message.header.msgMinorVersion = minorversion;
+ message.header.payloadSize = payloadLength;
+ message.header.metadataSize = sender->zmqBuffers.metadataBufferSize;
+ message.header.payloadPartSize = payloadLength;
+ message.header.payloadOffset = 0;
+ message.header.isLastSegment = 1;
+ message.header.convertEndianess = 0;
+
+ sender->protocol->encodeHeader(sender->protocol->handle, &message, &sender->zmqBuffers.headerBuffer, &sender->zmqBuffers.headerBufferSize);
+
+ errno = 0;
+ bool sendOk;
+ if (bound->parent->zeroCopyEnabled) {
+ zmq_msg_t msg1; // Header
+ zmq_msg_t msg2; // Payload
+ zmq_msg_t msg3; // Metadata
+ zmq_msg_t msg4; // Footer
+ void *socket = zsock_resolve(sender->zmq.socket);
+ psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); //NOTE should be improved. Not really zero copy
+ freeMsgEntry->serHandler = sender->serializerHandler;
+ freeMsgEntry->msgId = msgTypeId;
+ freeMsgEntry->serializedOutput = serializedIoVecOutput;
+ freeMsgEntry->serializedOutputLen = serializedIoVecOutputLen;
+
+ zmq_msg_init_data(&msg1, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize, NULL, NULL);
+ //send header
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+ if (rc == -1) {
+ L_WARN("Error sending header msg. %s", strerror(errno));
+ zmq_msg_close(&msg1);
+ }
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
+ //send Payload
+ if (rc > 0) {
+ int flag = ((sender->zmqBuffers.metadataBufferSize > 0) || (sender->zmqBuffers.footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ rc = zmq_msg_send(&msg2, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending payload msg. %s", strerror(errno));
+ zmq_msg_close(&msg2);
}
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+ }
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (!bound->parent->zeroCopyEnabled && serializedOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+ //send MetaData
+ if (rc > 0 && sender->zmqBuffers.metadataBufferSize > 0) {
+ int flag = (sender->zmqBuffers.footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg3, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg3, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending metadata msg. %s", strerror(errno));
+ zmq_msg_close(&msg3);
}
+ }
- if (sendOk) {
- sendCountUpdate = 1;
- } else {
- sendErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+ //send Footer
+ if (rc > 0 && sender->zmqBuffers.footerBufferSize > 0) {
+ zmq_msg_init_data(&msg4, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg4, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending footer msg. %s", strerror(errno));
+ zmq_msg_close(&msg4);
}
- } else {
- L_WARN("no continue");
}
+ sendOk = rc > 0;
} else {
- serializationErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- }
-
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
-
- if (monitor && entry != NULL) {
- celixThreadMutex_lock(&entry->metrics.mutex);
+ //no zero copy
+ zmsg_t *msg = zmsg_new();
+ zmsg_addmem(msg, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize);
+ zmsg_addmem(msg, payloadData, payloadLength);
+ if (sender->zmqBuffers.metadataBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize);
+ }
+ if (sender->zmqBuffers.footerBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize);
+ }
+ int rc = zmsg_send(&msg, sender->zmq.socket);
+ sendOk = rc == 0;
- long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
- double diff = celix_difftime(&serializationStart, &serializationEnd);
- double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
- entry->metrics.averageSerializationTimeInSeconds = average;
+ if (!sendOk) {
+ zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+ }
- if (entry->metrics.nrOfMessagesSend > 2) {
- diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
- n = entry->metrics.nrOfMessagesSend;
- average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
- entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+ // Note: serialized Payload is deleted by serializer
+ if (payloadData && (payloadData != message.payload.payload)) {
+ free(payloadData);
}
+ }
+ __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
- entry->metrics.lastMessageSend = sendTime;
- entry->metrics.nrOfMessagesSend += sendCountUpdate;
- entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
- entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
+ }
+ if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ }
- celixThreadMutex_unlock(&entry->metrics.mutex);
+ if (!sendOk) {
+ L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
}
return status;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 744f653..bb49a2a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -20,7 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
#include "celix_log_helper.h"
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -40,6 +40,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort);
+
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
@@ -50,9 +51,5 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender);
#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
index 78ae8e1..59e4ca5 100644
--- a/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
@@ -24,7 +24,7 @@ celix_bundle_files(pubsub_avrobin_serialization_descriptor
add_executable(test_pubsub_serializer_avrobin
src/PubSubAvrobinSerializationProviderTestSuite.cc
)
-target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_serializer_avrobin PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_dependencies(test_pubsub_serializer_avrobin celix_pubsub_serializer_avrobin_bundle pubsub_avrobin_serialization_descriptor_bundle)
diff --git a/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c b/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
index 9c81e05..2008613 100644
--- a/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
+++ b/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
@@ -110,7 +110,7 @@ void pubsub_avrobinSerializationProvider_freeDeserializeMsg(pubsub_serialization
}
pubsub_serialization_provider_t* pubsub_avrobinSerializationProvider_create(celix_bundle_context_t* ctx) {
- pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
+ pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", false, 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
avrobinSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);
return provider;
}
diff --git a/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
index a065ebe..9a07c94 100644
--- a/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
@@ -24,7 +24,7 @@ celix_bundle_files(pubsub_json_serialization_descriptor
add_executable(test_pubsub_serializer_json
src/PubSubJsonSerializationProviderTestSuite.cc
)
-target_link_libraries(test_pubsub_serializer_json PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_serializer_json PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_serializer_json PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_dependencies(test_pubsub_serializer_json celix_pubsub_serializer_json_bundle pubsub_json_serialization_descriptor_bundle)
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
index 605d108..a43f6b5 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
@@ -106,7 +106,7 @@ static void pubsub_jsonSerializationProvider_freeDeserializeMsg(pubsub_serializa
}
pubsub_serialization_provider_t* pubsub_jsonSerializationProvider_create(celix_bundle_context_t* ctx) {
- pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "json", 0, pubsub_jsonSerializationProvider_serialize, pubsub_jsonSerializationProvider_freeSerializeMsg, pubsub_jsonSerializationProvider_deserialize, pubsub_jsonSerializationProvider_freeDeserializeMsg);
+ pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "json", true, 0, pubsub_jsonSerializationProvider_serialize, pubsub_jsonSerializationProvider_freeSerializeMsg, pubsub_jsonSerializationProvider_deserialize, pubsub_jsonSerializationProvider_freeDeserializeMsg);
jsonSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);;
return provider;
}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
new file mode 100644
index 0000000..a4ff06b
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+
+#include "hash_map.h"
+#include "version.h"
+#include "celix_bundle.h"
+#include "sys/uio.h"
+
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME "pubsub_message_serialization_marker"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION "1.0.0"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_RANGE "[1,2)"
+
+/**
+ * @brief Service property (named "serialization.type") identifying the serialization type (e.g json, avrobin, etc)
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY "serialization.type"
+
+/**
+ * @brief Service property (named "serialization.backwards.compatible") identifying whether the serialization is
+ * backwards compatible (i.e. for json a extra - new - field can be safely ignored and is thus backwards compatible.
+ *
+ * Type if boolean. If service property is not present, false will be used.
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE "serialization.backwards.compatible"
+
+/**
+ * @brief Marker interface - interface with no methods - to indicate that a serialization.type is available.
+ *
+ * This marker interface is used to indicate that a serialization type is available without the need to rely on
+ * pubsub message serialization service per msg type.
+ * The service.ranking of this marker interface must be used to select a serialization type if no serialization type is
+ * configured. The service.ranking of individual pubsub_messge_serization_service is used to override serializers per
+ * type.
+ *
+ * The properties serialization.type is mandatory
+ */
+typedef struct pubsub_message_serialization_marker {
+ void* handle;
+} pubsub_message_serialization_marker_t;
+
+#endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
index 4464cb3..09df5a5 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
@@ -25,15 +25,6 @@
#include "celix_bundle.h"
#include "sys/uio.h"
-/**
- * There should be a pubsub_serializer_t
- * per msg type (msg id) per bundle
- *
- * The pubsub_serializer_service can create
- * a serializer_map per bundle. Potentially using
- * the extender pattern.
- */
-
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME "pubsub_message_serialization_service"
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION "1.0.0"
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE "[1,2)"
@@ -44,7 +35,7 @@
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY "msg.id"
/**
- * A message serialization service for a serialization type (e.g. json) and
+ * @brief A message serialization service for a serialization type (e.g. json) and
* for a specific msg type (based on the fully qualified name) and version.
*
* The properties serialization.type, msg,fqn, msg.version and msg.id are mandatory
@@ -53,7 +44,7 @@ typedef struct pubsub_message_serialization_service {
void* handle;
/**
- * Serialize a message into iovec structs (set of structures with buffer pointer and length)
+ * @brief Serialize a message into iovec structs (set of structures with buffer pointer and length)
*
* The correct message serialization services will be selected based on the provided msgId.
*
@@ -67,12 +58,12 @@ typedef struct pubsub_message_serialization_service {
celix_status_t (*serialize)(void* handle, const void* input, struct iovec** output, size_t* outputIovLen);
/**
- * Free the memory of for the serialized msg.
+ * @brief Free the memory of for the serialized msg.
*/
void (*freeSerializedMsg)(void* handle, struct iovec* input, size_t inputIovLen);
/**
- * Deserialize a message using the provided iovec buffers.
+ * @brief Deserialize a message using the provided iovec buffers.
*
* The deserialize function will also check if the target major/minor version of the message is valid with the version
* of the serialized data.
@@ -95,7 +86,7 @@ typedef struct pubsub_message_serialization_service {
celix_status_t (*deserialize)(void* handle, const struct iovec* input, size_t inputIovLen, void** out); //note inputLen can be 0 if predefined size is not needed
/**
- * Free the memory for the deserialized message.
+ * @brief Free the memory for the deserialized message.
*/
void (*freeDeserializedMsg)(void* handle, void* msg);
diff --git a/bundles/pubsub/pubsub_utils/CMakeLists.txt b/bundles/pubsub/pubsub_utils/CMakeLists.txt
index 8426737..0c285aa 100644
--- a/bundles/pubsub/pubsub_utils/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_utils/CMakeLists.txt
@@ -28,7 +28,8 @@ target_include_directories(pubsub_utils PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
$<INSTALL_INTERFACE:include/celix/pubsub_utils>
)
-target_link_libraries(pubsub_utils PUBLIC Celix::framework Celix::pubsub_api Celix::pubsub_spi Celix::log_helper Celix::shell_api)
+target_link_libraries(pubsub_utils PUBLIC Celix::framework Celix::pubsub_api Celix::log_helper Celix::shell_api)
+target_link_libraries(pubsub_utils PRIVATE Celix::pubsub_spi)
add_library(Celix::pubsub_utils ALIAS pubsub_utils)
diff --git a/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
index ad7cce4..a883776 100644
--- a/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
@@ -51,7 +51,7 @@ add_executable(test_pubsub_utils
src/PubSubSerializationProviderTestSuite.cc
src/PubSubMatchingTestSuite.cpp
)
-target_link_libraries(test_pubsub_utils PRIVATE Celix::framework Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_utils PRIVATE Celix::framework Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_utils PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_test(NAME test_pubsub_utils COMMAND test_pubsub_utils)
setup_target_for_coverage(test_pubsub_utils SCAN_DIR ..)
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
index ff26f25..10555a3 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
@@ -20,16 +20,17 @@
#include "gtest/gtest.h"
#include <memory>
+#include <cstdarg>
-#include <celix_api.h>
-#include "pubsub_serializer_handler.h"
+#include "celix_api.h"
+#include "pubsub_message_serialization_service.h"
#include "dyn_message.h"
-#include <cstdarg>
-#include <pubsub_protocol.h>
-#include <pubsub_constants.h>
-#include <pubsub_matching.h>
-#include <pubsub/api.h>
-#include <pubsub_endpoint.h>
+#include "pubsub_protocol.h"
+#include "pubsub_constants.h"
+#include "pubsub_matching.h"
+#include "pubsub/api.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -54,70 +55,46 @@ public:
bndId = celix_bundleContext_installBundle(ctx.get(), MATCHING_BUNDLE, true);
dynMessage_logSetup(stdLog, NULL, 1);
-
- msgSerSvc.handle = this;
- msgSerSvc.serialize = [](void*, const void*, struct iovec**, size_t*) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeSerializedMsg = [](void*, struct iovec* , size_t) {
- };
- msgSerSvc.deserialize = [](void*, const struct iovec*, size_t, void**) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeDeserializedMsg = [](void*, void*) {
- };
}
~PubSubMatchingTestSuite() override {
celix_bundleContext_uninstallBundle(ctx.get(), bndId);
}
- long registerSerSvc(const char* type, uint32_t msgId, const char* msgFqn, const char* msgVersion, long ranking) {
+ long registerMarkerSerSvc(const char* type) {
auto* p = celix_properties_create();
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, type);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, std::to_string(msgId).c_str());
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, msgFqn);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
- celix_properties_setLong(p, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+ celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, type);
celix_service_registration_options_t opts{};
- opts.svc = static_cast<void*>(&msgSerSvc);
+ opts.svc = static_cast<void*>(&serMarkerSvc);
opts.properties = p;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
return celix_bundleContext_registerServiceWithOptions(ctx.get(), &opts);
}
std::shared_ptr<celix_framework_t> fw{};
std::shared_ptr<celix_bundle_context_t> ctx{};
- pubsub_message_serialization_service_t msgSerSvc{};
+ pubsub_message_serialization_marker_t serMarkerSvc{};
pubsub_protocol_service_t protocolSvc{};
long bndId{};
};
TEST_F(PubSubMatchingTestSuite, MatchPublisherSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-
+ auto serMarkerId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serId);
-
- celix_bundleContext_unregisterService(ctx.get(), serId);
+ EXPECT_EQ(foundSvcId, serMarkerId);
+ celix_bundleContext_unregisterService(ctx.get(), serMarkerId);
}
TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
-
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serFiets2Id);
-
+ EXPECT_EQ(foundSvcId, serFietsId); //older service are ranked higher
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
celix_bundleContext_unregisterService(ctx.get(), serFiets2Id);
celix_bundleContext_unregisterService(ctx.get(), serAutoId);
@@ -125,26 +102,23 @@ TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
auto* p = celix_properties_create();
celix_properties_set(p, PUBSUB_SUBSCRIBER_SCOPE, "scope");
celix_properties_set(p, PUBSUB_SUBSCRIBER_TOPIC, "fiets");
-
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
EXPECT_EQ(foundSvcId, serId);
-
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serId);
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -154,7 +128,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
@@ -164,7 +138,7 @@ TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
@@ -183,10 +157,10 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -197,7 +171,7 @@ TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
pubsub_utils_matchEndpoint(ctx.get(), logHelper, ep, "admin?", false, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(ep);
celix_logHelper_destroy(logHelper);
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index ce7948f..77a2fa0 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -20,11 +20,13 @@
#include "gtest/gtest.h"
#include <memory>
+#include <cstdarg>
-#include <celix_api.h>
+#include "celix_api.h"
+#include "pubsub_message_serialization_service.h"
#include "pubsub_serializer_handler.h"
#include "dyn_message.h"
-#include <cstdarg>
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -97,6 +99,7 @@ public:
TEST_F(PubSubSerializationHandlerTestSuite, CreateDestroy) {
auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
ASSERT_TRUE(handler != nullptr);
+ ASSERT_STREQ("json", pubsub_serializerHandler_getSerializationType(handler));
pubsub_serializerHandler_destroy(handler);
}
@@ -108,7 +111,6 @@ TEST_F(PubSubSerializationHandlerTestSuite, SerializationServiceFound) {
EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg"));
auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42);
EXPECT_STREQ("example::Msg", fqn);
- free(fqn);
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
@@ -175,6 +177,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, MultipleVersions) {
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
celix_bundleContext_unregisterService(ctx.get(), svcId2);
@@ -193,6 +197,8 @@ TEST_F(PubSubSerializationHandlerTestSuite, NoBackwardsCompatbile) {
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
@@ -260,4 +266,51 @@ TEST_F(PubSubSerializationHandlerTestSuite, BackwardsCompatibleCall) {
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
+ auto* logHelper = celix_logHelper_create(ctx.get(), "test");
+ auto* marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), 1032 /*invalid*/, logHelper);
+ EXPECT_FALSE(marker); //non existing svc
+
+ pubsub_message_serialization_marker_t markerSvc;
+ long svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, NULL);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_FALSE(marker); //missing ser type service property
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+
+ auto* props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, "test");
+ svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, props);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_TRUE(marker);
+ EXPECT_STREQ("test", pubsub_serializerHandler_getSerializationType(marker));
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+ pubsub_serializerHandler_destroy(marker);
+
+ celix_logHelper_destroy(logHelper);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
+ auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+ EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+ EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+
+ long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+ EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+ const char* msgFqn;
+ int major;
+ int minor;
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor));
+ EXPECT_STREQ("example::Msg1", msgFqn);
+ EXPECT_EQ(1, major);
+ EXPECT_EQ(0, minor);
+
+ celix_bundleContext_unregisterService(ctx.get(), svcId1);
+ pubsub_serializerHandler_destroy(handler);
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
index a7f660c..e9ec48d 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
@@ -21,7 +21,8 @@
#include <memory>
-#include <celix_api.h>
+#include "celix_api.h"
+#include "pubsub_message_serialization_marker.h"
#include "pubsub_serialization_provider.h"
class PubSubSerializationProviderTestSuite : public ::testing::Test {
@@ -47,12 +48,14 @@ public:
TEST_F(PubSubSerializationProviderTestSuite, CreateDestroy) {
//checks if the bundles are started and stopped correctly (no mem leaks).
- auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", 0, nullptr, nullptr, nullptr, nullptr);
+ auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", false, 0, nullptr, nullptr, nullptr, nullptr);
+ auto count = celix_bundleContext_useService(ctx.get(), PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, nullptr, nullptr);
+ EXPECT_EQ(1, count);
pubsub_serializationProvider_destroy(provider);
}
TEST_F(PubSubSerializationProviderTestSuite, FindSerializationServices) {
- auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", 0, nullptr, nullptr, nullptr, nullptr);
+ auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", false, 0, nullptr, nullptr, nullptr, nullptr);
size_t nrEntries = pubsub_serializationProvider_nrOfEntries(provider);
EXPECT_EQ(5, nrEntries);
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
index 030c0bb..c5bfab2 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
@@ -49,10 +49,14 @@ typedef struct {
bool valid;
const char* invalidReason;
+
+ //custom user data, will initialized to NULL. If freeUserData is set during destruction of the entry, this will be called.
+ void* userData;
+ void (*freeUserData)(void* userData);
} pubsub_serialization_entry_t;
/**
- * Creates A (descriptor based) Serialization Provider.
+ * @brief Creates A (descriptor based) Serialization Provider.
*
* The provider monitors bundles and creates pubsub message serialization services for every unique descriptor found.
*
@@ -73,6 +77,8 @@ typedef struct {
*
* @param ctx The bundle context
* @param serializationType The serialization type (e.g. 'json')
+ * @param backwardsCompatible Whether the serializer can deserialize data if the minor version is higher. (note true for JSON)
+ * Will be used to set the 'serialization.backwards.compatible' service property for the pusbub_message_serialization_marker
* @param serializationServiceRanking The service raking used for the serialization marker service.
* @param serialize The serialize function to use
* @param freeSerializeMsg The freeSerializeMsg function to use
@@ -84,6 +90,7 @@ typedef struct {
pubsub_serialization_provider_t *pubsub_serializationProvider_create(
celix_bundle_context_t *ctx,
const char* serializationType,
+ bool backwardsCompatible,
long serializationServiceRanking,
celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen),
void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen),
@@ -97,17 +104,17 @@ void pubsub_serializationProvider_destroy(pubsub_serialization_provider_t *provi
/**
- * Returns the number of valid entries.
+ * @brief Returns the number of valid entries.
*/
size_t pubsub_serializationProvider_nrOfEntries(pubsub_serialization_provider_t *provider);
/**
- * Returns the number of invalid entries.
+ * @brief Returns the number of invalid entries.
*/
size_t pubsub_serializationProvider_nrOfInvalidEntries(pubsub_serialization_provider_t *provider);
/**
- * Returns the log helper of the serialization provider.
+ * @brief Returns the log helper of the serialization provider.
*/
celix_log_helper_t* pubsub_serializationProvider_getLogHelper(pubsub_serialization_provider_t *provider);
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index b873901..f2c58ac 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -21,10 +21,10 @@
#define CELIX_PUBSUB_SERIALIZER_HANDLER_H
#include <stdint.h>
+#include <sys/uio.h>
+#include "celix_log_helper.h"
#include "celix_api.h"
-#include "pubsub_message_serialization_service.h"
-
#ifdef __cplusplus
extern "C" {
@@ -34,8 +34,10 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
/**
- * Creates a handler which track pubsub_custom_msg_serialization_service services with a (serialization.type=<serializerType)) filter.
- * If multiple pubsub_message_serialization_service for the same msg fqn (targeted.msg.fqn property) the highest ranking service will be used.
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the provided serialization type.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
*
* The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
* and that only one version for a message serialization is registered.
@@ -53,14 +55,36 @@ typedef struct pubsub_serializer_handler pubsub_serializer_handler_t; //opaque t
*/
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible);
+/**
+ * @brief Creates a pubsub serializer handler which tracks pubsub_custom_msg_serialization_service services using the serialization type of the provided
+ * marker service.id
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
+ *
+ * The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
+ * and that only one version for a message serialization is registered.
+ * This means that all bundles in a single celix container (a single process) should all use the same version of messages.
+ *
+ * If backwards compatibility is supported, when serialized message with a higher minor version when available in the serializer handler are used to
+ * deserialize. This could be supported for serialization like json.
+ * So when a json message of version 1.1.x with content {"field1":"value1", "field2":"value2"} is deserialized to a version 1.0 (which only has field1),
+ * the message can and will be deserialized
+ *
+ * @param ctx The bundle contest.
+ * @param pubsubSerializerMarkerSvcId The service.id of the pubsub_serialization_marker to use for deferring serializationType and backwardsCompatible.
+ * @param logHelper Optional log helper. If provided will be used to log issues whit creating a serializer handler for the provided marker svc id.
+ * @return A newly created pubsub serializer handler.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper);
+/**
+ * @brief destroy the pubsub_serializer_handler and free the used memory.
+ */
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler);
-void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
-void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
-
/**
- * Serialize a message into iovec structs (set of structures with buffer pointer and length)
+ * @brief Serialize a message into iovec structs (set of structures with buffer pointer and length)
*
* The correct message serialization services will be selected based on the provided msgId.
*
@@ -74,12 +98,12 @@ void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handl
celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen);
/**
- * Free the memory of for the serialized msg.
+ * @brief Free the memory of for the serialized msg.
*/
celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen);
/**
- * Deserialize a message using the provided iovec buffers.
+ * @brief Deserialize a message using the provided iovec buffers.
*
* The deserialize function will also check if the target major/minor version of the message is valid with the version
* of the serialized data.
@@ -102,34 +126,79 @@ celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_hand
celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion, const struct iovec* input, size_t inputIovLen, void** out);
/**
- * Free the memory for the deserialized message.
+ * @brief Free the memory for the deserialized message.
*/
celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg);
/**
- * Whether the msg is support. More specifically:
+ * @brief Whether the msg is support. More specifically:
* - msg id is known and
* - a serialized msg with the provided major and minor version can be deserialized.
*/
bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion);
/**
- * Get msg fqn from a msg id.
- * @return msg fqn or NULL if msg id is not known.
+ * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id.
+ */
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Get msg fqn from a msg id.
+ * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists.
*/
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
/**
- * Get a msg id from a msgFqn.
+ * @brief Get a msg id from a msgFqn.
* @return msg id or 0 if msg fqn is not known.
*/
uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn);
/**
- * nr of serialization services found.
+ * @brief nr of serialization services found.
*/
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Get the serializer type for this hanlder.
+ *
+ * Valid as long as the handler exists.
+ */
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Returns the major version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Returns the minor version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+
+/**
+ * @brief Returns msg info (fqn, major version, minor version) in 1 call.
+ *
+ * @param handler The serializer handler
+ * @param msgId The msg id where to get the info for
+ * @param msgFqnOut If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid)
+ * @param msgMajorVersionOut If not NULL will be set to the msg major version
+ * @param msgMinorVersionOut If not NULL will be set to the msg minor version
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found.
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+ pubsub_serializer_handler_t* handler,
+ uint32_t msgId,
+ const char** msgFqnOut,
+ int* msgMajorVersionOut,
+ int* msgMinorVersionOut);
+
#ifdef __cplusplus
}
#endif
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
deleted file mode 100644
index c64e62e..0000000
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef CELIX_PUBSUB_SERIALIZER_PROVIDER_H
-#define CELIX_PUBSUB_SERIALIZER_PROVIDER_H
-
-typedef struct pubsub_serializer_provider pubsub_serializer_provider_t; //opaque type
-
-
-/**
- * Creates a handler which track bundles and registers pubsub_custom_msg_serialization_service
- * for every descriptor file found for json serialization.
- *
- * Added properties:
- * serialization.type=json
- * targeted.msg.fqn=<descriptor fqn>
- * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
- * targeted.msg.version=<msg version in descriptor if present> (optional)
- * service.ranking=0
- *
- * For descriptor found multiple times (same fqn and version) only the first one is registered
- *
- */
-pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */);
-
-void pubsub_providerHandler_destroy(pubsub_serializer_provider_t* handler);
-
-void pubsub_providerHandler_addBundle(pubsub_serializer_provider_t* handler, const celix_bundle_t *bnd);
-void pubsub_providerHandler_removeBundle(pubsub_serializer_provider_t* handler, const celix_bundle_t *bnd);
-
-//note can be used for shell commands
-void pubsub_providerHandler_printRegisteredSerializer(pubsub_serializer_provider_t* handler, FILE *stream);
-
-#endif //CELIX_PUBSUB_SERIALIZER_PROVIDER_H
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index b2a78df..6c90743 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -7,10 +7,10 @@
#include "celix_bundle.h"
-#include <pubsub_endpoint.h>
-#include <pubsub_admin.h>
-#include <pubsub_protocol.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_protocol.h"
+#include "pubsub_admin.h"
+#include "pubsub_message_serialization_marker.h"
struct ps_utils_serializer_selection_data {
@@ -36,22 +36,19 @@ typedef struct ps_utils_retrieve_topic_properties_data {
static long getPSSerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
long svcId = -1L;
+ celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+
if (requested_serializer != NULL) {
char filter[512];
- int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
+ int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
if (written > 512) {
fprintf(stderr, "Cannot create serializer filter. need more than 512 char array\n");
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter = filter;
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.ignoreServiceLanguage = true;
-
//note findService will automatically return the highest ranking service id
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
index d008191..b8b8503 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
@@ -24,6 +24,7 @@
#include <dirent.h>
#include <string.h>
+#include "pubsub_message_serialization_marker.h"
#include "celix_constants.h"
#include "dyn_function.h"
#include "celix_version.h"
@@ -71,6 +72,9 @@ struct pubsub_serialization_provider {
celix_shell_command_t cmdSvc;
long cmdSvcId;
+ pubsub_message_serialization_marker_t markerSvc;
+ long markerSvcId;
+
celix_thread_mutex_t mutex; //protects below
celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
};
@@ -586,6 +590,7 @@ void pubsub_serializationProvider_onInstalledBundle(void *handle, const celix_bu
pubsub_serialization_provider_t *pubsub_serializationProvider_create(
celix_bundle_context_t *ctx,
const char* serializationType,
+ bool backwardsCompatible,
long serializationServiceRanking,
celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen),
void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen),
@@ -608,6 +613,7 @@ pubsub_serialization_provider_t *pubsub_serializationProvider_create(
dynCommon_logSetup(dfi_log, provider, 1);
{
+ //Start bundle tracker and register pubsub_message_serialization services
celix_bundle_tracking_options_t opts = CELIX_EMPTY_BUNDLE_TRACKING_OPTIONS;
opts.callbackHandle = provider;
opts.onInstalled = pubsub_serializationProvider_onInstalledBundle;
@@ -616,13 +622,13 @@ pubsub_serialization_provider_t *pubsub_serializationProvider_create(
}
{
+ //Register shell command to query serializers
provider->cmdSvc.handle = provider;
provider->cmdSvc.executeCommand = pubsub_serializationProvider_executeCommand;
char *name = NULL;
asprintf(&name,"celix::%s_message_serialization", provider->serializationType);
char *usage = NULL;
- //TODO add support for listing invalid entries
asprintf(&usage,"celix::%s_message_serialization [verbose | invalids | <msg id> | <msg fqn>]", provider->serializationType);
celix_properties_t* props = celix_properties_create();
@@ -640,11 +646,28 @@ pubsub_serialization_provider_t *pubsub_serializationProvider_create(
free(name);
free(usage);
}
+
+ {
+ //Register pubsub_message_serialization_marker service to indicate the availability of this message serialization type.
+ celix_properties_t* props = celix_properties_create();
+ provider->markerSvc.handle = provider;
+ celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, provider->serializationType);
+ celix_properties_setBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, backwardsCompatible);
+ celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+ opts.svc = &provider->markerSvc;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
+ opts.properties = props;
+ provider->markerSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+ }
+
return provider;
}
void pubsub_serializationProvider_destroy(pubsub_serialization_provider_t* provider) {
if (provider != NULL) {
+ celix_bundleContext_unregisterService(provider->ctx, provider->markerSvcId);
+
celix_bundleContext_stopTracker(provider->ctx, provider->bundleTrackerId);
celix_bundleContext_unregisterService(provider->ctx, provider->cmdSvcId);
@@ -652,6 +675,9 @@ void pubsub_serializationProvider_destroy(pubsub_serialization_provider_t* provi
for (int i = 0; i < celix_arrayList_size(provider->serializationSvcEntries); ++i) {
pubsub_serialization_entry_t *entry = celix_arrayList_get(provider->serializationSvcEntries, i);
celix_bundleContext_unregisterService(provider->ctx, entry->svcId);
+ if (entry->freeUserData) {
+ entry->freeUserData(entry->userData);
+ }
free(entry->descriptorContent);
free(entry->readFromEntryPath);
free(entry->msgVersionStr);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index f5962f3..e339795 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -22,9 +22,11 @@
#include <string.h>
+#include "pubsub_message_serialization_marker.h"
#include "celix_version.h"
#include "pubsub_message_serialization_service.h"
#include "celix_log_helper.h"
+#include "celix_constants.h"
#define L_DEBUG(...) \
celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
@@ -40,20 +42,26 @@ typedef struct pubsub_serialization_service_entry {
const celix_properties_t *properties;
uint32_t msgId;
celix_version_t* msgVersion;
- char* msgFqn;
+ const char* msgFqn;
pubsub_message_serialization_service_t* svc;
} pubsub_serialization_service_entry_t;
struct pubsub_serializer_handler {
celix_bundle_context_t* ctx;
+ char* filter;
+ char* serType;
bool backwardCompatible;
long serializationSvcTrackerId;
celix_log_helper_t *logHelper;
celix_thread_rwlock_t lock;
hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+ hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn
};
+static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
+static void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
+
static void addSerializationService(void *handle, void* svc, const celix_properties_t *props) {
pubsub_serializer_handler_t* handler = handle;
pubsub_message_serialization_service_t* serSvc = svc;
@@ -66,7 +74,7 @@ static void removeSerializationService(void *handle, void* svc, const celix_prop
pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
}
-int compareEntries(const void *a, const void *b) {
+static int compareEntries(const void *a, const void *b) {
const pubsub_serialization_service_entry_t* aEntry = a;
const pubsub_serialization_service_entry_t* bEntry = b;
@@ -98,61 +106,101 @@ static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serializat
return compatible;
}
-static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
- //NOTE assumes mutex is locked
- const char *result = NULL;
- celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
- if (entries != NULL) {
- pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
- result = entry->msgFqn;
- }
- return result;
-}
-
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
handler->ctx = ctx;
+ handler->serType = celix_utils_strdup(serializerType);
handler->backwardCompatible = backwardCompatible;
handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
celixThreadRwlock_create(&handler->lock, NULL);
handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+ handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL);
- char *filter = NULL;
- asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+ asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
- opts.filter.filter = filter;
+ opts.filter.filter = handler->filter;
opts.callbackHandle = handler;
opts.addWithProperties = addSerializationService;
opts.removeWithProperties = removeSerializationService;
- handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- free(filter);
+ handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts);
+
return handler;
}
+struct pubsub_serializerHandler_callback_data {
+ celix_bundle_context_t* ctx;
+ celix_log_helper_t* logHelper;
+ pubsub_serializer_handler_t* handler;
+};
+
+static void pubsub_serializerHandler_useMarkerSvcCallback(void *handle, void* svc __attribute__((unused)), const celix_properties_t* props) {
+ struct pubsub_serializerHandler_callback_data* data = handle;
+ const char* serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, NULL);
+ bool backwardsCompatible = celix_properties_getAsBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, false);
+ if (serType != NULL) {
+ data->handler = pubsub_serializerHandler_create(data->ctx, serType, backwardsCompatible);
+ } else if (data->logHelper != NULL) {
+ celix_logHelper_error(
+ data->logHelper,
+ "Cannot create serializer handler because service %s does not have a %s service property",
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY);
+ }
+}
+
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper) {
+ struct pubsub_serializerHandler_callback_data data;
+ memset(&data, 0, sizeof(data));
+ data.ctx = ctx;
+ data.logHelper = logHelper;
+
+ char filter[32];
+ snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, pubsubSerializerMarkerSvcId);
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.filter.filter = filter;
+ opts.callbackHandle = &data;
+ opts.useWithProperties = pubsub_serializerHandler_useMarkerSvcCallback;
+ bool called = celix_bundleContext_useServiceWithOptions(ctx, &opts);
+ if (!called && logHelper != NULL) {
+ celix_logHelper_error(
+ logHelper,
+ "Cannot find %s service for service id %li",
+ PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+ pubsubSerializerMarkerSvcId);
+ }
+ return data.handler;
+}
+
+static void pubsub_serializerHandler_destroyCallback(void* data) {
+ pubsub_serializer_handler_t* handler = data;
+ celixThreadRwlock_destroy(&handler->lock);
+ hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
+ for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+ pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
+ celix_version_destroy(entry->msgVersion);
+ free(entry);
+ }
+ celix_arrayList_destroy(entries);
+ }
+ hashMap_destroy(handler->serializationServices, false, false);
+ hashMap_destroy(handler->msgFullyQualifiedNames, false, true);
+ celix_logHelper_destroy(handler->logHelper);
+ free(handler->serType);
+ free(handler->filter);
+ free(handler);
+}
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
if (handler != NULL) {
- celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
- celixThreadRwlock_destroy(&handler->lock);
- hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
- while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
- for (int i = 0; i < celix_arrayList_size(entries); ++i) {
- pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
- free(entry->msgFqn);
- celix_version_destroy(entry->msgVersion);
- free(entry);
- }
- celix_arrayList_destroy(entries);
- }
- hashMap_destroy(handler->serializationServices, false, false);
- celix_logHelper_destroy(handler->logHelper);
- free(handler);
+ celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback);
}
}
@@ -190,6 +238,12 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
}
if (valid) {
+ char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+ if (fqn == NULL) {
+ fqn = celix_utils_strdup(msgFqn);
+ hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, fqn);
+ }
+
celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
if (entries == NULL) {
entries = celix_arrayList_create();
@@ -197,17 +251,18 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
entry->svcId = svcId;
entry->properties = svcProperties;
- entry->msgFqn = celix_utils_strdup(msgFqn);
+ entry->msgFqn = fqn;
entry->msgId = msgId;
entry->msgVersion = msgVersion;
entry->svc = svc;
celix_arrayList_add(entries, entry);
celix_arrayList_sort(entries, compareEntries);
- hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+ hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries);
} else {
celix_version_destroy(msgVersion);
}
+
celixThreadRwlock_unlock(&handler->lock);
}
@@ -233,7 +288,6 @@ void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handl
}
}
if (found != NULL) {
- free(found->msgFqn);
celix_version_destroy(found->msgVersion);
free(found);
}
@@ -322,12 +376,18 @@ bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* ha
return compatible;
}
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) {
celixThreadRwlock_readLock(&handler->lock);
- char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
+ void* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
celixThreadRwlock_unlock(&handler->lock);
- return msgFqn;
+ return entries != NULL;
+}
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+ celixThreadRwlock_readLock(&handler->lock);
+ char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+ celixThreadRwlock_unlock(&handler->lock);
+ return msgFqn;
}
uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn) {
@@ -344,6 +404,29 @@ uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler,
celixThreadRwlock_unlock(&handler->lock);
return result;
}
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+ celixThreadRwlock_readLock(&handler->lock);
+ int major = -1;
+ celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+ if (entries != NULL) {
+ pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+ major = celix_version_getMajor(entry->msgVersion);
+ }
+ celixThreadRwlock_unlock(&handler->lock);
+ return major;
+}
+
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+ celixThreadRwlock_readLock(&handler->lock);
+ int major = -1;
+ celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+ if (entries != NULL) {
+ pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+ major = celix_version_getMinor(entry->msgVersion);
+ }
+ celixThreadRwlock_unlock(&handler->lock);
+ return major;
+}
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler) {
size_t count = 0;
@@ -355,4 +438,35 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
}
celixThreadRwlock_unlock(&handler->lock);
return count;
+}
+
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
+ return handler->serType;
+}
+
+int pubsub_serializerHandler_getMsgInfo(
+ pubsub_serializer_handler_t* handler,
+ uint32_t msgId,
+ const char** msgFqnOut,
+ int* msgMajorVersionOut,
+ int* msgMinorVersionOut) {
+ int result = CELIX_SUCCESS;
+ celixThreadRwlock_readLock(&handler->lock);
+ celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+ if (entries != NULL) {
+ pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+ if (msgFqnOut != NULL) {
+ *msgFqnOut = entry->msgFqn;
+ }
+ if (msgMajorVersionOut != NULL) {
+ *msgMajorVersionOut = celix_version_getMajor(entry->msgVersion);
+ }
+ if (msgMinorVersionOut != NULL) {
+ *msgMinorVersionOut = celix_version_getMinor(entry->msgVersion);
+ }
+ } else {
+ result = CELIX_ILLEGAL_ARGUMENT;
+ }
+ celixThreadRwlock_unlock(&handler->lock);
+ return result;
}
\ No newline at end of file
diff --git a/cmake/cmake_celix/Generic.cmake b/cmake/cmake_celix/Generic.cmake
index ae82881..e44d3fa 100644
--- a/cmake/cmake_celix/Generic.cmake
+++ b/cmake/cmake_celix/Generic.cmake
@@ -30,7 +30,6 @@ add_celix_bundle_dependencies(<cmake_target>
function(add_celix_bundle_dependencies)
list(GET ARGN 0 TARGET)
list(REMOVE_AT ARGN 0)
- message("TEST")
foreach(BUNDLE_TARGET IN LISTS ARGN)
if (TARGET ${BUNDLE_TARGET})
get_target_property(IMPORT ${BUNDLE_TARGET} BUNDLE_IMPORTED)
diff --git a/libs/utils/include/celix_byteswap.h b/libs/utils/include/celix_byteswap.h
index 47c413a..aa60e09 100644
--- a/libs/utils/include/celix_byteswap.h
+++ b/libs/utils/include/celix_byteswap.h
@@ -24,7 +24,7 @@
#if defined(__APPLE__)
/* Swap bytes in 16 bit value. */
#define bswap_16(x) \
- ((((x) & 0xff00) << 8) | (((x) & 0x00ff) >> 8))
+ ((((x) & 0xff00) >> 8) | (((x) & 0x00ff) << 8))
#define __bswap_16 bswap_16
/* Swap bytes in 32 bit value. */