You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/31 20:37:43 UTC

celix git commit: CELIX-454: Update match algorithm in the pubsub topology manager, so that rematches (teardown followed by setup) are done if new psa are added.

Repository: celix
Updated Branches:
  refs/heads/develop 33a93db0a -> 68f69f893


CELIX-454: Update match algorithm in the pubsub topology manager, so that rematches (teardown followed by setup) are done if new psa are added.

Also removes the ignore of endpoints for own framework, these also need to be processed.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/68f69f89
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/68f69f89
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/68f69f89

Branch: refs/heads/develop
Commit: 68f69f893d4572731470a6fdea1e37ae43459e0a
Parents: 33a93db
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Wed Oct 31 21:35:32 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Wed Oct 31 21:35:32 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_discovery_impl.c                 |  31 +++---
 .../pubsub_spi/include/pubsub_constants.h       |  15 +++
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h |   6 +-
 .../src/pubsub_topology_manager.c               | 103 ++++++++++++-------
 .../src/pubsub_topology_manager.h               |   3 +-
 5 files changed, 96 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/68f69f89/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 9f3e89d..de708ff 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -395,7 +395,9 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
     const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
 
-    if (valid) {
+    const char *visibility = celix_properties_get(endpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_VISIBILITY_DEFAULT);
+
+    if (valid && strncmp(visibility, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY, strlen(PUBSUB_ENDPOINT_SYSTEM_VISIBLITY)) == 0) {
         pubsub_announce_entry_t *entry = calloc(1, sizeof(*entry));
         clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
@@ -410,8 +412,12 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         celixThreadMutex_lock(&disc->waitMutex);
         celixThreadCondition_broadcast(&disc->waitCond);
         celixThreadMutex_unlock(&disc->waitMutex);
-    } else {
-        printf("[PSD] Error cannot announce endpoint. missing some mandatory properties\n");
+    } else if (valid) {
+        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY, visibility);
+    }
+
+    if (!valid) {
+        L_ERROR("[PSD] Error cannot announce endpoint. missing some mandatory properties\n");
     }
 
     return status;
@@ -446,17 +452,6 @@ celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_propert
 
 static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint) {
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-    const char *fwUUID = celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
-
-    //note endpoint should already be check to be valid pubsubEndpoint_isValid
-    assert(uuid != NULL);
-    assert(fwUUID != NULL);
-
-    if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) == 0) {
-        //own endpoint -> ignore
-        celix_properties_destroy(endpoint);
-        return;
-    }
 
     celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
     bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
@@ -465,7 +460,6 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
 
     if (!exists) {
         if (disc->verbose) {
-            const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
             const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
             const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
@@ -492,20 +486,19 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
     celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
 
     if (endpoint == NULL) {
-        //NOTE assuming this was a endpoint from this framework -> ignore
+        L_WARN("Cannot find endpoint with uuid %s\n", uuid);
         return;
     }
 
     if (disc->verbose) {
-        const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
         const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-        printf("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
+        L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
                uuid, type, admin, ser);
     }
 
-    if (exists && endpoint != NULL) {
+    if (exists) {
         celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
         hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpointsListeners);
         while (hashMapIterator_hasNext(&iter)) {

http://git-wip-us.apache.org/repos/asf/celix/blob/68f69f89/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index be47318..485391d 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -23,4 +23,19 @@
 #define PUBSUB_ADMIN_TYPE_KEY	   "pubsub.config"
 #define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type"
 
+/**
+ * Endpoints with the system visibility should be discoverable through the complete system
+ */
+#define PUBSUB_ENDPOINT_SYSTEM_VISIBLITY     "system"
+
+/**
+ * Endpoints with the system visibility are discoverable for a single host (i.e. IPC)
+ */
+#define PUBSUB_ENDPOINT_HOST_VISIBLITY       "host"
+
+/**
+ * Endpoints which are only visible within a single process
+ */
+#define PUBSUB_ENDPOINT_LOCAL_VISIBLITY      "local"
+
 #endif /* PUBSUB_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/68f69f89/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index b03109d..2d6d9be 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -35,12 +35,14 @@
 #define PUBSUB_ENDPOINT_UUID            "pubsub.endpoint.uuid" //required
 #define PUBSUB_ENDPOINT_FRAMEWORK_UUID  "pubsub.framework.uuid" //required
 #define PUBSUB_ENDPOINT_TYPE            "pubsub.endpoint.type" //PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE
+#define PUBSUB_ENDPOINT_VISIBILITY      "pubsub.endpoint.visiblity" //local, host or system. e.g. for IPC host
 #define PUBSUB_ENDPOINT_ADMIN_TYPE       PUBSUB_ADMIN_TYPE_KEY
 #define PUBSUB_ENDPOINT_SERIALIZER       PUBSUB_SERIALIZER_TYPE_KEY
 
 
-#define PUBSUB_PUBLISHER_ENDPOINT_TYPE  "pubsub.publisher"
-#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
+#define PUBSUB_PUBLISHER_ENDPOINT_TYPE      "pubsub.publisher"
+#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE     "pubsub.subscriber"
+#define PUBSUB_ENDPOINT_VISIBILITY_DEFAULT  PUBSUB_ENDPOINT_SYSTEM_VISIBLITY
 
 
 celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props);

http://git-wip-us.apache.org/repos/asf/celix/blob/68f69f89/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index df7b005..bfd038b 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -206,7 +206,7 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
 				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 			}
 
-			entry->setup = false;
+			entry->needsMatch = true;
 	        entry->selectedSerializerSvcId = -1L;
 	        entry->selectedPsaSvcId = -1L;
 	        if (entry->endpoint != NULL) {
@@ -233,7 +233,7 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
 				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 			}
 
-            entry->setup = false;
+            entry->needsMatch = true;
             entry->selectedSerializerSvcId = -1L;
             entry->selectedPsaSvcId = -1L;
             if (entry->endpoint != NULL) {
@@ -283,7 +283,7 @@ void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute
 		entry->usageCount = 1;
 		entry->selectedPsaSvcId = -1L;
 		entry->selectedSerializerSvcId = -1L;
-		entry->setup = false;
+		entry->needsMatch = true;
 		entry->bndId = bndId;
 		entry->subscriberProperties = celix_properties_copy(props);
 		hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
@@ -403,7 +403,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
 		entry->scope = scope; //taking ownership
 		entry->topic = topic; //taking ownership
 		entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
-		entry->setup = false;
+		entry->needsMatch = true;
 		entry->publisherFilter = celix_filter_create(info->filter->filterStr);
 		entry->bndId = info->bundleId;
 		hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
@@ -556,9 +556,8 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
 
-        if (entry != NULL && entry->usageCount <= 0) {
-            hashMapIterator_remove(&iter);
-            if (manager->verbose) {
+        if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+            if (manager->verbose && entry->endpoint != NULL) {
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
                         "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
             }
@@ -578,16 +577,28 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
 
 
             //cleanup entry
-            free(entry->scopeAndTopicKey);
-            free(entry->scope);
-            free(entry->topic);
-            if (entry->publisherFilter != NULL) {
-				celix_filter_destroy(entry->publisherFilter);
-			}
-			if (entry->endpoint != NULL) {
-				celix_properties_destroy(entry->endpoint);
-			}
-            free(entry);
+            if (entry->usageCount <= 0) {
+            	//no usage -> remove
+				hashMapIterator_remove(&iter);
+				free(entry->scopeAndTopicKey);
+				free(entry->scope);
+				free(entry->topic);
+				if (entry->publisherFilter != NULL) {
+					celix_filter_destroy(entry->publisherFilter);
+				}
+				if (entry->endpoint != NULL) {
+					celix_properties_destroy(entry->endpoint);
+				}
+				free(entry);
+			} else {
+            	//still usage, setup for new match
+            	if (entry->endpoint != NULL) {
+					celix_properties_destroy(entry->endpoint);
+				}
+            	entry->endpoint = NULL;
+            	entry->selectedPsaSvcId = -1L;
+            	entry->selectedSerializerSvcId = -1L;
+            }
         }
     }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -604,10 +615,8 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && entry->usageCount <= 0) {
-            hashMapIterator_remove(&iter);
-
-            if (manager->verbose) {
+        if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+            if (manager->verbose && entry->endpoint != NULL) {
                 const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
                 const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
@@ -628,17 +637,29 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
 				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 			}
 
-            //cleanup entry
-            free(entry->scopeAndTopicKey);
-            free(entry->scope);
-            free(entry->topic);
-            if (entry->subscriberProperties != NULL) {
-				celix_properties_destroy(entry->subscriberProperties);
-            }
-            if (entry->endpoint != NULL) {
-				celix_properties_destroy(entry->endpoint);
+			if (entry->usageCount <= 0) {
+				//no usage -> remove
+				hashMapIterator_remove(&iter);
+				//cleanup entry
+				free(entry->scopeAndTopicKey);
+				free(entry->scope);
+				free(entry->topic);
+				if (entry->subscriberProperties != NULL) {
+					celix_properties_destroy(entry->subscriberProperties);
+				}
+				if (entry->endpoint != NULL) {
+					celix_properties_destroy(entry->endpoint);
+				}
+				free(entry);
+			} else {
+				//still usage -> setup for rematch
+				if (entry->endpoint != NULL) {
+					celix_properties_destroy(entry->endpoint);
+				}
+				entry->endpoint = NULL;
+				entry->selectedPsaSvcId = -1L;
+				entry->selectedSerializerSvcId = -1L;
 			}
-            free(entry);
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
@@ -698,8 +719,8 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
 	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry != NULL && !entry->setup) {
-			//new topic receiver needed, requesting match with current psa
+		if (entry != NULL && entry->needsMatch) {
+			//new topic sender needed, requesting match with current psa
 			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
 			long serializerSvcId = -1L;
 			long selectedPsaSvcId = -1L;
@@ -727,7 +748,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
 				bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
 
 				if (called && entry->endpoint != NULL) {
-					entry->setup = true;
+					entry->needsMatch = false;
 
 					//announce new endpoint through the network
 					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
@@ -739,7 +760,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
 				}
 			}
 
-			if (!entry->setup) {
+			if (entry->needsMatch) {
 				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
 			}
 		}
@@ -758,7 +779,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
 	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (!entry->setup) {
+		if (entry->needsMatch) {
 
 			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
 			long serializerSvcId = -1L;
@@ -791,7 +812,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
 													 pstm_setupTopicReceiverCallback);
 
 				if (called && entry->endpoint != NULL) {
-				    entry->setup = true;
+				    entry->needsMatch = false;
 					//announce new endpoint through the network
 					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
 					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
@@ -804,7 +825,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
 			}
 
 
-			if (!entry->setup) {
+			if (entry->needsMatch) {
 				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
 			}
 		}
@@ -820,9 +841,11 @@ static void* pstm_psaHandlingThread(void *data) {
     celixThreadMutex_unlock(&manager->psaHandling.mutex);
 
     while (running) {
+    	//first teardown -> also if rematch is needed
         pstm_teardownTopicSenders(manager);
         pstm_teardownTopicReceivers(manager);
 
+        //then see if any topic sender/receiver are needed
         pstm_setupTopicSenders(manager);
         pstm_setupTopicReceivers(manager);
 
@@ -877,7 +900,7 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	iter = hashMapIterator_construct(manager->topicSenders.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (!entry->setup) {
+		if (entry->endpoint == NULL) {
 			continue;
 		}
 		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
@@ -902,7 +925,7 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	iter = hashMapIterator_construct(manager->topicReceivers.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (!entry->setup) {
+		if (entry->endpoint == NULL) {
 			continue;
 		}
 		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");

http://git-wip-us.apache.org/repos/asf/celix/blob/68f69f89/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index d6db117..39277fe 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -84,6 +84,8 @@ typedef struct pstm_discovered_endpoint_entry {
 } pstm_discovered_endpoint_entry_t;
 
 typedef struct pstm_topic_receiver_or_sender_entry {
+	bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
+
 	char *scopeAndTopicKey; //key of the combined value of the scope and topic
 	celix_properties_t *endpoint;
 	char *topic;
@@ -92,7 +94,6 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 	long selectedPsaSvcId;
 	long selectedSerializerSvcId;
 	long bndId;
-	bool setup;
 
 	//for sender entry
 	celix_filter_t *publisherFilter;