You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/09/29 16:06:46 UTC

[celix] branch develop updated: Fix for possible deadlock of pubsub topology manager (#117)

This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2d6a777  Fix for possible deadlock of pubsub topology manager (#117)
2d6a777 is described below

commit 2d6a777ba33b4e808d28b094446bb0857755ed3d
Author: dhbfischer <52...@users.noreply.github.com>
AuthorDate: Sun Sep 29 18:06:40 2019 +0200

    Fix for possible deadlock of pubsub topology manager (#117)
    
    Based on a similar fix made in the INAETICS pubsub implementation for Java:
    https://github.com/INAETICS/pub-sub-admin-java/commit/313076e752ab5248bb5575f2b3c1716280a8e313
---
 .../src/pubsub_topology_manager.c                     | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index dc11806..b6bd10a 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -320,7 +320,6 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
         return;
     }
 
-    bool triggerCondition = false;
     long bndId = celix_bundle_getId(bnd);
     char *scopeAndTopicKey = NULL;
     scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
@@ -342,10 +341,9 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
         entry->bndId = bndId;
         entry->subscriberProperties = celix_properties_copy(props);
         hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
-
-        //signal psa handling thread
-        triggerCondition = true;
     }
+    //signal psa handling thread
+    bool triggerCondition = (entry->usageCount == 1);
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
     if (triggerCondition) {
@@ -372,7 +370,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
     pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
     if (entry != NULL) {
-        entry->usageCount -= 0;
+        entry->usageCount -= 1;
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
     free(scopeAndTopicKey);
@@ -434,7 +432,6 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
     //3) signal psaHandling thread to find a psa and setup TopicSender
 
 
-    bool triggerCondition = false;
     char *topicFromFilter = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
@@ -469,10 +466,10 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
         entry->publisherFilter = celix_filter_create(info->filter->filterStr);
         entry->bndId = info->bundleId;
         hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
-
-        //new entry -> wakeup psaHandling thread
-        triggerCondition = true;
     }
+    //new entry -> wakeup psaHandling thread
+    bool triggerCondition = (entry->usageCount == 1);
+
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
 
     if (triggerCondition) {
@@ -800,7 +797,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && entry->needsMatch) {
+        if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
             //new topic sender needed, requesting match with current psa
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;
@@ -869,7 +866,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry->needsMatch) {
+        if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
 
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;