You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:22 UTC

[05/34] celix git commit: CELIX-454: More PubSub refactoring. Started creating a new skeleton for psa udpmc based on a updated pubsub spi.

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 834e3a8..85c67e9 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -16,24 +16,19 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_topology_manager.c
- *
- *  \date       Sep 29, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdbool.h>
 #include <celix_api.h>
+#include <pubsub_utils.h>
+#include <assert.h>
 
 #include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h"
 #include "constants.h"
-#include "listener_hook_service.h"
 #include "utils.h"
 #include "log_service.h"
 #include "log_helper.h"
@@ -42,697 +37,761 @@
 #include "pubsub_topology_manager.h"
 #include "pubsub_admin.h"
 
-static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
-	for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) {
-		const char* key = (const char*)hashMapIterator_nextKey(&iter);
-		fprintf(outStream, "    Topic=%s\n", key);
-		array_list_pt ep_list = hashMap_get(endpoints, key);
-		for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
-			pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
-			fprintf(outStream, "        Endpoint %d\n", i);
-			fprintf(outStream, "            Endpoint properties\n");
-			const char *propKey;
-			if(ep->properties) {
-				PROPERTIES_FOR_EACH(ep->properties, propKey) {
-					fprintf(outStream, "                %s => %s\n", propKey, celix_properties_get(ep->properties, propKey, NULL));
-				}
-			}
-		}
-	}
+#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS       5L
 
-}
+static void* pstm_psaHandlingThread(void *data);
 
-static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
-	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t*) handle;
-	if (manager->publications && !hashMap_isEmpty(manager->publications)) {
-		fprintf(outStream, "Publications:\n");
-		print_endpoint_info(manager->publications, outStream);
-	}
-	if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) {
-		fprintf(outStream, "Subscriptions:\n");
-		print_endpoint_info(manager->subscriptions, outStream);
-	}
-	return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager) {
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) {
 	celix_status_t status = CELIX_SUCCESS;
 
-	*manager = calloc(1, sizeof(**manager));
-	if (!*manager) {
+	pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
+	if (manager == NULL) {
+		*out = NULL;
 		return CELIX_ENOMEM;
+	} else {
+		*out = manager;
 	}
 
-	(*manager)->context = context;
+	manager->context = context;
 
 	celix_thread_mutexattr_t psaAttr;
 	celixThreadMutexAttr_create(&psaAttr);
 	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-	status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+	status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
 	celixThreadMutexAttr_destroy(&psaAttr);
 
-	status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-	status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-	status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+	status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, NULL);
+	status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
+	status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
+	status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
+	status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
+	status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
 
-	arrayList_create(&(*manager)->psaList);
+	status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
 
-	(*manager)->discoveryList = hashMap_create(NULL, NULL, NULL, NULL);
-	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, NULL);
+	manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	manager->announceEndpointListeners.list = celix_arrayList_create();
+	manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
+	manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-	(*manager)->loghelper = logHelper;
-	(*manager)->shellCmdService.handle = *manager;
-	(*manager)->shellCmdService.executeCommand = shellCommand;
+	manager->loghelper = logHelper;
+	manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
 
-	(*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
-	const char *verboseStr = NULL;
-	bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr);
-	if (verboseStr != NULL) {
-		(*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
-	}
+	manager->psaHandling.running = true;
+	celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
+	celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
 
-	properties_pt shellProps = properties_create();
-	properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
-	properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
-	properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub");
-	bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
 	return status;
 }
 
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
 	celix_status_t status = CELIX_SUCCESS;
 
-	celixThreadMutex_lock(&manager->discoveryListLock);
-	hashMap_destroy(manager->discoveryList, false, false);
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-	celixThreadMutex_destroy(&manager->discoveryListLock);
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_destroy(manager->psaList);
-	celixThreadMutex_unlock(&manager->psaListLock);
-	celixThreadMutex_destroy(&manager->psaListLock);
-
-	celixThreadMutex_lock(&manager->publicationsLock);
-	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(pubit)){
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
-		unsigned int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-		}
-		arrayList_destroy(l);
-	}
-	hashMapIterator_destroy(pubit);
-	hashMap_destroy(manager->publications, true, false);
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_destroy(&manager->publicationsLock);
-
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
-	while(hashMapIterator_hasNext(subit)){
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
-		unsigned int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-		}
-		arrayList_destroy(l);
-	}
-	hashMapIterator_destroy(subit);
-	hashMap_destroy(manager->subscriptions, true, false);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
-	celixThreadMutex_destroy(&manager->subscriptionsLock);
-	serviceRegistration_unregister(manager->shellCmdReg);
-	free(manager);
+	celixThreadMutex_lock(&manager->psaHandling.mutex);
+	manager->psaHandling.running = false;
+	celixThreadCondition_broadcast(&manager->psaHandling.cond);
+	celixThreadMutex_unlock(&manager->psaHandling.mutex);
+	celixThread_join(manager->psaHandling.thread, NULL);
 
-	return status;
-}
 
-void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
-	pubsub_topology_manager_t *manager = handle;
-	unsigned int i;
+	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+	hashMap_destroy(manager->announcedEndpoints.map, false, false);
+	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+	celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
 
-	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
+	celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+	hashMap_destroy(manager->pubsubadmins.map, false, false);
+	celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+	celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
 
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_add(manager->psaList, psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
+	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+	hashMap_destroy(manager->discoveredEndpoints.map, true, false);
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+	celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
 
-	// Add already detected subscriptions to new PSA
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
-
-	//TODO FIXME no matching used, should only add unmatched subscribers ?
-	//NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
-	while (hashMapIterator_hasNext(subscriptionsIterator)) {
-		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
-		for(i=0;i<arrayList_size(sub_ep_list);i++){
-			psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
-		}
-	}
+	free(manager);
 
-	hashMapIterator_destroy(subscriptionsIterator);
+	return status;
+}
 
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
+void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+	pubsub_topology_manager_t *manager = handle;
+	pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
 
-	// Add already detected publications to new PSA
-	celixThreadMutex_lock(&manager->publicationsLock);
-	hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
+	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
 
-	//TODO FIXME no matching used, should only add unmatched publications ?
-	//NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
-	while (hashMapIterator_hasNext(publicationsIterator)) {
-		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
-		for(i=0;i<arrayList_size(pub_ep_list);i++){
-			psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
-		}
+	if (svcId >= 0) {
+		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+		hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa);
+		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 	}
 
-	hashMapIterator_destroy(publicationsIterator);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
+	/* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
+	 * so no retroactively adding subscribers
+	 *
+	 * TODO future extension?
+	 */
 }
 
-void pubsub_topologyManager_psaRemoved(void * handle, void *svc, const celix_properties_t *props  __attribute__((unused))) {
-	celix_status_t status = CELIX_SUCCESS;
+void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
 	pubsub_topology_manager_t *manager = handle;
+	//pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
-	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
-
-	/* Deactivate all publications */
-	celixThreadMutex_lock(&manager->publicationsLock);
-
-	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(pubit)){
-		hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
-		char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
-		// Extract scope/topic name from key
-		char scope[MAX_SCOPE_LEN];
-		char topic[MAX_TOPIC_LEN];
-		sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
-		array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
-
-		status = psa->closeAllPublications(psa->admin,scope,topic);
-
-		if(status==CELIX_SUCCESS){
-			celixThreadMutex_lock(&manager->discoveryListLock);
-			hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-			while(hashMapIterator_hasNext(iter)){
-				service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-				pubsub_announce_endpoint_listener_t *disc = NULL;
-				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-				const char* fwUUID = NULL;
-				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-				unsigned int i;
-				for(i=0;i<arrayList_size(pubEP_list);i++){
-					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-					if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
-						disc->removeEndpoint(disc->handle,pubEP->properties);
-					}
-				}
-				bundleContext_ungetService(manager->context, disc_sr, NULL);
+	/* de-announce all publications */
+	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+	celix_array_list_t *endpointsList = hashMap_remove(manager->announcedEndpoints.map, (void*)svcId);
+	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+
+	if (endpointsList != NULL) {
+		for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) {
+			celix_properties_t *endpoint = celix_arrayList_get(endpointsList, i);
+			celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+			for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+				pubsub_announce_endpoint_listener_t *listener;
+				listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+				listener->removeEndpoint(listener->handle, endpoint);
 			}
-			hashMapIterator_destroy(iter);
-			celixThreadMutex_unlock(&manager->discoveryListLock);
+			celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+			celix_properties_destroy(endpoint);
 		}
+		celix_arrayList_destroy(endpointsList);
 	}
-	hashMapIterator_destroy(pubit);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
-
-	/* Deactivate all subscriptions */
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
-	while(hashMapIterator_hasNext(subit)){
-		// TODO do some error checking
-		char* scope_topic = (char*)hashMapIterator_nextKey(subit);
-		char scope[MAX_TOPIC_LEN];
-		char topic[MAX_TOPIC_LEN];
-		memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
-		memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
-		sscanf(scope_topic, "%[^:]:%s", scope, topic );
-		status += psa->closeAllSubscriptions(psa->admin,scope, topic);
-	}
-	hashMapIterator_destroy(subit);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
 
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_removeElement(manager->psaList, psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
+}
 
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
+static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
+	pstm_topic_receiver_or_sender_entry_t *entry = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
 }
 
-void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
 	pubsub_topology_manager_t *manager = handle;
-	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
+	//NOTE new local subscriber service register
+	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+	//2) update the usage count. if not found
+	//3) Create new entry, find matching psa and serializer and broadcast cond, so that the psaHandling thread will
+	//   call the psa to setup the topic receiver and announce the endpoint.
+
+	const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+	const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+	if (topic == NULL) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+					  "[PSTM] Warning found subscriber service without mandatory %s property.",
+					  PUBSUB_SUBSCRIBER_TOPIC);
+		return;
+	}
 
-	pubsub_endpoint_pt sub = NULL;
-	if(pubsubEndpoint_createFromSvc(manager->context, bnd, props,false, &sub) == CELIX_SUCCESS) {
-		celixThreadMutex_lock(&manager->subscriptionsLock);
-		char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	long bndId = celix_bundle_getId(bnd);
+	char *scopeAndTopicKey = NULL;
+	scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
-		if(sub_list_by_topic==NULL){
-			arrayList_create(&sub_list_by_topic);
-			hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
-		}
-		free(sub_key);
-		arrayList_add(sub_list_by_topic,sub);
-
-		celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-		unsigned int j;
-		double score = 0;
-		double best_score = 0;
-		pubsub_admin_service_pt best_psa = NULL;
-		celixThreadMutex_lock(&manager->psaListLock);
-		for(j=0;j<arrayList_size(manager->psaList);j++){
-			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-			psa->matchEndpoint(psa->admin,sub,&score);
-			if (score > best_score) { /* We have a new winner! */
-				best_score = score;
-				best_psa = psa;
+	celixThreadMutex_lock(&manager->topicReceivers.mutex);
+	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+	if (entry != NULL) {
+		entry->usageCount += 1;
+		free(scopeAndTopicKey);
+	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+	if (entry == NULL) {
+		//new TopicReceiver needed -> matching for psa/serializer
+		entry = calloc(1, sizeof(*entry));
+		entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+		entry->selectedPsaSvcId = -1L;
+		entry->selectedSerializerSvcId = -1L;
+		entry->usageCount = 1;
+
+		double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+		long serializerSvcId = -1L;
+		long selectedPsasvcId = -1L;
+
+		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+		hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+		while (hashMapIterator_hasNext(&iter)) {
+			hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
+			long svcId = (long) hashMapEntry_getKey(mapEntry);
+			pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+			double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+			long serSvcId = -1L;
+
+			psa->matchSubscriber(psa->handle, bndId, props, &score, &serSvcId);
+			if (score > highestScore) {
+				highestScore = score;
+				serializerSvcId = serSvcId;
+				selectedPsasvcId = svcId;
 			}
 		}
+		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 
-		if (best_psa != NULL && best_score>0) {
-			best_psa->addSubscription(best_psa->admin,sub);
-		}
-
-		// Inform discoveries for interest in the topic
-		celixThreadMutex_lock(&manager->discoveryListLock);
-		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-		while(hashMapIterator_hasNext(iter)){
-			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-			pubsub_announce_endpoint_listener_t *disc = NULL;
-			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-			disc->announceEndpoint(disc->handle, sub->properties);
-			bundleContext_ungetService(manager->context, disc_sr, NULL);
-		}
-		hashMapIterator_destroy(iter);
-		celixThreadMutex_unlock(&manager->discoveryListLock);
-
-		celixThreadMutex_unlock(&manager->psaListLock);
-	}
-}
-
-
-void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-	pubsub_topology_manager_t *manager = handle;
-
-	pubsub_endpoint_pt subcmp = NULL;
-	if (pubsubEndpoint_createFromSvc(manager->context, bnd, props, false, &subcmp) == CELIX_SUCCESS){
-
-		unsigned int j,k;
+		if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+			entry->selectedPsaSvcId = selectedPsasvcId;
+			entry->selectedSerializerSvcId = serializerSvcId;
 
-		// Inform discoveries that we not interested in the topic any more
-		celixThreadMutex_lock(&manager->discoveryListLock);
-		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-		while(hashMapIterator_hasNext(iter)){
-			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-			pubsub_announce_endpoint_listener_t *disc = NULL;
-			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-			disc->removeEndpoint(disc->handle, subcmp->properties);
-			bundleContext_ungetService(manager->context, disc_sr, NULL);
-		}
-		hashMapIterator_destroy(iter);
-		celixThreadMutex_unlock(&manager->discoveryListLock);
-
-		celixThreadMutex_lock(&manager->subscriptionsLock);
-		celixThreadMutex_lock(&manager->psaListLock);
-
-		char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
-		free(sub_key);
-		if(sub_list_by_topic!=NULL){
-			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
-				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
-				if(pubsubEndpoint_equals(sub,subcmp)){
-					for(k=0;k<arrayList_size(manager->psaList);k++){
-						/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
-						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						psa->removeSubscription(psa->admin,sub);
-					}
+			celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry,
+												 pstm_setupTopicReceiverCallback);
 
-				}
-				arrayList_remove(sub_list_by_topic,j);
+			if (entry->endpoint != NULL) {
+				entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+				entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+				entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
 
-				/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
-				if(arrayList_size(sub_list_by_topic)==0){
-					for(k=0;k<arrayList_size(manager->psaList);k++){
-						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-					}
+				//announce new endpoint through the network
+				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+					pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+					listener->announceEndpoint(listener->handle, entry->endpoint);
 				}
-
-				pubsubEndpoint_destroy(sub);
-
+				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+				//store topic receiver.
+				//TODO race condition if multiple scope/topic combinations are request -> broader the lock?
+				celixThreadMutex_lock(&manager->topicReceivers.mutex);
+				hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+				celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+			} else {
+				free(entry->scopeAndTopicKey);
+				free(entry);
+				//ignore -> psa unregistered in meantime
 			}
 		}
-
-		celixThreadMutex_unlock(&manager->psaListLock);
-		celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-		pubsubEndpoint_destroy(subcmp);
-
 	}
 }
 
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props) {
-	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
-	pubsub_announce_endpoint_listener_t *disc = svc;
+void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+	pubsub_topology_manager_t *manager = handle;
+	//NOTE local subscriber service unregister
+	//1) Find topic receiver and decrease count
 
-	const char* fwUUID = NULL;
+	const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+	const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
 
-	bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
-		printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
+	if (topic == NULL) {
 		return;
 	}
 
-	celixThreadMutex_lock(&manager->publicationsLock);
+	char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+	celixThreadMutex_lock(&manager->topicReceivers.mutex);
+	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_remove(manager->topicReceivers.map, scopeAndTopicKey);
+	if (entry != NULL) {
+		entry->usageCount -= 0;
+	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+	free(scopeAndTopicKey);
 
-	celixThreadMutex_lock(&manager->discoveryListLock);
-	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
-	hashMap_put(manager->discoveryList, (void*)svcId, NULL);
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-
-	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(iter)){
-		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
-		for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
-			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-			if( (strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0)) {
-				disc->announceEndpoint(disc->handle,pubEP->properties);
-			}
+	//NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
+}
+
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
+	pubsub_announce_endpoint_listener_t *listener = svc;
+
+	//1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
+	//2) Add listener to manager->announceEndpointListeners
+
+	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+		for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+			celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+			listener->announceEndpoint(listener->handle, ep);
 		}
 	}
-	hashMapIterator_destroy(iter);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	iter = hashMapIterator_create(manager->subscriptions);
+	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+	celix_arrayList_add(manager->announceEndpointListeners.list, listener);
+	celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+}
 
-	while(hashMapIterator_hasNext(iter)) {
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
-		unsigned int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
 
-			disc->announceEndpoint(disc->handle, subEp->properties);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
+	pubsub_announce_endpoint_listener_t *listener = svc;
+
+	//1) Remove listener from manager->announceEndpointListeners
+	//2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+
+	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+	celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
+	celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+		for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+			celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+			listener->removeEndpoint(listener->handle, ep);
 		}
 	}
-	hashMapIterator_destroy(iter);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
+	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 }
 
 
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props) {
-	pubsub_topology_manager_t *manager = handle;
+static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
+	pstm_topic_receiver_or_sender_entry_t *entry = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+}
 
-	celixThreadMutex_lock(&manager->discoveryListLock);
+void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
+	pubsub_topology_manager_t *manager = handle;
 
+	//NOTE new local subscriber service register
+	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+	//2) update the usage count. if not found
+	//3) Try to find a matching psa and create a new TopicReceiver.
 
-	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
-	if (hashMap_remove(manager->discoveryList, (void*)svcId)) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
+	//TODO FIXME
+	if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+	    logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+	    return;
 	}
 
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-}
-
-static void tm_callAnnounce(void *handle, void *svc) {
-    pubsub_endpoint_t *pub = handle;
-    pubsub_announce_endpoint_listener_t *listener = svc;
-    listener->announceEndpoint(listener->handle, pub->properties);
-}
-
-void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
-	pubsub_topology_manager_t *manager = handle;
 
-	pubsub_endpoint_pt pub = NULL;
-	celix_status_t status = pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub);
-	if (status == CELIX_SUCCESS) {
+	char *topic = NULL;
+	char *scopeFromFilter = NULL;
+	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+	const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
 
-		celixThreadMutex_lock(&manager->publicationsLock);
-		char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-		array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
-		if(pub_list_by_topic==NULL){
-			arrayList_create(&pub_list_by_topic);
-			hashMap_put(manager->publications,pub_key,pub_list_by_topic);
-		} else {
-		    free(pub_key);
-        }
-		arrayList_add(pub_list_by_topic,pub);
-
-		celixThreadMutex_unlock(&manager->publicationsLock);
-
-		unsigned int j;
-		double score = 0;
-		double best_score = 0;
-		pubsub_admin_service_pt best_psa = NULL;
-		celixThreadMutex_lock(&manager->psaListLock);
-
-		int size = celix_arrayList_size(manager->psaList);
-		for (j=0; j<size; j++) {
-			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-			psa->matchEndpoint(psa->admin,pub,&score);
-			if(score>best_score){ /* We have a new winner! */
-				best_score = score;
-				best_psa = psa;
-			}
+	char *scopeAndTopicKey = NULL;
+	if (topic == NULL) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+		        "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC);
+	} else {
+		scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+		celixThreadMutex_lock(&manager->topicSenders.mutex);
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+		if (entry != NULL) {
+			entry->usageCount += 1;
 		}
+		celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+		if (entry == NULL) {
+			//new topic receiver needed, requesting match with current psa
+			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+			long serializerSvcId = -1L;
+			long selectedPsasvcId = -1L;
+
+			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+			hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+			while (hashMapIterator_hasNext(&iter)) {
+				hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
+				long svcId = (long)hashMapEntry_getKey(entry);
+				pubsub_admin_service_t *psa = hashMapEntry_getValue(entry);
+				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+				long serSvcId = -1L;
+				psa->matchPublisher(psa->handle, info->bundleId, info->filter, &score, &serSvcId);
+				if (score > highestScore) {
+					highestScore = score;
+					serializerSvcId = serSvcId;
+					selectedPsasvcId = svcId;
+				}
+			}
+			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+				entry = calloc(1, sizeof(*entry));
+				entry->scopeAndTopicKey = scopeAndTopicKey;
+				entry->usageCount = 1;
+				entry->selectedPsaSvcId = selectedPsasvcId;
+				entry->selectedSerializerSvcId = serializerSvcId;
+				entry->topic = topic; //NOTE tmp
+				entry->scope = scope; //NOTE tmp
+
+				celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+				if (entry->endpoint != NULL) {
+					//note psa->setupTopicSender has created the endpoint.
+					entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+					entry->topic = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+					entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+				} else {
+					free(entry->scopeAndTopicKey);
+					free(entry);
+					entry = NULL;
+					//ignore -> psa unregistered in meantime
+				}
 
-		if (best_psa != NULL && best_score > 0) {
-			celix_status_t status = best_psa->addPublication(best_psa->admin,pub);
-			if(status==CELIX_SUCCESS){
-				celixThreadMutex_lock(&manager->discoveryListLock);
-				hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveryList);
-				while(hashMapIterator_hasNext(&iter)) {
-				    long svcId = (long)hashMapIterator_nextKey(&iter);
-				    celix_bundleContext_useServiceWithId(manager->context, svcId, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, pub, tm_callAnnounce);
+				if (entry != NULL) {
+					//announce new endpoint through the network
+					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+						listener->announceEndpoint(listener->handle, entry->endpoint);
+					}
+					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+					//store topic sender.
+					celixThreadMutex_lock(&manager->topicSenders.mutex);
+					hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+					celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+					const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+					const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+					logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+								  "[PSTM] setting up new TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+								  entry->scope, entry->topic, adminType, serType);
 				}
-				celixThreadMutex_unlock(&manager->discoveryListLock);
 			}
+		} else {
+			free(scopeAndTopicKey);
 		}
-
-		celixThreadMutex_unlock(&manager->psaListLock);
-
 	}
+	free(scopeFromFilter);
 }
 
-
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
 	pubsub_topology_manager_t *manager = handle;
 
-	pubsub_endpoint_pt pubcmp = NULL;
-	if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
-		unsigned int j,k;
-		celixThreadMutex_lock(&manager->psaListLock);
-		celixThreadMutex_lock(&manager->publicationsLock);
-
-		char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-		array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-		if(pub_list_by_topic!=NULL){
-			for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-				pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
-				if(pubsubEndpoint_equals(pub,pubcmp)){
-					for(k=0;k<arrayList_size(manager->psaList);k++){
-						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						celix_status_t  status = psa->removePublication(psa->admin,pub);
-						if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
-							celixThreadMutex_lock(&manager->discoveryListLock);
-							hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-							while(hashMapIterator_hasNext(iter)){
-								service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-								pubsub_announce_endpoint_listener_t *disc = NULL;
-								bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-								disc->removeEndpoint(disc->handle,pub->properties);
-								bundleContext_ungetService(manager->context, disc_sr, NULL);
-							}
-							hashMapIterator_destroy(iter);
-							celixThreadMutex_unlock(&manager->discoveryListLock);
-						}
-						else if(status ==  CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
-							status = CELIX_SUCCESS;
-						}
-					}
-					//}
-					arrayList_remove(pub_list_by_topic,j);
-
-					/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
-					if(arrayList_size(pub_list_by_topic)==0){
-						for(k=0;k<arrayList_size(manager->psaList);k++){
-							pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-							psa->closeAllPublications(psa->admin, (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-						}
-					}
-
-					pubsubEndpoint_destroy(pub);
-				}
+	//NOTE local subscriber service unregister
+	//1) Find topic sender and decrease count
 
-			}
-		}
+    //TODO FIXME
+    if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+        return;
+    }
 
-		celixThreadMutex_unlock(&manager->publicationsLock);
-		celixThreadMutex_unlock(&manager->psaListLock);
+	char *topic = NULL;
+	char *scopeFromFilter = NULL;
+	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+	const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
 
-		free(pub_key);
+	if (topic == NULL) {
+		free(scopeFromFilter);
+		return;
+	}
 
-		pubsubEndpoint_destroy(pubcmp);
 
+	char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+	celixThreadMutex_lock(&manager->topicSenders.mutex);
+	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+	if (entry != NULL) {
+		entry->usageCount -= 1;
 	}
+	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+	free(scopeAndTopicKey);
+	free(scopeFromFilter);
+}
+
+static void pstm_addEndpointCallback(void *handle, void *svc) {
+	celix_properties_t *endpoint = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->addEndpoint(psa->handle, endpoint);
 }
 
-static celix_status_t pubsub_topologyManager_addDiscoveredPublisher(void *handle, const celix_properties_t *pubProperties){
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
 	celix_status_t status = CELIX_SUCCESS;
     pubsub_topology_manager_t *manager = handle;
 
-    const char *topic = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-	const char *scope = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-	const char *fwUid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
-	const char *uuid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_UUID, NULL);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
 
+    // 1) See if endpoint is already discovered, if so increase usage count.
+    // 1) If not, find matching psa using the matchEndpoint
+    // 2) if found call addEndpoint of the matching psa
 
-    if (manager->verbose) {
-        printf("PSTM: New publisher discovered for scope/topic %s/%s [fwUUID=%s, epUUID=%s]\n",
-               scope, topic, fwUid, uuid);
-    }
-
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	celixThreadMutex_lock(&manager->publicationsLock);
+	if (manager->verbose) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+					  "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+					  uuid);
+	}
 
-	char *pub_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-	if(pub_list_by_topic==NULL){
-		arrayList_create(&pub_list_by_topic);
-		hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+	pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+	if (entry != NULL) {
+		//already existing endpoint -> increase usage
+		entry->usageCount += 1;
 	}
-	free(pub_key);
-
-	/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
-	pubsub_endpoint_pt p = NULL;
-	pubsubEndpoint_createFromProperties(pubProperties, &p);
-	arrayList_add(pub_list_by_topic , p);
-
-	unsigned int j;
-	double score = 0;
-	double best_score = 0;
-	pubsub_admin_service_pt best_psa = NULL;
-
-	for(j=0;j<arrayList_size(manager->psaList);j++){
-		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-		psa->matchEndpoint(psa->admin , p, &score);
-		if (score>best_score) { /* We have a new winner! */
-			best_score = score;
-			best_psa = psa;
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+	if (entry == NULL) {
+
+		//new endpoint -> new entry
+		entry = calloc(1, sizeof(*entry));
+		entry->usageCount = 1;
+		entry->endpoint = celix_properties_copy(endpoint);
+		entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+		entry->selectedPsaSvcId = -1L;
+
+		double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+		long psaSvcId = -1L;
+
+		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+		hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+		while (hashMapIterator_hasNext(&iter)) {
+			hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
+			pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+			long svcId = (long) hashMapEntry_getKey(mapEntry);
+			double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+			psa->matchEndpoint(psa->handle, endpoint, &score);
+			if (score > highestScore) {
+				highestScore = score;
+				psaSvcId = svcId;
+			}
+		}
+		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+		if (psaSvcId >= 0) {
+			//psa called outside of mutex, this means the it can happen that addEndpointCallback is not called.
+			//for now this is expected behaviour;
+			//You need to start the pubsub admin stuff before the bundles using pubsub.
+			celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+												 (void *) endpoint, pstm_addEndpointCallback);
+		} else {
+			logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
 		}
-	}
 
-	if(best_psa != NULL && best_score>0) {
-        //TODO FIXME this the same call as used by publisher of service trackers. This is confusing.
-        //remote discovered publication can be handle different.
-		best_psa->addPublication(best_psa->admin,p);
-	}
-	else{
-		status = CELIX_ILLEGAL_STATE;
+		entry->selectedPsaSvcId = psaSvcId;
+		celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+		hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
+		celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 	}
 
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_unlock(&manager->psaListLock);
+    return status;
+}
 
-	return status;
+static void pstm_removeEndpointCallback(void *handle, void *svc) {
+	celix_properties_t *endpoint = handle;
+	pubsub_admin_service_t *psa = svc;
+	psa->removeEndpoint(psa->handle, endpoint);
 }
 
-static celix_status_t pubsub_topologyManager_removeDiscoveredPublisher(void *handle, const celix_properties_t *props) {
+celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_topology_manager_t *manager = handle;
 
-    if (manager->verbose) {
-        printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
-			   celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-               celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
-			   celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
-			   celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL));
-    }
+	// 1) See if endpoint is already discovered, if so decrease usage count.
+	// 1) If usage count becomes 0, find matching psa using the matchEndpoint
+	// 2) if found call disconnectEndpoint of the matching psa
 
-	celixThreadMutex_lock(&manager->psaListLock);
-	celixThreadMutex_lock(&manager->publicationsLock);
-	unsigned int i;
+	const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+	assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
 
-	char *pub_key = pubsubEndpoint_createScopeTopicKey(celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
-	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-	if(pub_list_by_topic==NULL){
-		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),celix_properties_get(props, "pubsub.url", NULL));
+	if (manager->verbose) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+					  "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+					  uuid);
 	}
-	else{
-
-		pubsub_endpoint_pt p = NULL;
-		bool found = false;
 
-		for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
-			p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
-			found = pubsubEndpoint_equalsWithProperties(p,props);
+	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+	pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+	if (entry != NULL) {
+		//already existing endpoint -> decrease usage
+		entry->usageCount-= 1;
+		if (entry->usageCount <= 0) {
+			hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
+		} else {
+			entry = NULL; //still used (usage count > 0) -> do nothing
+		}
+	}
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+	if (entry != NULL) {
+		//note entry is removed from manager->discoveredEndpoints, also inform used psa
+		if (entry->selectedPsaSvcId >= 0) {
+			//note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
+			celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+												 (void *) endpoint, pstm_removeEndpointCallback);
+		} else {
+			logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
 		}
+		celix_properties_destroy(entry->endpoint);
+		free(entry);
+	}
 
-		if(found && p !=NULL){
 
-			for(i=0;i<arrayList_size(manager->psaList);i++){
-				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-				/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
-				psa->removePublication(psa->admin,p);
-			}
+	return CELIX_SUCCESS;
+}
 
-			arrayList_removeElement(pub_list_by_topic,p);
 
-			/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
-			if(arrayList_size(pub_list_by_topic)==0){
+static void pstm_teardownTopicSenderCallback(void *handle, void *svc) {
+    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
+}
 
-				for(i=0;i<arrayList_size(manager->psaList);i++){
-					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-					psa->closeAllPublications(psa->admin, (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
-				}
-			}
+static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+
+        if (entry != NULL && entry->usageCount <= 0) {
+            hashMapIterator_remove(&iter);
+            if (manager->verbose) {
+                const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+                const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                              "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+                              entry->scope, entry->topic, adminType, serType);
+            }
+
+            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+            for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                pubsub_announce_endpoint_listener_t *listener;
+                listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                listener->removeEndpoint(listener->handle, entry->endpoint);
+            }
+            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+            celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                 entry, pstm_teardownTopicSenderCallback);
+
+
+            //cleanup entry
+            free(entry->scopeAndTopicKey);
+            celix_properties_destroy(entry->endpoint);
+            free(entry);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+}
 
-			pubsubEndpoint_destroy(p);
-		}
+static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) {
+    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
+}
+
+static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->usageCount <= 0) {
+            hashMapIterator_remove(&iter);
+
+            if (manager->verbose) {
+                const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+                const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                              "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+                              entry->scope, entry->topic, adminType, serType);
+            }
+
+            celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_teardownTopicReceiverCallback);
+            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+            for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                listener->removeEndpoint(listener->handle, entry->endpoint);
+            }
+            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+            //cleanup entry
+            free(entry->scopeAndTopicKey);
+            celix_properties_destroy(entry->endpoint);
+            free(entry);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+}
 
+static void* pstm_psaHandlingThread(void *data) {
+    pubsub_topology_manager_t *manager = data;
 
-	}
-	free(pub_key);
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_unlock(&manager->psaListLock);
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    bool running = manager->psaHandling.running;
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
 
+    while (running) {
+        pstm_teardownTopicSenders(manager);
+        pstm_teardownTopicReceivers(manager);
 
-	return CELIX_SUCCESS;
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L);
+        running = manager->psaHandling.running;
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
+    return NULL;
 }
 
-celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
-	const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
-	if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-		return pubsub_topologyManager_addDiscoveredPublisher(handle, properties);
-	} else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
-		//nop //TODO add subscription to pubsub admins
-	} else {
-		fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
-	}
-	return CELIX_SUCCESS;
-}
 
-celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
-	const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
-	if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-		pubsub_topologyManager_removeDiscoveredPublisher(handle, properties);
-	} else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
-		//nop //TODO remove subscription from pubsub admins
-	} else {
-		fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+	pubsub_topology_manager_t *manager = handle;
+	//TODO add support for searching based on scope and topic
+
+	fprintf(os, "\n");
+
+	fprintf(os, "Discovered Endpoints: \n");
+	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+		const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+		const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+		const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+		const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+		fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
+		fprintf(os, "   |- scope       = %s\n", scope);
+		fprintf(os, "   |- topic       = %s\n", topic);
+		fprintf(os, "   |- admin type  = %s\n", adminType);
+		fprintf(os, "   |- serializer  = %s\n", serType);
+		if (manager->verbose) {
+			fprintf(os, "   |- psa svc id  = %li\n", discovered->selectedPsaSvcId);
+			fprintf(os, "   |- usage count = %i\n", discovered->usageCount);
+		}
+	}
+	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+	fprintf(os,"\n");
+
+
+	fprintf(os, "Active Topic Senders:\n");
+	celixThreadMutex_lock(&manager->topicSenders.mutex);
+	iter = hashMapIterator_construct(manager->topicSenders.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+		fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID);
+		fprintf(os, "   |- scope       = %s\n", entry->scope);
+		fprintf(os, "   |- topic       = %s\n", entry->topic);
+		fprintf(os, "   |- admin type  = %s\n", adminType);
+		fprintf(os, "   |- serializer  = %s\n", serType);
+		if (manager->verbose) {
+			fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+			fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+			fprintf(os, "   |- usage count = %i\n", entry->usageCount);
+		}
 	}
+	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+	fprintf(os,"\n");
+
+	fprintf(os, "Active Topic Receivers:\n");
+	celixThreadMutex_lock(&manager->topicReceivers.mutex);
+	iter = hashMapIterator_construct(manager->topicReceivers.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+		fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID);
+		fprintf(os, "   |- scope       = %s\n", entry->scope);
+		fprintf(os, "   |- topic       = %s\n", entry->topic);
+		fprintf(os, "   |- admin type  = %s\n", adminType);
+		fprintf(os, "   |- serializer  = %s\n", serType);
+		if (manager->verbose) {
+			fprintf(os, "    |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+			fprintf(os, "    |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+			fprintf(os, "    |- usage count = %i\n", entry->usageCount);
+		}
+	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+	fprintf(os,"\n");
+
 	return CELIX_SUCCESS;
 }
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index dda84a0..105b797 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -36,31 +36,68 @@
 #define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE		false
 
 
-struct pubsub_topology_manager {
+typedef struct pubsub_topology_manager {
 	bundle_context_pt context;
 
-	celix_thread_mutex_t psaListLock;
-	array_list_pt psaList;
-
-	celix_thread_mutex_t discoveryListLock;
-	hash_map_pt discoveryList; //<svcId,NULL>
-
-	celix_thread_mutex_t publicationsLock;
-	hash_map_pt publications; //<topic(string),list<pubsub_ep>>
-
-	celix_thread_mutex_t subscriptionsLock;
-	hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
-
-	command_service_t shellCmdService;
-	service_registration_pt  shellCmdReg;
-
+	struct {
+		celix_thread_mutex_t mutex;
+		hash_map_t *map; //key = svcId, value = pubsub_admin_t*
+	} pubsubadmins;
+
+	struct {
+		celix_thread_mutex_t mutex;
+		hash_map_t *map; //key = psa svc id, value = list<celix_properties_t /*endpoint*/>
+	} announcedEndpoints;
+
+	struct {
+		celix_thread_mutex_t mutex;
+		hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
+	} discoveredEndpoints;
+
+	struct {
+		celix_thread_mutex_t mutex;
+		hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+	} topicReceivers;
+
+	struct {
+		celix_thread_mutex_t mutex;
+		hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+	} topicSenders;
+
+	struct {
+		celix_thread_mutex_t mutex;
+		celix_array_list_t *list; //<pubsub_announce_endpoint_listener_t*>
+	} announceEndpointListeners;
+
+	struct {
+		celix_thread_t thread;
+		celix_thread_mutex_t mutex; //protect running and condition
+		celix_thread_cond_t cond;
+		bool running;
+	} psaHandling;
 
 	log_helper_pt loghelper;
 
 	bool verbose;
-};
-
-typedef struct pubsub_topology_manager pubsub_topology_manager_t;
+} pubsub_topology_manager_t;
+
+typedef struct pstm_discovered_endpoint_entry {
+	const char *uuid;
+	long selectedPsaSvcId;
+	int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
+	celix_properties_t *endpoint;
+} pstm_discovered_endpoint_entry_t;
+
+typedef struct pstm_topic_receiver_or_sender_entry {
+	char *scopeAndTopicKey; //key of the combined value of the scope and topic
+	celix_properties_t *endpoint;
+	const char *topic;
+	const char *scope;
+	const char *endpointUUID;
+	int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
+	long selectedPsaSvcId;
+	long selectedSerializerSvcId;
+} pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
@@ -69,8 +106,8 @@ celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *ma
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
 
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props);
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props);
 
 void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
@@ -81,4 +118,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
 celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
 celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
 
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream);
+
 #endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_api.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_api.h b/libs/framework/include/celix_api.h
index 5129a65..b941a78 100644
--- a/libs/framework/include/celix_api.h
+++ b/libs/framework/include/celix_api.h
@@ -21,32 +21,26 @@
 #define CELIX_CELIX_API_H_
 
 #include "properties.h"
-
 #include "array_list.h"
-#include "celix_array_list.h"
-
 #include "constants.h"
+#include "bundle.h"
+#include "bundle_context.h"
+#include "framework.h"
 
+#include "celix_properties.h"
+#include "celix_array_list.h"
+//#include "celix_constants.h"
 #include "celix_utils_api.h"
-
-#include "bundle.h"
 #include "celix_bundle.h"
-
-#include "bundle_context.h"
 #include "celix_bundle_context.h"
 
-#include "service_registration.h"
-#include "service_factory.h"
-#include "service_reference.h"
-#include "service_tracker.h"
-#include "service_tracker_customizer.h"
-#include "listener_hook_service.h"
-
-#include "framework.h"
+#include "celix_framework.h"
 #include "celix_framework_factory.h"
 #include "celix_launcher.h"
 
 #include "dm_dependency_manager.h"
 #include "dm_service_dependency.h"
 
+#include "celix_bundle_activator.h"
+
 #endif //CELIX_CELIX_API_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h
index 5bb6faa..76af9a5 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -357,6 +357,7 @@ typedef struct celix_service_tracker_options {
     .filter.versionRange = NULL, \
     .filter.filter = NULL, \
     .filter.serviceLanguage = NULL, \
+    .filter.ignoreServiceLanguage = false, \
     .callbackHandle = NULL, \
     .set = NULL, \
     .add = NULL, \
@@ -797,6 +798,16 @@ const char* celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const c
 long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const char *key, long defaultValue);
 
 /**
+ * Gets the config property as converts it to double. If the property is not a valid double, the defaultValue will be returned.
+ * The rest of the behaviour is the same as celix_bundleContext_getProperty.
+
+ * @param key The key of the property to receive.
+ * @param defaultVal The default value to use if the property is not found.
+ * @return The property value for the provided key or the provided defaultValue is the key is not found.
+ */
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue);
+
+/**
  * Gets the config property as converts it to bool. If the property is not a valid bool, the defaultValue will be returned.
  * The rest of the behaviour is the same as celix_bundleContext_getProperty.
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/bundle_context.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c
index b8b1552..1e86f2a 100644
--- a/libs/framework/src/bundle_context.c
+++ b/libs/framework/src/bundle_context.c
@@ -1038,6 +1038,20 @@ long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const ch
     return result;
 }
 
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue) {
+    double result = defaultValue;
+    const char *val = celix_bundleContext_getProperty(ctx, key, NULL);
+    if (val != NULL) {
+        char *enptr = NULL;
+        errno = 0;
+        double r = strtod(val, &enptr);
+        if (enptr != val && errno == 0) {
+            result = r;
+        }
+    }
+    return result;
+}
+
 
 bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const char *key, bool defaultValue) {
     bool result = defaultValue;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/framework.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 4059e70..b0d505d 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -2822,7 +2822,7 @@ void celix_framework_useBundle(framework_t *fw, bool onlyActive, long bundleId,
         bundle_t *bnd = framework_getBundleById(fw, bundleId);
         if (bnd != NULL) {
             celix_bundle_state_e bndState = celix_bundle_getState(bnd);
-            if (onlyActive && bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE) {
+            if (onlyActive && (bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE || bndState == OSGI_FRAMEWORK_BUNDLE_STARTING)) {
                 use(callbackHandle, bnd);
             } else if (!onlyActive) {
                 use(callbackHandle, bnd);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_properties.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h
index e325222..f6e871b 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -60,12 +60,16 @@ void celix_properties_unset(celix_properties_t *properties, const char *key);
 celix_properties_t* celix_properties_copy(const celix_properties_t *properties);
 
 long celix_properties_getAsLong(const celix_properties_t *props, const char *key, long defaultValue);
-
 void celix_properties_setLong(celix_properties_t *props, const char *key, long value);
 
 bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue);
 void celix_properties_setBool(celix_properties_t *props, const char *key, bool val);
 
+
+void celix_properties_setDouble(celix_properties_t *props, const char *key, double val);
+double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue);
+
+
 #define CELIX_PROPERTIES_FOR_EACH(props, key) \
     for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
         hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_threads.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_threads.h b/libs/utils/include/celix_threads.h
index a9a3049..bf6e1f6 100644
--- a/libs/utils/include/celix_threads.h
+++ b/libs/utils/include/celix_threads.h
@@ -54,6 +54,11 @@ static const celix_thread_t celix_thread_default = {0, 0};
 celix_status_t
 celixThread_create(celix_thread_t *new_thread, celix_thread_attr_t *attr, celix_thread_start_t func, void *data);
 
+/**
+ * If supported by the platform sets the name of the thread.
+ */
+void celixThread_setName(celix_thread_t *thread, const char *threadName);
+
 void celixThread_exit(void *exitStatus);
 
 celix_status_t celixThread_detach(celix_thread_t thread);
@@ -123,7 +128,7 @@ celix_status_t celixThreadCondition_destroy(celix_thread_cond_t *condition);
 
 celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex);
 
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
 
 celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/celix_threads.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c
index d8a8091..7bfc37d 100644
--- a/libs/utils/src/celix_threads.c
+++ b/libs/utils/src/celix_threads.c
@@ -24,6 +24,7 @@
  *  \copyright  Apache License, Version 2.0
  */
 #include <stdlib.h>
+#include <sys/time.h>
 #include "signal.h"
 #include "celix_threads.h"
 
@@ -41,6 +42,16 @@ celix_status_t celixThread_create(celix_thread_t *new_thread, celix_thread_attr_
 	return status;
 }
 
+#ifdef _GNU_SOURCE
+void celixThread_setName(celix_thread_t *thread, const char *threadName) {
+	pthread_setname_np(thread->thread, threadName);
+}
+#else
+void celixThread_setName(celix_thread_t *thread __attribute__((unused)), const char *threadName  __attribute__((unused))); {
+	//nop
+}
+#endif
+
 // Returns void, since pthread_exit does exit the thread and never returns.
 void celixThread_exit(void *exitStatus) {
     pthread_exit(exitStatus);
@@ -143,10 +154,11 @@ celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread
     return pthread_cond_wait(cond, mutex);
 }
 
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
 	struct timespec time;
-	time.tv_sec = seconds;
-	time.tv_nsec = nanoseconds;
+	clock_gettime(CLOCK_REALTIME, &time);
+	time.tv_sec += seconds;
+	time.tv_nsec += nanoseconds;
 	return pthread_cond_timedwait(cond, mutex, &time);
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/properties.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index 480a056..191dfd8 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -394,13 +394,37 @@ long celix_properties_getAsLong(const celix_properties_t *props, const char *key
 }
 
 void celix_properties_setLong(celix_properties_t *props, const char *key, long value) {
-	char buf[32]; //should be enough to store long long int
-	int writen = snprintf(buf, 32, "%li", value);
-	if (writen <= 31) {
-		celix_properties_set(props, key, buf);
-	} else {
-		fprintf(stderr,"buf to small for value '%li'\n", value);
+    char buf[32]; //should be enough to store long long int
+    int writen = snprintf(buf, 32, "%li", value);
+    if (writen <= 31) {
+        celix_properties_set(props, key, buf);
+    } else {
+        fprintf(stderr,"buf to small for value '%li'\n", value);
+    }
+}
+
+double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue) {
+	double result = defaultValue;
+	const char *val = celix_properties_get(props, key, NULL);
+	if (val != NULL) {
+		char *enptr = NULL;
+		errno = 0;
+		double r = strtod(val, &enptr);
+		if (enptr != val && errno == 0) {
+			result = r;
+		}
 	}
+	return result;
+}
+
+void celix_properties_setDouble(celix_properties_t *props, const char *key, double val) {
+    char buf[32]; //should be enough to store long long int
+    int writen = snprintf(buf, 32, "%f", val);
+    if (writen <= 31) {
+        celix_properties_set(props, key, buf);
+    } else {
+        fprintf(stderr,"buf to small for value '%f'\n", val);
+    }
 }
 
 bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue) {