You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/30 16:31:12 UTC
[celix] 01/02: Refactors lock usage in pstm so that recursive
locks are not needed.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch bugfix/pstm_deadlock_work_outside_of_locks
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 2411b40ae226df154d1bb01106d462dfe3c31bf1
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Jun 30 16:03:35 2020 +0200
Refactors lock usage in pstm so that recursive locks are not needed.
---
.../src/pubsub_topology_manager.c | 456 +++++++++++++--------
.../src/pubsub_topology_manager.h | 14 +-
2 files changed, 300 insertions(+), 170 deletions(-)
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index da1ebbf..dfdda22 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -60,22 +60,11 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce
manager->context = context;
- celix_thread_mutexattr_t psaAttr;
- celixThreadMutexAttr_create(&psaAttr);
- celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
- status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
- celixThreadMutexAttr_destroy(&psaAttr);
-
+ status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, NULL);
status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
-
- celix_thread_mutexattr_t attr;
- status |= celixThreadMutexAttr_create(&attr);
- status |= celixThreadMutexAttr_settype(&attr, CELIX_THREAD_MUTEX_RECURSIVE);
- status |= celixThreadMutex_create(&manager->topicSenders.mutex, &attr);
- status |= celixThreadMutexAttr_destroy(&attr);
-
+ status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL);
status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
@@ -190,7 +179,8 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "PSTM: Added PSA");
+ const char* psaType = celix_properties_get(props, PUBSUB_ADMIN_SERVICE_TYPE, "!Error!");
+ celix_logHelper_debug(manager->loghelper, "Added %s PSA", psaType);
if (svcId >= 0) {
celixThreadMutex_lock(&manager->pubsubadmins.mutex);
@@ -200,14 +190,15 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
//NOTE new psa, so no endpoints announce yet
- //new PSA -> every topic sender/receiver entry needs a rematch
+ //Because this is a new PSA every topic sender/receiver entry needs a rematch.
+ //This is needed because the new PSA can be a better match than currently used.
int needsRematchCount = 0;
celixThreadMutex_lock(&manager->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- entry->needsMatch = true;
+ entry->matching.needsMatch = true;
++needsRematchCount;
}
celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -215,14 +206,14 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- entry->needsMatch = true;
+ entry->matching.needsMatch = true;
++needsRematchCount;
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
if (needsRematchCount > 0) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO,
- "A PSA is added after at least one active publisher/provided. \
+ celix_logHelper_info(manager->loghelper,
+ "A new PSA is added after at least one active publisher/provided. \
It is preferred that all PSA are started before publiser/subscriber are started!\n\
Current topic/sender count is %i", needsRematchCount);
}
@@ -233,6 +224,8 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
pubsub_topology_manager_t *manager = handle;
//pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ const char* psaType = celix_properties_get(props, PUBSUB_ADMIN_SERVICE_TYPE, "!Error!");
+ celix_logHelper_debug(manager->loghelper, "Removing %s PSA", psaType);
// Remove the svcId from the hashmap, because the service is not available
celixThreadMutex_lock(&manager->pubsubadmins.mutex);
@@ -254,30 +247,21 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
//de-setup all topic receivers/senders for the removed psa.
//the next psaHandling run will try to find new psa.
+ celix_array_list_t* revokedEndpoints = celix_arrayList_create();
celixThreadMutex_lock(&manager->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->selectedPsaSvcId == svcId) {
- /* de-announce all senders */
+ if (entry->matching.selectedPsaSvcId == svcId) {
if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
- listener->revokeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_add(revokedEndpoints, entry->endpoint);
}
- entry->needsMatch = true;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedProtocolSvcId = -1L;
- entry->selectedPsaSvcId = -1L;
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- entry->endpoint = NULL;
- }
+ entry->matching.needsMatch = true;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.selectedProtocolSvcId = -1L;
+ entry->matching.selectedPsaSvcId = -1L;
+ entry->endpoint = NULL;
}
}
celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -286,32 +270,33 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->selectedPsaSvcId == svcId) {
- /* de-announce all receivers */
+ if (entry->matching.selectedPsaSvcId == svcId) {
if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
- listener->revokeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_add(revokedEndpoints, entry->endpoint);
}
- entry->needsMatch = true;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedProtocolSvcId = -1L;
- entry->selectedPsaSvcId = -1L;
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- entry->endpoint = NULL;
- }
+ entry->matching.needsMatch = true;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.selectedProtocolSvcId = -1L;
+ entry->matching.selectedPsaSvcId = -1L;
+ entry->endpoint = NULL;
}
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "PSTM: Removed PSA");
+ /* de-announce all senders & receiver endpoints */
+ for (int i = 0; i < celix_arrayList_size(revokedEndpoints); ++i) {
+ celix_properties_t* endpoint = celix_arrayList_get(revokedEndpoints, i);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->revokeEndpoint(listener->handle, endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_properties_destroy(endpoint);
+ }
+ celix_arrayList_destroy(revokedEndpoints);
}
void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
@@ -325,11 +310,12 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (topic == NULL) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING,
- "[PSTM] Warning found subscriber service without mandatory '%s' property.",
+ celix_logHelper_warning(manager->loghelper,
+ "Warning found subscriber service without mandatory '%s' property.",
PUBSUB_SUBSCRIBER_TOPIC);
return;
}
+ celix_logHelper_trace(manager->loghelper, "Adding subscriber %s/%s", scope, topic);
long bndId = celix_bundle_getId(bnd);
char *scopeAndTopicKey = NULL;
@@ -346,12 +332,13 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
entry->topic = strndup(topic, 1024 * 1024);
entry->usageCount = 1;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
- entry->needsMatch = true;
+ entry->matching.selectedPsaSvcId = -1L;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.needsMatch = true;
entry->bndId = bndId;
entry->subscriberProperties = celix_properties_copy(props);
hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+ celix_logHelper_trace(manager->loghelper, "Created new topic receiver entry %s", entry->scopeAndTopicKey);
}
//signal psa handling thread
bool triggerCondition = (entry->usageCount == 1);
@@ -376,6 +363,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut
if (topic == NULL) {
return;
}
+ celix_logHelper_trace(manager->loghelper, "Removing subscriber %s/%s", scope, topic);
char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&manager->topicReceivers.mutex);
@@ -393,15 +381,22 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
pubsub_announce_endpoint_listener_t *listener = svc;
+ celix_logHelper_trace(manager->loghelper, "Added EndpointListener");
+
+
//1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
//2) Add listener to manager->announceEndpointListeners
+ celix_array_list_t* announceEndpoints = celix_arrayList_create();
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+
celixThreadMutex_lock(&manager->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL && entry->endpoint != NULL) {
- listener->announceEndpoint(listener->handle, entry->endpoint);
+ celix_arrayList_add(announceEndpoints, celix_properties_copy(entry->endpoint));
}
}
celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -411,14 +406,20 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL && entry->endpoint != NULL) {
- listener->announceEndpoint(listener->handle, entry->endpoint);
+ celix_arrayList_add(announceEndpoints, celix_properties_copy(entry->endpoint));
}
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
celix_arrayList_add(manager->announceEndpointListeners.list, listener);
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ for (int i = 0; i < celix_arrayList_size(announceEndpoints); ++i) {
+ celix_properties_t* endpoint = celix_arrayList_get(announceEndpoints, i);
+ listener->announceEndpoint(listener->handle, endpoint);
+ celix_properties_destroy(endpoint);
+ }
+ celix_arrayList_destroy(announceEndpoints);
}
@@ -426,8 +427,7 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void *handle,
pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
pubsub_announce_endpoint_listener_t *listener = svc;
- //1) Remove listener from manager->announceEndpointListeners
- //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+ celix_logHelper_trace(manager->loghelper, "Removing EndpointListener");
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
@@ -456,6 +456,8 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
PUBSUB_SUBSCRIBER_TOPIC);
return;
}
+ celix_logHelper_trace(manager->loghelper, "Adding new request for publisher %s/%s", scope, topic);
+
scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&manager->topicSenders.mutex);
@@ -470,16 +472,17 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
} else {
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedProtocolSvcId = -1L;
- entry->selectedPsaSvcId = -1L;
+ entry->matching.needsMatch = true;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.selectedProtocolSvcId = -1L;
+ entry->matching.selectedPsaSvcId = -1L;
entry->scope = scope; //taking ownership
entry->topic = topic; //taking ownership
entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
- entry->needsMatch = true;
entry->publisherFilter = celix_filter_create(info->filter->filterStr);
entry->bndId = info->bundleId;
hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+ celix_logHelper_trace(manager->loghelper, "Created new topic sender entry %s", entry->scopeAndTopicKey);
}
//new entry -> wakeup psaHandling thread
bool triggerCondition = (entry->usageCount == 1);
@@ -510,6 +513,7 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
}
return;
}
+ celix_logHelper_trace(manager->loghelper, "Removing request for publisher %s/%s", scope, topic);
char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
@@ -540,8 +544,8 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
bool triggerCondition = false;
if (manager->verbose) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
- "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_logHelper_trace(manager->loghelper,
+ "Adding discovered endpoint for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
@@ -562,6 +566,7 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
hashMap_put(manager->discoveredEndpoints.map, (void *) entry->uuid, entry);
+ celix_logHelper_trace(manager->loghelper, "Created new discovered endpoint entry %s", uuid);
//waking up psa handling thread to select psa
triggerCondition = true;
@@ -595,8 +600,8 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
if (manager->verbose) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
- "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_logHelper_trace(manager->loghelper,
+ "Removing discovered endpoint for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "(null)"),
celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "(null)"),
@@ -625,6 +630,7 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
} else {
celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
}
+ celix_logHelper_trace(manager->loghelper, "Destroying discovered endpoint entry %s", uuid);
celix_properties_destroy(entry->endpoint);
free(entry);
}
@@ -633,39 +639,42 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
return CELIX_SUCCESS;
}
+struct pstm_teardown_entry {
+ char* scope;
+ char* topic;
+ long psaSvcId;
+};
static void pstm_teardownTopicSenderCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ struct pstm_teardown_entry* entry = handle;
pubsub_admin_service_t *psa = svc;
psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
}
+//Note called on pstm update thread
static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
+ celix_array_list_t* revokeEndpoints = celix_arrayList_create();
+ celix_array_list_t* teardownEntries = celix_arrayList_create();
+
celixThreadMutex_lock(&manager->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+ if (entry != NULL && (entry->usageCount <= 0 || entry->matching.needsMatch)) {
if (manager->verbose && entry->endpoint != NULL) {
celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
- "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+ "Tearing down TopicSender for scope/topic %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
}
-
if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->revokeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
- PUBSUB_ADMIN_SERVICE_NAME,
- entry, pstm_teardownTopicSenderCallback);
+ celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
+ struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
+ teardownEntry->scope = celix_utils_strdup(entry->scope);
+ teardownEntry->topic = celix_utils_strdup(entry->topic);
+ teardownEntry->psaSvcId = entry->matching.selectedPsaSvcId;
+ celix_arrayList_add(teardownEntries, teardownEntry);
}
-
//cleanup entry
if (entry->usageCount <= 0) {
//no usage -> remove
@@ -691,46 +700,71 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
celix_properties_destroy(entry->endpoint);
}
entry->endpoint = NULL;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedProtocolSvcId = -1L;
+ entry->matching.selectedPsaSvcId = -1L;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.selectedProtocolSvcId = -1L;
}
}
}
celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+ celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+ listener->revokeEndpoint(listener->handle, endpoint);
+ celix_properties_destroy(endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_destroy(revokeEndpoints);
+
+ for (int i = 0; i < celix_arrayList_size(teardownEntries); ++i) {
+ struct pstm_teardown_entry* entry = celix_arrayList_get(teardownEntries, i);
+ celix_bundleContext_useServiceWithId(manager->context, entry->psaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME,
+ entry, pstm_teardownTopicSenderCallback);
+ free(entry->scope);
+ free(entry->topic);
+ free(entry);
+ }
+ celix_arrayList_destroy(teardownEntries);
}
static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ struct pstm_teardown_entry* entry = handle;
pubsub_admin_service_t *psa = svc;
psa->teardownTopicReceiver(psa->handle, entry->scope, entry->topic);
}
static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
+ celix_array_list_t* revokeEndpoints = celix_arrayList_create();
+ celix_array_list_t* teardownEntries = celix_arrayList_create();
+
celixThreadMutex_lock(&manager->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+ if (entry != NULL && (entry->usageCount <= 0 || entry->matching.needsMatch)) {
if (manager->verbose && entry->endpoint != NULL) {
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
- "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+ "Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
entry->scope == NULL ? "(null)" : entry->scope, entry->topic, adminType, serType);
}
if (entry->endpoint != NULL) {
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
- PUBSUB_ADMIN_SERVICE_NAME, entry,
- pstm_teardownTopicReceiverCallback);
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
- manager->announceEndpointListeners.list, i);
- listener->revokeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
+ struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
+ teardownEntry->scope = celix_utils_strdup(entry->scope);
+ teardownEntry->topic = celix_utils_strdup(entry->topic);
+ teardownEntry->psaSvcId = entry->matching.selectedPsaSvcId;
+ celix_arrayList_add(teardownEntries, teardownEntry);
}
if (entry->usageCount <= 0) {
@@ -758,13 +792,37 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
celix_properties_destroy(entry->endpoint);
}
entry->endpoint = NULL;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedProtocolSvcId = -1L;
+ entry->matching.selectedPsaSvcId = -1L;
+ entry->matching.selectedSerializerSvcId = -1L;
+ entry->matching.selectedProtocolSvcId = -1L;
}
}
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+ celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+ listener->revokeEndpoint(listener->handle, endpoint);
+ celix_properties_destroy(endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_destroy(revokeEndpoints);
+
+ for (int i = 0; i < celix_arrayList_size(teardownEntries); ++i) {
+ struct pstm_teardown_entry* entry = celix_arrayList_get(teardownEntries, i);
+ celix_bundleContext_useServiceWithId(manager->context, entry->psaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME,
+ entry, pstm_teardownTopicReceiverCallback);
+ free(entry->scope);
+ free(entry->topic);
+ free(entry);
+ }
+ celix_arrayList_destroy(teardownEntries);
}
static void pstm_addEndpointCallback(void *handle, void *svc) {
@@ -788,6 +846,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
long svcId = (long) hashMapEntry_getKey(mapEntry);
bool match = false;
+ //NOTE assuming match is safe to call within a lock
psa->matchDiscoveredEndpoint(psa->handle, entry->endpoint, &match);
if (match) {
psaSvcId = svcId;
@@ -797,6 +856,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
if (psaSvcId >= 0) {
+ //NOTE assuming adding discovered endpoint is safe to call within a lock
celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
(void *) entry->endpoint, pstm_addEndpointCallback);
} else {
@@ -809,18 +869,31 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
}
+struct pstm_setup_entry {
+ char* scope;
+ char* topic;
+ char* key;
+ celix_properties_t* topicProperties;
+ long selectedSerializerSvcId;
+ long selectedProtocolSvcId;
+ long psaSvcId;
+ celix_properties_t* endpointResult;
+};
+
static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ struct pstm_setup_entry *entry = handle;
pubsub_admin_service_t *psa = svc;
- psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
+ psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpointResult);
}
static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
+ celix_array_list_t* setupEntries = celix_arrayList_create();
+
celixThreadMutex_lock(&manager->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
+ if (entry != NULL && entry->matching.needsMatch && entry->usageCount > 0) {
//new topic sender needed, requesting match with current psa
double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serializerSvcId = -1L;
@@ -838,7 +911,9 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
long serSvcId = -1L;
long protSvcId = -1L;
celix_properties_t *topicProps = NULL;
- psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId, &protSvcId);
+ //NOTE assuming matchPublisher is safe to call within lock
+ psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId,
+ &protSvcId);
if (score > highestScore) {
if (topicPropertiesForHighestMatch != NULL) {
celix_properties_destroy(topicPropertiesForHighestMatch);
@@ -855,45 +930,73 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry->selectedPsaSvcId = selectedPsaSvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
- entry->selectedProtocolSvcId = protocolSvcId;
- entry->topicProperties = topicPropertiesForHighestMatch;
- bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
- if (called && entry->endpoint != NULL) {
- entry->needsMatch = false;
-
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
+ entry->matching.needsMatch = false;
+ //NOTE the needsMatch can be updated as soon as the lock manager->topicSenders.mutex is released.
+ //This is ok, because the next pstm update loop will then do a new teardown/setup
+
+ struct pstm_setup_entry *setupEntry = malloc(sizeof(*setupEntry));
+ setupEntry->endpointResult = NULL;
+ setupEntry->scope = celix_utils_strdup(entry->scope);
+ setupEntry->topic = celix_utils_strdup(entry->topic);
+ setupEntry->key = pubsubEndpoint_createScopeTopicKey(entry->scope, entry->topic);
+ setupEntry->topicProperties = topicPropertiesForHighestMatch;
+ setupEntry->psaSvcId = selectedPsaSvcId;
+ setupEntry->selectedSerializerSvcId = serializerSvcId;
+ setupEntry->selectedProtocolSvcId = protocolSvcId;
+ celix_arrayList_add(setupEntries, setupEntry);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ for (int i = 0; i < celix_arrayList_size(setupEntries); ++i) {
+ struct pstm_setup_entry* setupEntry = celix_arrayList_get(setupEntries, i);
+ bool called = celix_bundleContext_useServiceWithId(manager->context, setupEntry->psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, setupEntry, pstm_setupTopicSenderCallback);
+ if (called && setupEntry->endpointResult != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int k = 0; k < celix_arrayList_size(manager->announceEndpointListeners.list); ++k) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, setupEntry->endpointResult);
}
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- if (entry->needsMatch) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t* entry = hashMap_get(manager->topicSenders.map, setupEntry->key);
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
}
+ entry->endpoint = setupEntry->endpointResult;
+ entry->topicProperties = setupEntry->topicProperties;
+ entry->matching.selectedPsaSvcId = setupEntry->psaSvcId;
+ entry->matching.selectedSerializerSvcId = setupEntry->selectedSerializerSvcId;
+ entry->matching.selectedProtocolSvcId = setupEntry->selectedProtocolSvcId;
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ } else {
+ celix_logHelper_warning(manager->loghelper, "Cannot setup TopicSender for %s/%s\n", setupEntry->scope == NULL ? "(null)" : setupEntry->scope, setupEntry->topic);
}
+ free(setupEntry->scope);
+ free(setupEntry->topic);
+ free(setupEntry->key);
+ free(setupEntry);
}
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ celix_arrayList_destroy(setupEntries);
}
static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ struct pstm_setup_entry *setupEntry = handle; //handle is the pstm_setup_entry given to celix_bundleContext_useServiceWithId
pubsub_admin_service_t *psa = svc;
- psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
+ psa->setupTopicReceiver(psa->handle, setupEntry->scope, setupEntry->topic, setupEntry->topicProperties, setupEntry->selectedSerializerSvcId, setupEntry->selectedProtocolSvcId, &setupEntry->endpointResult); //the psa reports the resulting endpoint through endpointResult
}
static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
+ celix_array_list_t* setupEntries = celix_arrayList_create(); //collects the psa setup work found while holding topicReceivers.mutex; the setup itself is executed after that lock is released
+
celixThreadMutex_lock(&manager->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
+ if (entry != NULL && entry->matching.needsMatch && entry->usageCount > 0) {
double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serializerSvcId = -1L;
@@ -912,7 +1015,8 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
long protSvcId = -1L;
celix_properties_t *topicProps = NULL;
- psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId, &protSvcId);
+ psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score,
+ &serSvcId, &protSvcId);
if (score > highestScore) {
if (highestMatchTopicProperties != NULL) {
celix_properties_destroy(highestMatchTopicProperties);
@@ -929,35 +1033,59 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry->selectedPsaSvcId = selectedPsaSvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
- entry->selectedProtocolSvcId = protocolSvcId;
- entry->topicProperties = highestMatchTopicProperties;
-
- bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- entry,
- pstm_setupTopicReceiverCallback);
-
- if (called && entry->endpoint != NULL) {
- entry->needsMatch = false;
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
- manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
+ entry->matching.needsMatch = false;
+ //NOTE the needsMatch can be updated as soon as the lock manager->topicReceivers.mutex is released.
+ //This is ok, because the next pstm update loop will then do a new teardown/setup
+
+ struct pstm_setup_entry *setupEntry = malloc(sizeof(*setupEntry)); //NOTE(review): the allocation result is not checked before use
+ setupEntry->endpointResult = NULL;
+ setupEntry->scope = celix_utils_strdup(entry->scope); //copy, because it is used after topicReceivers.mutex is released
+ setupEntry->topic = celix_utils_strdup(entry->topic); //copy, because it is used after topicReceivers.mutex is released
+ setupEntry->key = pubsubEndpoint_createScopeTopicKey(entry->scope, entry->topic); //used to look the entry up again after the setup
+ setupEntry->topicProperties = highestMatchTopicProperties; //stored in the entry after a successful setup
+ setupEntry->psaSvcId = selectedPsaSvcId;
+ setupEntry->selectedSerializerSvcId = serializerSvcId;
+ setupEntry->selectedProtocolSvcId = protocolSvcId;
+ celix_arrayList_add(setupEntries, setupEntry);
}
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
- if (entry->needsMatch) {
- celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+ for (int i = 0; i < celix_arrayList_size(setupEntries); ++i) { //the psa setup is done without holding topicReceivers.mutex
+ struct pstm_setup_entry* setupEntry = celix_arrayList_get(setupEntries, i);
+ bool called = celix_bundleContext_useServiceWithId(manager->context, setupEntry->psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, setupEntry, pstm_setupTopicReceiverCallback);
+ if (called && setupEntry->endpointResult != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); //announce the new endpoint to all registered listeners
+ for (int k = 0; k < celix_arrayList_size(manager->announceEndpointListeners.list); ++k) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, k); //index with k (listener index), not i (setup entry index)
+ listener->announceEndpoint(listener->handle, setupEntry->endpointResult);
}
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ celixThreadMutex_lock(&manager->topicReceivers.mutex); //lock again to store the setup result in the entry
+ pstm_topic_receiver_or_sender_entry_t* entry = hashMap_get(manager->topicReceivers.map, setupEntry->key); //NOTE(review): result is not NULL checked; confirm the entry cannot be removed while the lock was released
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
+ entry->endpoint = setupEntry->endpointResult;
+ entry->topicProperties = setupEntry->topicProperties;
+ entry->matching.selectedPsaSvcId = setupEntry->psaSvcId;
+ entry->matching.selectedSerializerSvcId = setupEntry->selectedSerializerSvcId;
+ entry->matching.selectedProtocolSvcId = setupEntry->selectedProtocolSvcId;
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ } else { //NOTE(review): setupEntry->topicProperties is not released on this path and needsMatch stays false; confirm this is intended
+ celix_logHelper_warning(manager->loghelper, "Cannot setup TopicReceiver for %s/%s\n", setupEntry->scope == NULL ? "(null)" : setupEntry->scope, setupEntry->topic);
}
+ free(setupEntry->scope); //release the copies made for this setup entry
+ free(setupEntry->topic);
+ free(setupEntry->key);
+ free(setupEntry);
}
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+ celix_arrayList_destroy(setupEntries);
}
static void *pstm_psaHandlingThread(void *data) {
@@ -1058,9 +1186,9 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
fprintf(os, " |- serializer = %s\n", serType);
fprintf(os, " |- protocol = %s\n", protType);
if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
- fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
- fprintf(os, " |- prot svc id = %li\n", entry->selectedProtocolSvcId);
+ fprintf(os, " |- psa svc id = %li\n", entry->matching.selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->matching.selectedSerializerSvcId);
+ fprintf(os, " |- prot svc id = %li\n", entry->matching.selectedProtocolSvcId);
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
}
@@ -1088,9 +1216,9 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
fprintf(os, " |- serializer = %s\n", serType);
fprintf(os, " |- protocol = %s\n", protType);
if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
- fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
- fprintf(os, " |- prot svc id = %li\n", entry->selectedProtocolSvcId);
+ fprintf(os, " |- psa svc id = %li\n", entry->matching.selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->matching.selectedSerializerSvcId);
+ fprintf(os, " |- prot svc id = %li\n", entry->matching.selectedProtocolSvcId);
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 853811b..9f5ef49 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -87,16 +87,11 @@ typedef struct pstm_discovered_endpoint_entry {
} pstm_discovered_endpoint_entry_t;
typedef struct pstm_topic_receiver_or_sender_entry {
- bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
-
char *scopeAndTopicKey; //key of the combined value of the scope and topic
celix_properties_t *endpoint;
char *topic;
char *scope;
- int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
- long selectedPsaSvcId;
- long selectedSerializerSvcId;
- long selectedProtocolSvcId;
+ int usageCount; //nr of provided subscriber services for the topic receiver (matching scope & topic) or nr of publishers requested for matching scope & topic.
long bndId;
celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
@@ -105,6 +100,13 @@ typedef struct pstm_topic_receiver_or_sender_entry {
//for receiver entry
celix_properties_t *subscriberProperties;
+
+ struct { //psa matching state of this entry
+ bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
+ long selectedPsaSvcId; //service id of the selected pubsub admin (psa) service
+ long selectedSerializerSvcId; //service id of the serializer selected during matching
+ long selectedProtocolSvcId; //service id of the protocol selected during matching
+ } matching;
} pstm_topic_receiver_or_sender_entry_t;
celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager);