You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/26 18:04:14 UTC
[celix] 01/02: Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 14f3bc1a81ad4d4a672f899f360d93ee0603bfd4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed May 26 19:05:17 2021 +0200
Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq
---
.../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c | 15 --
.../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c | 247 +++++----------------
.../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h | 4 -
.../v2/src/pubsub_tcp_topic_receiver.c | 70 ++----
.../v2/src/pubsub_tcp_topic_receiver.h | 7 +-
.../v2/src/pubsub_tcp_topic_sender.c | 72 +++---
.../v2/src/pubsub_tcp_topic_sender.h | 4 +-
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 20 --
.../v2/src/pubsub_zmq_topic_receiver.c | 12 +-
.../v2/src/pubsub_zmq_topic_receiver.h | 2 +-
.../v2/src/pubsub_zmq_topic_sender.c | 22 +-
.../v2/src/pubsub_zmq_topic_sender.h | 2 +-
.../src/PubSubSerializationHandlerTestSuite.cc | 23 +-
.../include/pubsub_serializer_handler.h | 27 ++-
.../pubsub_utils/src/pubsub_serializer_handler.c | 108 ++++++---
15 files changed, 248 insertions(+), 387 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
index ec9badb..ec3f853 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
@@ -20,7 +20,6 @@
#include <stdlib.h>
#include "celix_api.h"
-#include "pubsub_serializer.h"
#include "pubsub_protocol.h"
#include "celix_log_helper.h"
@@ -34,7 +33,6 @@ typedef struct psa_tcp_activator {
pubsub_tcp_admin_t *admin;
- long serializersTrackerId;
long protocolsTrackerId;
pubsub_admin_service_t adminService;
@@ -50,7 +48,6 @@ typedef struct psa_tcp_activator {
int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
- act->serializersTrackerId = -1L;
act->protocolsTrackerId = -1L;
act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2");
@@ -58,17 +55,6 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -132,7 +118,6 @@ int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
- celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_tcpAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
index 66c92ef..f5f200f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
@@ -55,11 +55,6 @@ struct pubsub_tcp_admin {
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
} protocols;
@@ -79,6 +74,11 @@ struct pubsub_tcp_admin {
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+ } serializationHandlers;
+
pubsub_tcp_endPointStore_t endpointStore;
};
@@ -101,11 +101,6 @@ static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoi
return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
}
-static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -120,11 +115,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -137,6 +130,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -177,13 +173,13 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
- free(entry);
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
@@ -205,8 +201,9 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
@@ -215,101 +212,6 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
free(psa);
}
-void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
- return;
- }
- L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType);
- }
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- L_INFO("[PSA_TCP_V2] entry added");
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_tcp_admin_t *psa = handle;
@@ -421,30 +323,40 @@ pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *
return status;
}
+static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) {
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
+}
+
celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId,
long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
@@ -453,14 +365,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId,
protEntry->svc);
}
if (sender != NULL) {
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+ pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
@@ -525,21 +438,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_tcp_admin_t *psa = handle;
- celix_properties_t *newEndpoint = NULL;
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
+ celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
@@ -547,7 +454,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+ handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
@@ -556,7 +464,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -795,53 +703,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
return status;
}
-
-
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_tcp_admin_t *psa = handle;
- psa_tcp_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_tcp_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_tcp_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- } else {
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
- return id;
-}
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
return NULL;
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
index 2440fbb..513a934 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
@@ -82,10 +82,6 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer);
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle);
#endif //CELIX_PUBSUB_TCP_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 853e49d..b02768a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -56,7 +56,7 @@ struct pubsub_tcp_topic_receiver {
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- char *serType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
size_t timeout;
bool isPassive;
@@ -104,13 +104,12 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -119,7 +118,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serType = celix_utils_strdup(serType);
+ receiver->serializerHandler = serializerHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
@@ -269,7 +268,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
-
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -299,7 +297,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
free(receiver->scope);
}
free(receiver->topic);
- free(receiver->serType);
}
free(receiver);
}
@@ -313,7 +310,7 @@ const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver)
}
const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
- return receiver->serType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
@@ -460,47 +457,39 @@ static inline void
processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
//NOTE receiver->subscribers.mutex locked
- psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId);
- if(msgSer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
- L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length);
- return;
- }
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
void *deSerializedMsg = NULL;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
// When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
if (message->payload.payload == deSerializedMsg) {
*releaseMsg = true;
}
- const char *msgType = msgSer->fqn;
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+ svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
- msgType,
+ msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
break;
@@ -509,19 +498,25 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
}
if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
if (message->metadata.metadata) {
celix_properties_destroy(message->metadata.metadata);
}
}
} else {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType,
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ } else {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+ msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
-
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
}
static void
@@ -664,24 +659,3 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check = false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index a7de405..d06fe4a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -20,10 +20,11 @@
#ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
-#include <pubsub_protocol.h>
+#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
@@ -31,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 847d538..11d73d9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -57,12 +57,12 @@ struct pubsub_tcp_topic_sender {
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
char *scope;
char *topic;
char *url;
- char *serializerType;
bool isStatic;
bool isPassive;
bool verbose;
@@ -85,7 +85,6 @@ typedef struct psa_tcp_send_msg_entry {
uint8_t major;
uint8_t minor;
unsigned char originUUID[16];
-// pubsub_msg_serializer_t *msgSer;
pubsub_protocol_service_t *protSer;
struct iovec *serializedIoVecOutput;
size_t serializedIoVecOutputLen;
@@ -118,7 +117,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -127,7 +126,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = protocol;
@@ -189,7 +188,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
- // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+ // When passiveKey is specified, enable receive event for full-duplex connection using key.
// Because the topic receiver is already started, enable the receive event.
pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
@@ -301,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
}
free(sender->topic);
free(sender->url);
- free(sender->serializerType);
free(sender);
}
}
@@ -319,7 +317,7 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
}
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
@@ -337,15 +335,6 @@ bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
return sender->isPassive;
}
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
- psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
- return 0;
-}
-
static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
pubsub_tcp_topic_sender_t *sender = handle;
@@ -406,39 +395,35 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
psa_tcp_bounded_service_entry_t *bound = handle;
pubsub_tcp_topic_sender_t *sender = bound->parent;
- psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
- }
psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
- uuid_copy(entry->originUUID, sender->fwUUID);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+    // Lazily create the per msg id send entry on first use.
+    if (entry == NULL) {
+        const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+        if (fqn == NULL) {
+            // No fqn known for this msg id, so no entry can be created. Stop here: the code following this
+            // statement dereferences entry (entry->serializedIoVecOutputLen, entry->fqn) unconditionally.
+            // The status mirrors what the previous implementation returned when no serializer was found.
+            L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId);
+            return CELIX_SERVICE_EXCEPTION;
+        }
+        entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
+        entry->protSer = sender->protocol;
+        entry->type = msgTypeId;
+        entry->fqn = fqn; // borrowed from the serializer handler, not copied
+        entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+        entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
+        uuid_copy(entry->originUUID, sender->fwUUID);
+        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
}
delay_first_send_for_late_joiners(sender);
size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
struct iovec *serializedIoVecOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
bool cont = false;
if (status == CELIX_SUCCESS) /*ser ok*/ {
- cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+ cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
}
if (cont) {
pubsub_protocol_message_t message;
@@ -468,12 +453,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
status = -1;
sendOk = false;
}
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
if (message.metadata.metadata) {
celix_properties_destroy(message.metadata.metadata);
}
if (serializedIoVecOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
serializedIoVecOutput = NULL;
}
}
@@ -482,12 +467,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
}
} else {
- L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn,
+ L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn,
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
-
return status;
}
@@ -502,4 +485,11 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
usleep(sender->send_delay * 1000);
firstSend = false;
}
+}
+
+/**
+ * Looks up the msg type id for the given msg type (fqn) using the serializer handler of the
+ * sender that owns the bounded service entry passed as handle.
+ *
+ * @param handle    The psa_tcp_bounded_service_entry_t the call is bound to.
+ * @param msgType   The msg fqn to look up.
+ * @param msgTypeId Output; set to the id returned by pubsub_serializerHandler_getMsgId.
+ * @return Always 0.
+ */
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+    psa_tcp_bounded_service_entry_t* bound = handle;
+    pubsub_serializer_handler_t* serHandler = bound->parent->serializerHandler;
+    *msgTypeId = (unsigned int)pubsub_serializerHandler_getMsgId(serHandler, msgType);
+    return 0;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index dfb5014..29c8f7a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -24,6 +24,7 @@
#include "pubsub_admin_metrics.h"
#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
@@ -32,7 +33,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -46,7 +47,6 @@ const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
/**
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index e30403e..cc6de56 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -66,11 +66,6 @@ struct pubsub_zmq_admin {
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
} protocols;
@@ -189,9 +184,6 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -241,15 +233,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_t *entry = hashMapIterator_nextValue(&iter);
- hashMap_destroy(entry, false, true);
- }
- hashMap_clear(psa->serializers.map, false, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -275,9 +258,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
-
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 1051d77..a00ce34 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -95,7 +95,6 @@ struct pubsub_zmq_topic_receiver {
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
- hash_map_t *msgFqns; //key = msg id, value = char* msgFqn
bool allInitialized;
} subscribers;
};
@@ -207,7 +206,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
@@ -288,7 +286,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
- hashMap_destroy(receiver->subscribers.msgFqns, false, true);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -454,11 +451,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
int updateReceiveCount = 0;
int updateSerError = 0;
- char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId);
- if (msgFqn == NULL) {
- msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
- hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn);
- }
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
void *deserializedMsg = NULL;
bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
@@ -505,8 +498,9 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
(int)message->header.msgMajorVersion,
(int)message->header.msgMinorVersion,
pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index a2f953b..3900f55 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_serializer_handler_t* serHandler,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 9996c53..8ae4489 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -52,7 +52,7 @@
struct pubsub_zmq_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- pubsub_serializer_handler_t* serializationHandler;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
@@ -137,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- pubsub_serializer_handler_t* serializationHandler,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -148,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializationHandler = serializationHandler;
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = prot;
@@ -359,7 +359,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
}
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
- return pubsub_serializerHandler_getSerializationType(sender->serializationHandler);
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -384,7 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) {
static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
- *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType);
+ *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
return 0;
}
@@ -513,9 +513,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
entry->protSer = sender->protocol;
entry->type = msgTypeId;
- entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId);
- entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId);
- entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId);
+ entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
+ entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
+ entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
uuid_copy(entry->originUUID, sender->fwUUID);
celixThreadMutex_create(&entry->metrics.mutex, NULL);
hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
@@ -528,7 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
size_t serializedOutputLen = 0;
struct iovec *serializedOutput = NULL;
- status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
if (monitor) {
clock_gettime(CLOCK_REALTIME, &serializationEnd);
}
@@ -588,7 +588,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
zmq_msg_t msg4; // Footer
void *socket = zsock_resolve(sender->zmq.socket);
psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
- freeMsgEntry->serHandler = sender->serializationHandler;
+ freeMsgEntry->serHandler = sender->serializerHandler;
freeMsgEntry->msgId = msgTypeId;
freeMsgEntry->serializedOutput = serializedOutput;
freeMsgEntry->serializedOutputLen = serializedOutputLen;
@@ -665,7 +665,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
celix_properties_destroy(message.metadata.metadata);
}
if (!bound->parent->zeroCopyEnabled && serializedOutput) {
- pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
}
if (sendOk) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index c2c0d57..584b88d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- pubsub_serializer_handler_t* serializationHandler,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 550c85f..77a2fa0 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -111,7 +111,6 @@ TEST_F(PubSubSerializationHandlerTestSuite, SerializationServiceFound) {
EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg"));
auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42);
EXPECT_STREQ("example::Msg", fqn);
- free(fqn);
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
@@ -293,3 +292,25 @@ TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
celix_logHelper_destroy(logHelper);
}
+
+TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) { // Exercises pubsub_serializerHandler_getMsgInfo before and after a serialization service for msg id 42 is registered.
+ auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+ EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42)); // nothing registered yet for msg id 42
+ EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); // unknown msg id is reported as CELIX_ILLEGAL_ARGUMENT
+
+
+ long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0"); // fixture helper; registers a "json" serialization service for msg id 42, fqn example::Msg1, version 1.0.0
+ EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); // all out parameters may be omitted
+
+ const char* msgFqn;
+ int major;
+ int minor;
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor)); // all out parameters requested at once
+ EXPECT_STREQ("example::Msg1", msgFqn);
+ EXPECT_EQ(1, major); // major part of "1.0.0"
+ EXPECT_EQ(0, minor); // minor part of "1.0.0"
+
+ celix_bundleContext_unregisterService(ctx.get(), svcId1); // clean up the registered service and the handler
+ pubsub_serializerHandler_destroy(handler);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index 0519891..f2c58ac 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -138,10 +138,15 @@ celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_ha
bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion);
/**
+ * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id.
+ */
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
* @brief Get msg fqn from a msg id.
- * @return msg fqn or NULL if msg id is not known.
+ * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists.
*/
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
/**
* @brief Get a msg id from a msgFqn.
@@ -176,6 +181,24 @@ int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* han
*/
int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Returns msg info (fqn, major version, minor version) in 1 call.
+ *
+ * @param handler The serializer handler
+ * @param msgId The msg id where to get the info for
+ * @param msgFqnOut If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid)
+ * @param msgMajorVersionOut If not NULL will be set to the msg major version
+ * @param msgMinorVersionOut If not NULL will be set to the msg minor version
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found.
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+ pubsub_serializer_handler_t* handler,
+ uint32_t msgId,
+ const char** msgFqnOut,
+ int* msgMajorVersionOut,
+ int* msgMinorVersionOut);
+
#ifdef __cplusplus
}
#endif
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index 7bfa5e0..418b4b5 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -48,6 +48,7 @@ typedef struct pubsub_serialization_service_entry {
struct pubsub_serializer_handler {
celix_bundle_context_t* ctx;
+ char* filter;
char* serType;
bool backwardCompatible;
long serializationSvcTrackerId;
@@ -55,6 +56,7 @@ struct pubsub_serializer_handler {
celix_thread_rwlock_t lock;
hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+ hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn
};
static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
@@ -104,17 +106,6 @@ static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serializat
return compatible;
}
-static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
- //NOTE assumes mutex is locked
- const char *result = NULL;
- celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
- if (entries != NULL) {
- pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
- result = entry->msgFqn;
- }
- return result;
-}
-
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
handler->ctx = ctx;
@@ -125,18 +116,18 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex
celixThreadRwlock_create(&handler->lock, NULL);
handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+ handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL);
- char *filter = NULL;
- asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+ asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
- opts.filter.filter = filter;
+ opts.filter.filter = handler->filter;
opts.callbackHandle = handler;
opts.addWithProperties = addSerializationService;
opts.removeWithProperties = removeSerializationService;
- handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- free(filter);
+ handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts);
+
return handler;
}
@@ -186,26 +177,31 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(cel
return data.handler;
}
+/**
+ * Releases everything owned by a serializer handler.
+ *
+ * Handed to celix_bundleContext_stopTrackerAsync by pubsub_serializerHandler_destroy as the
+ * callback for stopping the serialization service tracker, with the handler as callback data.
+ *
+ * @param data The pubsub_serializer_handler_t to clean up; it is freed by this call.
+ */
+static void pubsub_serializerHandler_destroyCallback(void* data) {
+    pubsub_serializer_handler_t* self = data;
+    celixThreadRwlock_destroy(&self->lock);
+
+    // Every map value is an array list of service entries; each entry owns its fqn string and version.
+    hash_map_iterator_t it = hashMapIterator_construct(self->serializationServices);
+    while (hashMapIterator_hasNext(&it)) {
+        celix_array_list_t* svcEntries = hashMapIterator_nextValue(&it);
+        int idx = 0;
+        while (idx < celix_arrayList_size(svcEntries)) {
+            pubsub_serialization_service_entry_t* svcEntry = celix_arrayList_get(svcEntries, idx);
+            free(svcEntry->msgFqn);
+            celix_version_destroy(svcEntry->msgVersion);
+            free(svcEntry);
+            ++idx;
+        }
+        celix_arrayList_destroy(svcEntries);
+    }
+    hashMap_destroy(self->serializationServices, false, false);
+
+    // msg id -> fqn map; presumably the 'true' flag releases the strdup'ed fqn values - confirm against hash_map.h.
+    hashMap_destroy(self->msgFullyQualifiedNames, false, true);
+
+    celix_logHelper_destroy(self->logHelper);
+    free(self->serType);
+    free(self->filter);
+    free(self);
+}
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
if (handler != NULL) {
- celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
- celixThreadRwlock_destroy(&handler->lock);
- hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
- while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
- for (int i = 0; i < celix_arrayList_size(entries); ++i) {
- pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
- free(entry->msgFqn);
- celix_version_destroy(entry->msgVersion);
- free(entry);
- }
- celix_arrayList_destroy(entries);
- }
- hashMap_destroy(handler->serializationServices, false, false);
- celix_logHelper_destroy(handler->logHelper);
- free(handler->serType);
- free(handler);
+ celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback);
}
}
@@ -257,10 +253,16 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
celix_arrayList_add(entries, entry);
celix_arrayList_sort(entries, compareEntries);
- hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+ hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries);
} else {
celix_version_destroy(msgVersion);
}
+
+ char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+ if (fqn == NULL) {
+ hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn));
+ }
+
celixThreadRwlock_unlock(&handler->lock);
}
@@ -375,9 +377,16 @@ bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* ha
return compatible;
}
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+/**
+ * Reports whether the handler currently has a serialization services entry for msgId.
+ * The lookup in the serializationServices map is done while holding the handler's read lock.
+ */
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    bool available;
+    celixThreadRwlock_readLock(&handler->lock);
+    available = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId) != NULL;
+    celixThreadRwlock_unlock(&handler->lock);
+    return available;
+}
+
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
celixThreadRwlock_readLock(&handler->lock);
- char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
+ char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
celixThreadRwlock_unlock(&handler->lock);
return msgFqn;
}
@@ -434,4 +443,31 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ
const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
return handler->serType;
+}
+
+/**
+ * Returns msg info (fqn, major version, minor version) for a msg id in 1 call.
+ *
+ * The info is taken from the first entry of the serialization services list for msgId, read while
+ * holding the handler's read lock. Each out parameter is optional and checked on its own, so a
+ * caller may request any subset.
+ *
+ * @param handler            The serializer handler.
+ * @param msgId              The msg id to get the info for.
+ * @param msgFqnOut          If not NULL, set to the msg fqn of the entry (not copied).
+ * @param msgMajorVersionOut If not NULL, set to the msg major version.
+ * @param msgMinorVersionOut If not NULL, set to the msg minor version.
+ * @return CELIX_SUCCESS if an entry for msgId exists, CELIX_ILLEGAL_ARGUMENT otherwise
+ *         (out parameters are not written in that case).
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+        pubsub_serializer_handler_t* handler,
+        uint32_t msgId,
+        const char** msgFqnOut,
+        int* msgMajorVersionOut,
+        int* msgMinorVersionOut) {
+    celix_status_t result = CELIX_SUCCESS;
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries list exists, it holds at least 1 entry.
+        if (msgFqnOut != NULL) {
+            // NOTE(review): this points at the entry's own msgFqn string, whereas pubsub_serializerHandler_getMsgFqn
+            // reads the msgFullyQualifiedNames map. Confirm the entry string stays valid for the handler lifetime.
+            *msgFqnOut = entry->msgFqn;
+        }
+        if (msgMajorVersionOut != NULL) { // guard the pointer that is written, not the minor version one
+            *msgMajorVersionOut = celix_version_getMajor(entry->msgVersion);
+        }
+        if (msgMinorVersionOut != NULL) {
+            *msgMinorVersionOut = celix_version_getMinor(entry->msgVersion);
+        }
+    } else {
+        result = CELIX_ILLEGAL_ARGUMENT;
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return result;
+}
\ No newline at end of file