You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:26 UTC

[09/34] celix git commit: CELIX-454: More PubSub. The UDPMC is now somewhat working again, but still needs some testing.

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 85c67e9..1ebd74a 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -36,8 +36,9 @@
 #include "pubsub_listeners.h"
 #include "pubsub_topology_manager.h"
 #include "pubsub_admin.h"
+#include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS       5L
+#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS       30L
 
 static void* pstm_psaHandlingThread(void *data);
 
@@ -161,22 +162,53 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
 		celix_arrayList_destroy(endpointsList);
 	}
 
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
-}
+	//NOTE psa shutdown will teardown topic receivers / topic senders
+	//de-setup all topic receivers/senders for the removed psa.
+	//the next psaHandling run will try to find new psa.
 
-static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
-	pstm_topic_receiver_or_sender_entry_t *entry = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+	while (hashMapIterator_hasNext(&iter)) {
+	    pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+	    if (entry->selectedPsaSvcId == svcId) {
+	        entry->setup = false;
+	        entry->selectedSerializerSvcId = -1L;
+	        entry->selectedPsaSvcId = -1L;
+	        if (entry->endpoint != NULL) {
+	            celix_properties_destroy(entry->endpoint);
+                entry->endpoint = NULL;
+            }
+	    }
+	}
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->selectedPsaSvcId == svcId) {
+            entry->setup = false;
+            entry->selectedSerializerSvcId = -1L;
+            entry->selectedPsaSvcId = -1L;
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
+                entry->endpoint = NULL;
+            }
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+
+
+    logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
 }
 
 void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
 	pubsub_topology_manager_t *manager = handle;
+
 	//NOTE new local subscriber service register
 	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
 	//2) update the usage count. if not found
-	//3) Create new entry, find matching psa and serializer and broadcast cond, so that the psaHandling thread will
-	//   call the psa to setup the topic receiver and announce the endpoint.
+	//3) signal psaHandling thread to setup topic receiver
 
 	const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
 	const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -196,75 +228,28 @@ void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute
 	if (entry != NULL) {
 		entry->usageCount += 1;
 		free(scopeAndTopicKey);
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
-	if (entry == NULL) {
-		//new TopicReceiver needed -> matching for psa/serializer
+	} else {
 		entry = calloc(1, sizeof(*entry));
 		entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+		entry->scope = strndup(scope, 1024 * 1024);
+		entry->topic = strndup(topic, 1024 * 1024);
+		entry->usageCount = 1;
 		entry->selectedPsaSvcId = -1L;
 		entry->selectedSerializerSvcId = -1L;
-		entry->usageCount = 1;
-
-		double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
-		long serializerSvcId = -1L;
-		long selectedPsasvcId = -1L;
-
-		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-		hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
-		while (hashMapIterator_hasNext(&iter)) {
-			hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
-			long svcId = (long) hashMapEntry_getKey(mapEntry);
-			pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
-			double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-			long serSvcId = -1L;
-
-			psa->matchSubscriber(psa->handle, bndId, props, &score, &serSvcId);
-			if (score > highestScore) {
-				highestScore = score;
-				serializerSvcId = serSvcId;
-				selectedPsasvcId = svcId;
-			}
-		}
-		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
-		if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-			entry->selectedPsaSvcId = selectedPsasvcId;
-			entry->selectedSerializerSvcId = serializerSvcId;
-
-			celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry,
-												 pstm_setupTopicReceiverCallback);
+		entry->setup = false;
+		entry->bndId = bndId;
+		entry->subscriberProperties = celix_properties_copy(props);
+		hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
 
-			if (entry->endpoint != NULL) {
-				entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-				entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-				entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-
-				//announce new endpoint through the network
-				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-					pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-					listener->announceEndpoint(listener->handle, entry->endpoint);
-				}
-				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
-				//store topic receiver.
-				//TODO race condition if multiple scope/topic combinations are request -> broader the lock?
-				celixThreadMutex_lock(&manager->topicReceivers.mutex);
-				hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
-				celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-			} else {
-				free(entry->scopeAndTopicKey);
-				free(entry);
-				//ignore -> psa unregistered in meantime
-			}
-		}
+		//signal psa handling thread
+		celixThreadCondition_broadcast(&manager->psaHandling.cond);
 	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 }
 
 void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
 	pubsub_topology_manager_t *manager = handle;
+
 	//NOTE local subscriber service unregister
 	//1) Find topic receiver and decrease count
 
@@ -334,20 +319,13 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle,
 	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 }
 
-
-static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
-	pstm_topic_receiver_or_sender_entry_t *entry = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
-}
-
 void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
 	pubsub_topology_manager_t *manager = handle;
 
 	//NOTE new local subscriber service register
 	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
 	//2) update the usage count. if not found
-	//3) Try to find a matching psa and create a new TopicReceiver.
+	//3) signal psaHandling thread to find a psa and setup TopicSender
 
 	//TODO FIXME
 	if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
@@ -356,96 +334,45 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
 	}
 
 
-	char *topic = NULL;
+	char *topicFromFilter = NULL;
 	char *scopeFromFilter = NULL;
-	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
-	const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
+	char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+	char *topic = topicFromFilter;
 
 	char *scopeAndTopicKey = NULL;
 	if (topic == NULL) {
 		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
-		        "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC);
-	} else {
-		scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-		celixThreadMutex_lock(&manager->topicSenders.mutex);
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
-		if (entry != NULL) {
-			entry->usageCount += 1;
-		}
-		celixThreadMutex_unlock(&manager->topicSenders.mutex);
-
-		if (entry == NULL) {
-			//new topic receiver needed, requesting match with current psa
-			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
-			long serializerSvcId = -1L;
-			long selectedPsasvcId = -1L;
-
-			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-			hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
-			while (hashMapIterator_hasNext(&iter)) {
-				hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
-				long svcId = (long)hashMapEntry_getKey(entry);
-				pubsub_admin_service_t *psa = hashMapEntry_getValue(entry);
-				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-				long serSvcId = -1L;
-				psa->matchPublisher(psa->handle, info->bundleId, info->filter, &score, &serSvcId);
-				if (score > highestScore) {
-					highestScore = score;
-					serializerSvcId = serSvcId;
-					selectedPsasvcId = svcId;
-				}
-			}
-			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
-			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-				entry = calloc(1, sizeof(*entry));
-				entry->scopeAndTopicKey = scopeAndTopicKey;
-				entry->usageCount = 1;
-				entry->selectedPsaSvcId = selectedPsasvcId;
-				entry->selectedSerializerSvcId = serializerSvcId;
-				entry->topic = topic; //NOTE tmp
-				entry->scope = scope; //NOTE tmp
-
-				celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
-				if (entry->endpoint != NULL) {
-					//note psa->setupTopicSender has created the endpoint.
-					entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-					entry->topic = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-					entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-				} else {
-					free(entry->scopeAndTopicKey);
-					free(entry);
-					entry = NULL;
-					//ignore -> psa unregistered in meantime
-				}
-
-				if (entry != NULL) {
-					//announce new endpoint through the network
-					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-						listener->announceEndpoint(listener->handle, entry->endpoint);
-					}
-					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
-					//store topic sender.
-					celixThreadMutex_lock(&manager->topicSenders.mutex);
-					hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
-					celixThreadMutex_unlock(&manager->topicSenders.mutex);
+					  "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
+					  PUBSUB_SUBSCRIBER_TOPIC);
+		return;
+	}
 
-					const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-					const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-					logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-								  "[PSTM] setting up new TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
-								  entry->scope, entry->topic, adminType, serType);
-				}
-			}
-		} else {
-			free(scopeAndTopicKey);
-		}
+	scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+	celixThreadMutex_lock(&manager->topicSenders.mutex);
+	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+	if (entry != NULL) {
+		entry->usageCount += 1;
+		free(scope);
+		free(topic);
+		free(scopeAndTopicKey);
+	} else {
+		entry = calloc(1, sizeof(*entry));
+		entry->usageCount = 1;
+		entry->selectedSerializerSvcId = -1L;
+		entry->selectedPsaSvcId = -1L;
+		entry->scope = scope; //taking ownership
+		entry->topic = topic; //taking ownership
+		entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
+		entry->setup = false;
+		entry->publisherFilter = celix_filter_create(info->filter->filterStr);
+		entry->bndId = info->bundleId;
+		hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+
+		//new entry -> wakeup psaHandling thread
+		celixThreadCondition_broadcast(&manager->psaHandling.cond);
 	}
-	free(scopeFromFilter);
+	celixThreadMutex_unlock(&manager->topicSenders.mutex);
 }
 
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
@@ -483,12 +410,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
 	free(scopeFromFilter);
 }
 
-static void pstm_addEndpointCallback(void *handle, void *svc) {
-	celix_properties_t *endpoint = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->addEndpoint(psa->handle, endpoint);
-}
-
 celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
 	celix_status_t status = CELIX_SUCCESS;
     pubsub_topology_manager_t *manager = handle;
@@ -515,51 +436,20 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
 	if (entry != NULL) {
 		//already existing endpoint -> increase usage
 		entry->usageCount += 1;
-	}
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-
-	if (entry == NULL) {
-
+	} else {
 		//new endpoint -> new entry
 		entry = calloc(1, sizeof(*entry));
 		entry->usageCount = 1;
 		entry->endpoint = celix_properties_copy(endpoint);
 		entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-		entry->selectedPsaSvcId = -1L;
-
-		double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
-		long psaSvcId = -1L;
-
-		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-		hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
-		while (hashMapIterator_hasNext(&iter)) {
-			hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
-			pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
-			long svcId = (long) hashMapEntry_getKey(mapEntry);
-			double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-			psa->matchEndpoint(psa->handle, endpoint, &score);
-			if (score > highestScore) {
-				highestScore = score;
-				psaSvcId = svcId;
-			}
-		}
-		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+		entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
+		hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
 
-		if (psaSvcId >= 0) {
-			//psa called outside of mutex, this means the it can happen that addEndpointCallback is not called.
-			//for now this is expected behaviour;
-			//You need to start the pubsub admin stuff before the bundles using pubsub.
-			celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-												 (void *) endpoint, pstm_addEndpointCallback);
-		} else {
-			logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
-		}
+		//waking up psa handling thread to select psa
+		celixThreadCondition_broadcast(&manager->psaHandling.cond);
 
-		entry->selectedPsaSvcId = psaSvcId;
-		celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-		hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
-		celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 	}
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 
     return status;
 }
@@ -635,28 +525,34 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
         if (entry != NULL && entry->usageCount <= 0) {
             hashMapIterator_remove(&iter);
             if (manager->verbose) {
-                const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-                const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-                              "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
-                              entry->scope, entry->topic, adminType, serType);
-            }
-
-            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-            for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                pubsub_announce_endpoint_listener_t *listener;
-                listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-                listener->removeEndpoint(listener->handle, entry->endpoint);
+                        "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
             }
-            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 
-            celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-                                                 entry, pstm_teardownTopicSenderCallback);
+            if (entry->endpoint != NULL) {
+				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+					pubsub_announce_endpoint_listener_t *listener;
+					listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+					listener->removeEndpoint(listener->handle, entry->endpoint);
+				}
+				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+				celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+													 PUBSUB_ADMIN_SERVICE_NAME,
+													 entry, pstm_teardownTopicSenderCallback);
+			}
 
 
             //cleanup entry
             free(entry->scopeAndTopicKey);
-            celix_properties_destroy(entry->endpoint);
+            free(entry->scope);
+            free(entry->topic);
+            if (entry->publisherFilter != NULL) {
+				celix_filter_destroy(entry->publisherFilter);
+			}
+			if (entry->endpoint != NULL) {
+				celix_properties_destroy(entry->endpoint);
+			}
             free(entry);
         }
     }
@@ -685,23 +581,203 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
                               entry->scope, entry->topic, adminType, serType);
             }
 
-            celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_teardownTopicReceiverCallback);
-            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-            for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-                listener->removeEndpoint(listener->handle, entry->endpoint);
-            }
-            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+            if (entry->endpoint != NULL) {
+				celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+													 PUBSUB_ADMIN_SERVICE_NAME, entry,
+													 pstm_teardownTopicReceiverCallback);
+				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+					pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+							manager->announceEndpointListeners.list, i);
+					listener->removeEndpoint(listener->handle, entry->endpoint);
+				}
+				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+			}
 
             //cleanup entry
             free(entry->scopeAndTopicKey);
-            celix_properties_destroy(entry->endpoint);
+            free(entry->scope);
+            free(entry->topic);
+            if (entry->subscriberProperties != NULL) {
+				celix_properties_destroy(entry->subscriberProperties);
+            }
+            if (entry->endpoint != NULL) {
+				celix_properties_destroy(entry->endpoint);
+			}
             free(entry);
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 }
 
+static void pstm_addEndpointCallback(void *handle, void *svc) {
+	celix_properties_t *endpoint = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->addEndpoint(psa->handle, endpoint);
+}
+
+static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
+	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (entry != NULL && entry->selectedPsaSvcId < 0) {
+			long psaSvcId = -1L;
+
+			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+			while (hashMapIterator_hasNext(&iter2)) {
+				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+				long svcId = (long) hashMapEntry_getKey(mapEntry);
+				bool match = false;
+				psa->matchEndpoint(psa->handle, entry->endpoint, &match);
+				if (match) {
+					psaSvcId = svcId;
+					break;
+				}
+			}
+			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+			if (psaSvcId >= 0) {
+				celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+													 (void *)entry->endpoint, pstm_addEndpointCallback);
+			} else {
+				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
+			}
+
+			entry->selectedPsaSvcId = psaSvcId;
+		}
+	}
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+}
+
+
+static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
+	pstm_topic_receiver_or_sender_entry_t *entry = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+}
+
+static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
+	celixThreadMutex_lock(&manager->topicSenders.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (entry != NULL && !entry->setup) {
+			//new topic receiver needed, requesting match with current psa
+			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+			long serializerSvcId = -1L;
+			long selectedPsaSvcId = -1L;
+
+			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+			while (hashMapIterator_hasNext(&iter2)) {
+				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+				long svcId = (long)hashMapEntry_getKey(mapEntry);
+				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+				long serSvcId = -1L;
+				psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &score, &serSvcId);
+				if (score > highestScore) {
+					highestScore = score;
+					serializerSvcId = serSvcId;
+                    selectedPsaSvcId = svcId;
+				}
+			}
+			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+			    entry->selectedPsaSvcId = selectedPsaSvcId;
+			    entry->selectedSerializerSvcId = serializerSvcId;
+				bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+				if (called && entry->endpoint != NULL) {
+					entry->setup = true;
+
+					//announce new endpoint through the network
+					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+						listener->announceEndpoint(listener->handle, entry->endpoint);
+					}
+					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+				}
+			}
+
+			if (!entry->setup) {
+				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
+			}
+		}
+	}
+	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+}
+
+static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
+    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+}
+
+static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
+	celixThreadMutex_lock(&manager->topicReceivers.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (!entry->setup) {
+
+			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+			long serializerSvcId = -1L;
+			long selectedPsaSvcId = -1L;
+
+			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+			while (hashMapIterator_hasNext(&iter2)) {
+				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+				long svcId = (long) hashMapEntry_getKey(mapEntry);
+				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+				long serSvcId = -1L;
+
+				psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &score, &serSvcId);
+				if (score > highestScore) {
+					highestScore = score;
+					serializerSvcId = serSvcId;
+                    selectedPsaSvcId = svcId;
+				}
+			}
+			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+				entry->selectedPsaSvcId = selectedPsaSvcId;
+				entry->selectedSerializerSvcId = serializerSvcId;
+
+				bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+													 entry,
+													 pstm_setupTopicReceiverCallback);
+
+				if (called && entry->endpoint != NULL) {
+				    entry->setup = true;
+					//announce new endpoint through the network
+					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+								manager->announceEndpointListeners.list, i);
+						listener->announceEndpoint(listener->handle, entry->endpoint);
+					}
+					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+				}
+			}
+
+
+			if (!entry->setup) {
+				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
+			}
+		}
+	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+}
+
 static void* pstm_psaHandlingThread(void *data) {
     pubsub_topology_manager_t *manager = data;
 
@@ -713,8 +789,13 @@ static void* pstm_psaHandlingThread(void *data) {
         pstm_teardownTopicSenders(manager);
         pstm_teardownTopicReceivers(manager);
 
+        pstm_setupTopicSenders(manager);
+        pstm_setupTopicReceivers(manager);
+
+		pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
+
         celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS, 0L);
         running = manager->psaHandling.running;
         celixThreadMutex_unlock(&manager->psaHandling.mutex);
     }
@@ -733,18 +814,24 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+        const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
+        const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
+        const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
 		const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
 		const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
 		const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
 		const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
 		fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
-		fprintf(os, "   |- scope       = %s\n", scope);
-		fprintf(os, "   |- topic       = %s\n", topic);
-		fprintf(os, "   |- admin type  = %s\n", adminType);
-		fprintf(os, "   |- serializer  = %s\n", serType);
+        fprintf(os, "   |- container name = %s\n", cn);
+        fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
+        fprintf(os, "   |- type           = %s\n", type);
+		fprintf(os, "   |- scope          = %s\n", scope);
+		fprintf(os, "   |- topic          = %s\n", topic);
+		fprintf(os, "   |- admin type     = %s\n", adminType);
+		fprintf(os, "   |- serializer     = %s\n", serType);
 		if (manager->verbose) {
-			fprintf(os, "   |- psa svc id  = %li\n", discovered->selectedPsaSvcId);
-			fprintf(os, "   |- usage count = %i\n", discovered->usageCount);
+			fprintf(os, "   |- psa svc id     = %li\n", discovered->selectedPsaSvcId);
+			fprintf(os, "   |- usage count    = %i\n", discovered->usageCount);
 		}
 	}
 	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
@@ -756,9 +843,13 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	iter = hashMapIterator_construct(manager->topicSenders.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (!entry->setup) {
+			continue;
+		}
+		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
 		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
 		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-		fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID);
+		fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
 		fprintf(os, "   |- scope       = %s\n", entry->scope);
 		fprintf(os, "   |- topic       = %s\n", entry->topic);
 		fprintf(os, "   |- admin type  = %s\n", adminType);
@@ -777,9 +868,13 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	iter = hashMapIterator_construct(manager->topicReceivers.map);
 	while (hashMapIterator_hasNext(&iter)) {
 		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (!entry->setup) {
+			continue;
+		}
+		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
 		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
 		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-		fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID);
+		fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
 		fprintf(os, "   |- scope       = %s\n", entry->scope);
 		fprintf(os, "   |- topic       = %s\n", entry->topic);
 		fprintf(os, "   |- admin type  = %s\n", adminType);
@@ -793,5 +888,7 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
 	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 	fprintf(os,"\n");
 
+	fprintf(os, "TODO pending topic senders/receivers\n");
+
 	return CELIX_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 105b797..3cca612 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -83,7 +83,7 @@ typedef struct pubsub_topology_manager {
 
 typedef struct pstm_discovered_endpoint_entry {
 	const char *uuid;
-	long selectedPsaSvcId;
+	long selectedPsaSvcId; // -1L, indicates no selected psa
 	int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
 	celix_properties_t *endpoint;
 } pstm_discovered_endpoint_entry_t;
@@ -91,12 +91,19 @@ typedef struct pstm_discovered_endpoint_entry {
 typedef struct pstm_topic_receiver_or_sender_entry {
 	char *scopeAndTopicKey; //key of the combined value of the scope and topic
 	celix_properties_t *endpoint;
-	const char *topic;
-	const char *scope;
-	const char *endpointUUID;
+	char *topic;
+	char *scope;
 	int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
 	long selectedPsaSvcId;
 	long selectedSerializerSvcId;
+	long bndId;
+	bool setup;
+
+	//for sender entry
+	celix_filter_t *publisherFilter;
+
+	//for receiver entry
+	celix_properties_t *subscriberProperties;
 } pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h
index 76af9a5..6ecc445 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -275,7 +275,7 @@ long celix_bundleContext_trackServices(
 /**
  * Service Tracker Options used to fine tune which services to track and the callback to be used for the tracked services.
  */
-typedef struct celix_service_tracker_options {
+typedef struct celix_service_tracking_options {
     /**
      * The service filter options, used to setup the filter for the service to track.
      */