You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/06 11:44:43 UTC

[1/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-454-pubsub-disc [created] b2548c845


http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index bfd038b..aff03c7 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -40,66 +40,66 @@
 
 #define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS       30L
 
-static void* pstm_psaHandlingThread(void *data);
+static void *pstm_psaHandlingThread(void *data);
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
-	if (manager == NULL) {
-		*out = NULL;
-		return CELIX_ENOMEM;
-	} else {
-		*out = manager;
-	}
-
-	manager->context = context;
-
-	celix_thread_mutexattr_t psaAttr;
-	celixThreadMutexAttr_create(&psaAttr);
-	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-	status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
-	celixThreadMutexAttr_destroy(&psaAttr);
-
-	status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
-	status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
-	status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
-	status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
-	status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
-
-	status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
-
-	manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-	manager->announceEndpointListeners.list = celix_arrayList_create();
-	manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
-	manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-	manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
-	manager->loghelper = logHelper;
-	manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-
-	manager->psaHandling.running = true;
-	celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
-	celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
-
-	return status;
+    celix_status_t status = CELIX_SUCCESS;
+
+    pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
+    if (manager == NULL) {
+        *out = NULL;
+        return CELIX_ENOMEM;
+    } else {
+        *out = manager;
+    }
+
+    manager->context = context;
+
+    celix_thread_mutexattr_t psaAttr;
+    celixThreadMutexAttr_create(&psaAttr);
+    celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+    status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
+    celixThreadMutexAttr_destroy(&psaAttr);
+
+    status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
+    status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
+    status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
+    status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
+    status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
+
+    status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
+
+    manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    manager->announceEndpointListeners.list = celix_arrayList_create();
+    manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
+    manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    manager->loghelper = logHelper;
+    manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
+
+    manager->psaHandling.running = true;
+    celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
+    celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
+
+    return status;
 }
 
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
-	celix_status_t status = CELIX_SUCCESS;
+    celix_status_t status = CELIX_SUCCESS;
 
-	celixThreadMutex_lock(&manager->psaHandling.mutex);
-	manager->psaHandling.running = false;
-	celixThreadCondition_broadcast(&manager->psaHandling.cond);
-	celixThreadMutex_unlock(&manager->psaHandling.mutex);
-	celixThread_join(manager->psaHandling.thread, NULL);
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    manager->psaHandling.running = false;
+    celixThreadCondition_broadcast(&manager->psaHandling.cond);
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    celixThread_join(manager->psaHandling.thread, NULL);
 
-	celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-	hashMap_destroy(manager->pubsubadmins.map, false, false);
-	celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-	celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
+    celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+    hashMap_destroy(manager->pubsubadmins.map, false, false);
+    celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+    celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
 
-	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+    celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -108,9 +108,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
             free(entry);
         }
     }
-	hashMap_destroy(manager->discoveredEndpoints.map, false, false);
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-	celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
+    hashMap_destroy(manager->discoveredEndpoints.map, false, false);
+    celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+    celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
 
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
     iter = hashMapIterator_construct(manager->topicReceivers.map);
@@ -120,6 +120,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
             free(entry->scopeAndTopicKey);
             free(entry->scope);
             free(entry->topic);
+            if (entry->topicProperties != NULL) {
+                celix_properties_destroy(entry->topicProperties);
+            }
             if (entry->endpoint != NULL) {
                 celix_properties_destroy(entry->endpoint);
             }
@@ -139,6 +142,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
             free(entry->scopeAndTopicKey);
             free(entry->scope);
             free(entry->topic);
+            if (entry->topicProperties != NULL) {
+                celix_properties_destroy(entry->topicProperties);
+            }
             if (entry->endpoint != NULL) {
                 celix_properties_destroy(entry->endpoint);
             }
@@ -151,70 +157,92 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
     celixThreadMutex_destroy(&manager->topicSenders.mutex);
 
     celixThreadMutex_destroy(&manager->announceEndpointListeners.mutex);
-	celix_arrayList_destroy(manager->announceEndpointListeners.list);
+    celix_arrayList_destroy(manager->announceEndpointListeners.list);
 
-	free(manager);
+    free(manager);
 
-	return status;
+    return status;
 }
 
-void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
-	pubsub_topology_manager_t *manager = handle;
-	pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+    pubsub_topology_manager_t *manager = handle;
+    pubsub_admin_service_t *psa = (pubsub_admin_service_t *) svc;
+
+
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
 
+    if (svcId >= 0) {
+        celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+        hashMap_put(manager->pubsubadmins.map, (void *) svcId, psa);
+        celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+    }
+
+    //NOTE new psa, so no endpoints announce yet
 
-	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
+    //new PSA -> every topic sender/receiver entry needs a rematch
+    int needsRematchCount = 0;
 
-	if (svcId >= 0) {
-		celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-		hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa);
-		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-	}
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        entry->needsMatch = true;
+        ++needsRematchCount;
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        entry->needsMatch = true;
+        ++needsRematchCount;
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
-	//NOTE new psa, so no endpoints announce yet
+    if (needsRematchCount > 0) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO,
+                      "A PSA is added after at least one active publisher/provided. \
+				It is preferred that all PSA are started before publiser/subscriber are started!\n\
+				Current topic/sender count is %i", needsRematchCount);
+    }
 
-	/* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
-	 * so no retroactively adding subscribers
-	 *
-	 * TODO future extension?
-	 */
 }
 
-void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
-	pubsub_topology_manager_t *manager = handle;
-	//pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
-	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
+    pubsub_topology_manager_t *manager = handle;
+    //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
-	//NOTE psa shutdown will teardown topic receivers / topic senders
-	//de-setup all topic receivers/senders for the removed psa.
-	//the next psaHandling run will try to find new psa.
+    //NOTE psa shutdown will teardown topic receivers / topic senders
+    //de-setup all topic receivers/senders for the removed psa.
+    //the next psaHandling run will try to find new psa.
 
     celixThreadMutex_lock(&manager->topicSenders.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
-	while (hashMapIterator_hasNext(&iter)) {
-	    pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-	    if (entry->selectedPsaSvcId == svcId) {
-			/* de-announce all senders */
-			if (entry->endpoint != NULL) {
-				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-				for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
-					pubsub_announce_endpoint_listener_t *listener;
-					listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
-					listener->removeEndpoint(listener->handle, entry->endpoint);
-				}
-				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-			}
-
-			entry->needsMatch = true;
-	        entry->selectedSerializerSvcId = -1L;
-	        entry->selectedPsaSvcId = -1L;
-	        if (entry->endpoint != NULL) {
-	            celix_properties_destroy(entry->endpoint);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->selectedPsaSvcId == svcId) {
+            /* de-announce all senders */
+            if (entry->endpoint != NULL) {
+                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+                    pubsub_announce_endpoint_listener_t *listener;
+                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+                    listener->revokeEndpoint(listener->handle, entry->endpoint);
+                }
+                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+            }
+
+            entry->needsMatch = true;
+            entry->selectedSerializerSvcId = -1L;
+            entry->selectedPsaSvcId = -1L;
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
                 entry->endpoint = NULL;
             }
-	    }
-	}
+        }
+    }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
 
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
@@ -222,16 +250,16 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry->selectedPsaSvcId == svcId) {
-			/* de-announce all receivers */
-			if (entry->endpoint != NULL) {
-				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-				for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
-					pubsub_announce_endpoint_listener_t *listener;
-					listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
-					listener->removeEndpoint(listener->handle, entry->endpoint);
-				}
-				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-			}
+            /* de-announce all receivers */
+            if (entry->endpoint != NULL) {
+                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+                    pubsub_announce_endpoint_listener_t *listener;
+                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+                    listener->revokeEndpoint(listener->handle, entry->endpoint);
+                }
+                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+            }
 
             entry->needsMatch = true;
             entry->selectedSerializerSvcId = -1L;
@@ -245,207 +273,206 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
 
-
     logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
 }
 
-void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
-	pubsub_topology_manager_t *manager = handle;
-
-	//NOTE new local subscriber service register
-	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
-	//2) update the usage count. if not found
-	//3) signal psaHandling thread to setup topic receiver
-
-	const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
-	const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-	if (topic == NULL) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
-					  "[PSTM] Warning found subscriber service without mandatory %s property.",
-					  PUBSUB_SUBSCRIBER_TOPIC);
-		return;
-	}
-
-	long bndId = celix_bundle_getId(bnd);
-	char *scopeAndTopicKey = NULL;
-	scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-	celixThreadMutex_lock(&manager->topicReceivers.mutex);
-	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
-	if (entry != NULL) {
-		entry->usageCount += 1;
-		free(scopeAndTopicKey);
-	} else {
-		entry = calloc(1, sizeof(*entry));
-		entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
-		entry->scope = strndup(scope, 1024 * 1024);
-		entry->topic = strndup(topic, 1024 * 1024);
-		entry->usageCount = 1;
-		entry->selectedPsaSvcId = -1L;
-		entry->selectedSerializerSvcId = -1L;
-		entry->needsMatch = true;
-		entry->bndId = bndId;
-		entry->subscriberProperties = celix_properties_copy(props);
-		hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
-
-		//signal psa handling thread
-		celixThreadCondition_broadcast(&manager->psaHandling.cond);
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_topology_manager_t *manager = handle;
+
+    //NOTE new local subscriber service register
+    //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+    //2) update the usage count. if not found
+    //3) signal psaHandling thread to setup topic receiver
+
+    const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    if (topic == NULL) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+                      "[PSTM] Warning found subscriber service without mandatory %s property.",
+                      PUBSUB_SUBSCRIBER_TOPIC);
+        return;
+    }
+
+    long bndId = celix_bundle_getId(bnd);
+    char *scopeAndTopicKey = NULL;
+    scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+        free(scopeAndTopicKey);
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+        entry->scope = strndup(scope, 1024 * 1024);
+        entry->topic = strndup(topic, 1024 * 1024);
+        entry->usageCount = 1;
+        entry->selectedPsaSvcId = -1L;
+        entry->selectedSerializerSvcId = -1L;
+        entry->needsMatch = true;
+        entry->bndId = bndId;
+        entry->subscriberProperties = celix_properties_copy(props);
+        hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+
+        //signal psa handling thread
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 }
 
-void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
-	pubsub_topology_manager_t *manager = handle;
+void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_topology_manager_t *manager = handle;
 
-	//NOTE local subscriber service unregister
-	//1) Find topic receiver and decrease count
+    //NOTE local subscriber service unregister
+    //1) Find topic receiver and decrease count
 
-	const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
-	const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
 
-	if (topic == NULL) {
-		return;
-	}
+    if (topic == NULL) {
+        return;
+    }
 
-	char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-	celixThreadMutex_lock(&manager->topicReceivers.mutex);
-	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
-	if (entry != NULL) {
-		entry->usageCount -= 0;
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-	free(scopeAndTopicKey);
+    char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+    if (entry != NULL) {
+        entry->usageCount -= 0;
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+    free(scopeAndTopicKey);
 
-	//NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
+    //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
 }
 
-void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
-	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
-	pubsub_announce_endpoint_listener_t *listener = svc;
-
-	//1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
-	//2) Add listener to manager->announceEndpointListeners
-
-	celixThreadMutex_lock(&manager->topicSenders.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry != NULL && entry->endpoint != NULL) {
-			listener->announceEndpoint(listener->handle, entry->endpoint);
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicSenders.mutex);
-
-	celixThreadMutex_lock(&manager->topicReceivers.mutex);
-	iter = hashMapIterator_construct(manager->topicReceivers.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry != NULL && entry->endpoint != NULL) {
-			listener->announceEndpoint(listener->handle, entry->endpoint);
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
-	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-	celix_arrayList_add(manager->announceEndpointListeners.list, listener);
-	celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+    pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
+    pubsub_announce_endpoint_listener_t *listener = svc;
+
+    //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
+    //2) Add listener to manager->announceEndpointListeners
+
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->endpoint != NULL) {
+            listener->announceEndpoint(listener->handle, entry->endpoint);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->endpoint != NULL) {
+            listener->announceEndpoint(listener->handle, entry->endpoint);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+    celix_arrayList_add(manager->announceEndpointListeners.list, listener);
+    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 }
 
 
-void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
-	pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
-	pubsub_announce_endpoint_listener_t *listener = svc;
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+    pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
+    pubsub_announce_endpoint_listener_t *listener = svc;
 
-	//1) Remove listener from manager->announceEndpointListeners
-	//2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+    //1) Remove listener from manager->announceEndpointListeners
+    //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
 
-	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-	celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
-	celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+    celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
+    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 }
 
 void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
-	pubsub_topology_manager_t *manager = handle;
-
-	//NOTE new local subscriber service register
-	//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
-	//2) update the usage count. if not found
-	//3) signal psaHandling thread to find a psa and setup TopicSender
-
-
-	char *topicFromFilter = NULL;
-	char *scopeFromFilter = NULL;
-	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
-	char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
-	char *topic = topicFromFilter;
-
-	char *scopeAndTopicKey = NULL;
-	if (topic == NULL) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
-					  "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
-					  PUBSUB_SUBSCRIBER_TOPIC);
-		return;
-	}
-
-	scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-	celixThreadMutex_lock(&manager->topicSenders.mutex);
-	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
-	if (entry != NULL) {
-		entry->usageCount += 1;
-		free(scope);
-		free(topic);
-		free(scopeAndTopicKey);
-	} else {
-		entry = calloc(1, sizeof(*entry));
-		entry->usageCount = 1;
-		entry->selectedSerializerSvcId = -1L;
-		entry->selectedPsaSvcId = -1L;
-		entry->scope = scope; //taking ownership
-		entry->topic = topic; //taking ownership
-		entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
-		entry->needsMatch = true;
-		entry->publisherFilter = celix_filter_create(info->filter->filterStr);
-		entry->bndId = info->bundleId;
-		hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
-
-		//new entry -> wakeup psaHandling thread
-		celixThreadCondition_broadcast(&manager->psaHandling.cond);
-	}
-	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+    pubsub_topology_manager_t *manager = handle;
+
+    //NOTE new local subscriber service register
+    //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+    //2) update the usage count. if not found
+    //3) signal psaHandling thread to find a psa and setup TopicSender
+
+
+    char *topicFromFilter = NULL;
+    char *scopeFromFilter = NULL;
+    pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
+    char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+    char *topic = topicFromFilter;
+
+    char *scopeAndTopicKey = NULL;
+    if (topic == NULL) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+                      "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
+                      PUBSUB_SUBSCRIBER_TOPIC);
+        return;
+    }
+
+    scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+        free(scope);
+        free(topic);
+        free(scopeAndTopicKey);
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        entry->usageCount = 1;
+        entry->selectedSerializerSvcId = -1L;
+        entry->selectedPsaSvcId = -1L;
+        entry->scope = scope; //taking ownership
+        entry->topic = topic; //taking ownership
+        entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
+        entry->needsMatch = true;
+        entry->publisherFilter = celix_filter_create(info->filter->filterStr);
+        entry->bndId = info->bundleId;
+        hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+
+        //new entry -> wakeup psaHandling thread
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
 }
 
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
-	pubsub_topology_manager_t *manager = handle;
+    pubsub_topology_manager_t *manager = handle;
 
-	//NOTE local subscriber service unregister
-	//1) Find topic sender and decrease count
+    //NOTE local subscriber service unregister
+    //1) Find topic sender and decrease count
 
-	char *topic = NULL;
-	char *scopeFromFilter = NULL;
-	pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
-	const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+    char *topic = NULL;
+    char *scopeFromFilter = NULL;
+    pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+    const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
 
-	if (topic == NULL) {
-		free(scopeFromFilter);
-		return;
-	}
+    if (topic == NULL) {
+        free(scopeFromFilter);
+        return;
+    }
 
 
-	char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-	celixThreadMutex_lock(&manager->topicSenders.mutex);
-	pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
-	if (entry != NULL) {
-		entry->usageCount -= 1;
-	}
-	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+    char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+    if (entry != NULL) {
+        entry->usageCount -= 1;
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
 
-	free(scopeAndTopicKey);
-	free(topic);
-	free(scopeFromFilter);
+    free(scopeAndTopicKey);
+    free(topic);
+    free(scopeFromFilter);
 }
 
-celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
-	celix_status_t status = CELIX_SUCCESS;
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+    celix_status_t status = CELIX_SUCCESS;
     pubsub_topology_manager_t *manager = handle;
 
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
@@ -455,92 +482,92 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
     // 1) If not, find matching psa using the matchEndpoint
     // 2) if found call addEndpoint of the matching psa
 
-	if (manager->verbose) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-					  "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
-					  uuid);
-	}
-
-
-	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-	pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
-	if (entry != NULL) {
-		//already existing endpoint -> increase usage
-		entry->usageCount += 1;
-	} else {
-		//new endpoint -> new entry
-		entry = calloc(1, sizeof(*entry));
-		entry->usageCount = 1;
-		entry->endpoint = celix_properties_copy(endpoint);
-		entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-		entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
-		hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
-
-		//waking up psa handling thread to select psa
-		celixThreadCondition_broadcast(&manager->psaHandling.cond);
-
-	}
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+    if (manager->verbose) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                      "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+                      uuid);
+    }
+
+
+    celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+    pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+    if (entry != NULL) {
+        //already existing endpoint -> increase usage
+        entry->usageCount += 1;
+    } else {
+        //new endpoint -> new entry
+        entry = calloc(1, sizeof(*entry));
+        entry->usageCount = 1;
+        entry->endpoint = celix_properties_copy(endpoint);
+        entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+        entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
+        hashMap_put(manager->discoveredEndpoints.map, (void *) entry->uuid, entry);
+
+        //waking up psa handling thread to select psa
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+
+    }
+    celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 
     return status;
 }
 
 static void pstm_removeEndpointCallback(void *handle, void *svc) {
-	celix_properties_t *endpoint = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->removeEndpoint(psa->handle, endpoint);
+    celix_properties_t *endpoint = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->removeDiscoveredEndpoint(psa->handle, endpoint);
 }
 
 celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_topology_manager_t *manager = handle;
 
-	// 1) See if endpoint is already discovered, if so decrease usage count.
-	// 1) If usage count becomes 0, find matching psa using the matchEndpoint
-	// 2) if found call disconnectEndpoint of the matching psa
-
-	const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-	assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
-
-	if (manager->verbose) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-					  "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
-					  celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
-					  uuid);
-	}
-
-	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-	pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
-	if (entry != NULL) {
-		//already existing endpoint -> decrease usage
-		entry->usageCount-= 1;
-		if (entry->usageCount <= 0) {
-			hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
-		} else {
-			entry = NULL; //still used (usage count > 0) -> do nothing
-		}
-	}
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-
-	if (entry != NULL) {
-		//note entry is removed from manager->discoveredEndpoints, also inform used psa
-		if (entry->selectedPsaSvcId >= 0) {
-			//note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
-			celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-												 (void *) endpoint, pstm_removeEndpointCallback);
-		} else {
-			logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
-		}
-		celix_properties_destroy(entry->endpoint);
-		free(entry);
-	}
-
-
-	return CELIX_SUCCESS;
+    // 1) See if endpoint is already discovered, if so decrease usage count.
+    // 1) If usage count becomes 0, find matching psa using the matchEndpoint
+    // 2) if found call disconnectEndpoint of the matching psa
+
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
+
+    if (manager->verbose) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                      "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+                      celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+                      uuid);
+    }
+
+    celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+    pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+    if (entry != NULL) {
+        //already existing endpoint -> decrease usage
+        entry->usageCount -= 1;
+        if (entry->usageCount <= 0) {
+            hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
+        } else {
+            entry = NULL; //still used (usage count > 0) -> do nothing
+        }
+    }
+    celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+    if (entry != NULL) {
+        //note entry is removed from manager->discoveredEndpoints, also inform used psa
+        if (entry->selectedPsaSvcId >= 0) {
+            //note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
+            celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                 (void *) endpoint, pstm_removeEndpointCallback);
+        } else {
+            logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
+        }
+        celix_properties_destroy(entry->endpoint);
+        free(entry);
+    }
+
+
+    return CELIX_SUCCESS;
 }
 
 
@@ -559,45 +586,48 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
         if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
             if (manager->verbose && entry->endpoint != NULL) {
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-                        "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
+                              "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
             }
 
             if (entry->endpoint != NULL) {
-				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-					pubsub_announce_endpoint_listener_t *listener;
-					listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-					listener->removeEndpoint(listener->handle, entry->endpoint);
-				}
-				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-				celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
-													 PUBSUB_ADMIN_SERVICE_NAME,
-													 entry, pstm_teardownTopicSenderCallback);
-			}
+                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                    pubsub_announce_endpoint_listener_t *listener;
+                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                    listener->revokeEndpoint(listener->handle, entry->endpoint);
+                }
+                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+                                                     PUBSUB_ADMIN_SERVICE_NAME,
+                                                     entry, pstm_teardownTopicSenderCallback);
+            }
 
 
             //cleanup entry
             if (entry->usageCount <= 0) {
-            	//no usage -> remove
-				hashMapIterator_remove(&iter);
-				free(entry->scopeAndTopicKey);
-				free(entry->scope);
-				free(entry->topic);
-				if (entry->publisherFilter != NULL) {
-					celix_filter_destroy(entry->publisherFilter);
-				}
-				if (entry->endpoint != NULL) {
-					celix_properties_destroy(entry->endpoint);
-				}
-				free(entry);
-			} else {
-            	//still usage, setup for new match
-            	if (entry->endpoint != NULL) {
-					celix_properties_destroy(entry->endpoint);
-				}
-            	entry->endpoint = NULL;
-            	entry->selectedPsaSvcId = -1L;
-            	entry->selectedSerializerSvcId = -1L;
+                //no usage -> remove
+                hashMapIterator_remove(&iter);
+                free(entry->scopeAndTopicKey);
+                free(entry->scope);
+                free(entry->topic);
+                if (entry->topicProperties != NULL) {
+                    celix_properties_destroy(entry->topicProperties);
+                }
+                if (entry->publisherFilter != NULL) {
+                    celix_filter_destroy(entry->publisherFilter);
+                }
+                if (entry->endpoint != NULL) {
+                    celix_properties_destroy(entry->endpoint);
+                }
+                free(entry);
+            } else {
+                //still usage, setup for new match
+                if (entry->endpoint != NULL) {
+                    celix_properties_destroy(entry->endpoint);
+                }
+                entry->endpoint = NULL;
+                entry->selectedPsaSvcId = -1L;
+                entry->selectedSerializerSvcId = -1L;
             }
         }
     }
@@ -625,215 +655,235 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
             }
 
             if (entry->endpoint != NULL) {
-				celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
-													 PUBSUB_ADMIN_SERVICE_NAME, entry,
-													 pstm_teardownTopicReceiverCallback);
-				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-				for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-					pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
-							manager->announceEndpointListeners.list, i);
-					listener->removeEndpoint(listener->handle, entry->endpoint);
-				}
-				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-			}
-
-			if (entry->usageCount <= 0) {
-				//no usage -> remove
-				hashMapIterator_remove(&iter);
-				//cleanup entry
-				free(entry->scopeAndTopicKey);
-				free(entry->scope);
-				free(entry->topic);
-				if (entry->subscriberProperties != NULL) {
-					celix_properties_destroy(entry->subscriberProperties);
-				}
-				if (entry->endpoint != NULL) {
-					celix_properties_destroy(entry->endpoint);
-				}
-				free(entry);
-			} else {
-				//still usage -> setup for rematch
-				if (entry->endpoint != NULL) {
-					celix_properties_destroy(entry->endpoint);
-				}
-				entry->endpoint = NULL;
-				entry->selectedPsaSvcId = -1L;
-				entry->selectedSerializerSvcId = -1L;
-			}
+                celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+                                                     PUBSUB_ADMIN_SERVICE_NAME, entry,
+                                                     pstm_teardownTopicReceiverCallback);
+                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                    pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+                            manager->announceEndpointListeners.list, i);
+                    listener->revokeEndpoint(listener->handle, entry->endpoint);
+                }
+                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+            }
+
+            if (entry->usageCount <= 0) {
+                //no usage -> remove
+                hashMapIterator_remove(&iter);
+                //cleanup entry
+                free(entry->scopeAndTopicKey);
+                free(entry->scope);
+                free(entry->topic);
+                if (entry->topicProperties != NULL) {
+                    celix_properties_destroy(entry->topicProperties);
+                }
+                if (entry->subscriberProperties != NULL) {
+                    celix_properties_destroy(entry->subscriberProperties);
+                }
+                if (entry->endpoint != NULL) {
+                    celix_properties_destroy(entry->endpoint);
+                }
+                free(entry);
+            } else {
+                //still usage -> setup for rematch
+                if (entry->endpoint != NULL) {
+                    celix_properties_destroy(entry->endpoint);
+                }
+                entry->endpoint = NULL;
+                entry->selectedPsaSvcId = -1L;
+                entry->selectedSerializerSvcId = -1L;
+            }
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 }
 
 static void pstm_addEndpointCallback(void *handle, void *svc) {
-	celix_properties_t *endpoint = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->addEndpoint(psa->handle, endpoint);
+    celix_properties_t *endpoint = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->addDiscoveredEndpoint(psa->handle, endpoint);
 }
 
 static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
-	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry != NULL && entry->selectedPsaSvcId < 0) {
-			long psaSvcId = -1L;
-
-			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
-			while (hashMapIterator_hasNext(&iter2)) {
-				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
-				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
-				long svcId = (long) hashMapEntry_getKey(mapEntry);
-				bool match = false;
-				psa->matchEndpoint(psa->handle, entry->endpoint, &match);
-				if (match) {
-					psaSvcId = svcId;
-					break;
-				}
-			}
-			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
-			if (psaSvcId >= 0) {
-				celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-													 (void *)entry->endpoint, pstm_addEndpointCallback);
-			} else {
-				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
-			}
-
-			entry->selectedPsaSvcId = psaSvcId;
-		}
-	}
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+    celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->selectedPsaSvcId < 0) {
+            long psaSvcId = -1L;
+
+            celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+            hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+            while (hashMapIterator_hasNext(&iter2)) {
+                hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+                pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+                long svcId = (long) hashMapEntry_getKey(mapEntry);
+                bool match = false;
+                psa->matchDiscoveredEndpoint(psa->handle, entry->endpoint, &match);
+                if (match) {
+                    psaSvcId = svcId;
+                    break;
+                }
+            }
+            celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+            if (psaSvcId >= 0) {
+                celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                     (void *) entry->endpoint, pstm_addEndpointCallback);
+            } else {
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
+            }
+
+            entry->selectedPsaSvcId = psaSvcId;
+        }
+    }
+    celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 }
 
 
 static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
-	pstm_topic_receiver_or_sender_entry_t *entry = handle;
-	pubsub_admin_service_t *psa = svc;
-	psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    pubsub_admin_service_t *psa = svc;
+    psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
 }
 
 static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
-	celixThreadMutex_lock(&manager->topicSenders.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry != NULL && entry->needsMatch) {
-			//new topic sender needed, requesting match with current psa
-			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
-			long serializerSvcId = -1L;
-			long selectedPsaSvcId = -1L;
-
-			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
-			while (hashMapIterator_hasNext(&iter2)) {
-				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
-				long svcId = (long)hashMapEntry_getKey(mapEntry);
-				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
-				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-				long serSvcId = -1L;
-				psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &score, &serSvcId);
-				if (score > highestScore) {
-					highestScore = score;
-					serializerSvcId = serSvcId;
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->needsMatch) {
+            //new topic sender needed, requesting match with current psa
+            double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+            long serializerSvcId = -1L;
+            long selectedPsaSvcId = -1L;
+            celix_properties_t *topicPropertiesForHighestMatch = NULL;
+
+            celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+            hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+            while (hashMapIterator_hasNext(&iter2)) {
+                hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+                long svcId = (long) hashMapEntry_getKey(mapEntry);
+                pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+                double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                long serSvcId = -1L;
+                celix_properties_t *topicProps = NULL;
+                psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId);
+                if (score > highestScore) {
+                    if (topicPropertiesForHighestMatch != NULL) {
+                        celix_properties_destroy(topicPropertiesForHighestMatch);
+                    }
+                    highestScore = score;
+                    serializerSvcId = serSvcId;
                     selectedPsaSvcId = svcId;
-				}
-			}
-			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
-			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-			    entry->selectedPsaSvcId = selectedPsaSvcId;
-			    entry->selectedSerializerSvcId = serializerSvcId;
-				bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
-				if (called && entry->endpoint != NULL) {
-					entry->needsMatch = false;
-
-					//announce new endpoint through the network
-					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-						listener->announceEndpoint(listener->handle, entry->endpoint);
-					}
-					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-				}
-			}
-
-			if (entry->needsMatch) {
-				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
-			}
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+                    topicPropertiesForHighestMatch = topicProps;
+                } else if (topicProps != NULL) {
+                    celix_properties_destroy(topicProps);
+                }
+            }
+            celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+            if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+                entry->selectedPsaSvcId = selectedPsaSvcId;
+                entry->selectedSerializerSvcId = serializerSvcId;
+                entry->topicProperties = topicPropertiesForHighestMatch;
+                bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+                if (called && entry->endpoint != NULL) {
+                    entry->needsMatch = false;
+
+                    //announce new endpoint through the network
+                    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                        pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                        listener->announceEndpoint(listener->handle, entry->endpoint);
+                    }
+                    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                }
+            }
+
+            if (entry->needsMatch) {
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
+            }
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
 }
 
 static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
     pstm_topic_receiver_or_sender_entry_t *entry = handle;
     pubsub_admin_service_t *psa = svc;
-    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
 }
 
 static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
-	celixThreadMutex_lock(&manager->topicReceivers.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry->needsMatch) {
-
-			double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
-			long serializerSvcId = -1L;
-			long selectedPsaSvcId = -1L;
-
-			celixThreadMutex_lock(&manager->pubsubadmins.mutex);
-			hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
-			while (hashMapIterator_hasNext(&iter2)) {
-				hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
-				long svcId = (long) hashMapEntry_getKey(mapEntry);
-				pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
-				double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-				long serSvcId = -1L;
-
-				psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &score, &serSvcId);
-				if (score > highestScore) {
-					highestScore = score;
-					serializerSvcId = serSvcId;
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->needsMatch) {
+
+            double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+            long serializerSvcId = -1L;
+            long selectedPsaSvcId = -1L;
+            celix_properties_t *highestMatchTopicProperties = NULL;
+
+            celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+            hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+            while (hashMapIterator_hasNext(&iter2)) {
+                hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+                long svcId = (long) hashMapEntry_getKey(mapEntry);
+                pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+                double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                long serSvcId = -1L;
+                celix_properties_t *topicProps = NULL;
+
+                psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId);
+                if (score > highestScore) {
+                    if (highestMatchTopicProperties != NULL) {
+                        celix_properties_destroy(highestMatchTopicProperties);
+                    }
+                    highestScore = score;
+                    serializerSvcId = serSvcId;
                     selectedPsaSvcId = svcId;
-				}
-			}
-			celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
-			if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-				entry->selectedPsaSvcId = selectedPsaSvcId;
-				entry->selectedSerializerSvcId = serializerSvcId;
-
-				bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-													 entry,
-													 pstm_setupTopicReceiverCallback);
-
-				if (called && entry->endpoint != NULL) {
-				    entry->needsMatch = false;
-					//announce new endpoint through the network
-					celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-					for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-						pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
-								manager->announceEndpointListeners.list, i);
-						listener->announceEndpoint(listener->handle, entry->endpoint);
-					}
-					celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-				}
-			}
-
-
-			if (entry->needsMatch) {
-				logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
-			}
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+                    highestMatchTopicProperties = topicProps;
+                } else if (topicProps != NULL) {
+                    celix_properties_destroy(topicProps);
+                }
+            }
+            celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+            if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+                entry->selectedPsaSvcId = selectedPsaSvcId;
+                entry->selectedSerializerSvcId = serializerSvcId;
+
+                bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                                   entry,
+                                                                   pstm_setupTopicReceiverCallback);
+
+                if (called && entry->endpoint != NULL) {
+                    entry->needsMatch = false;
+                    //announce new endpoint through the network
+                    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                        pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+                                manager->announceEndpointListeners.list, i);
+                        listener->announceEndpoint(listener->handle, entry->endpoint);
+                    }
+                    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                }
+            }
+
+
+            if (entry->needsMatch) {
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
+            }
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 }
 
-static void* pstm_psaHandlingThread(void *data) {
+static void *pstm_psaHandlingThread(void *data) {
     pubsub_topology_manager_t *manager = data;
 
     celixThreadMutex_lock(&manager->psaHandling.mutex);
@@ -841,7 +891,7 @@ static void* pstm_psaHandlingThread(void *data) {
     celixThreadMutex_unlock(&manager->psaHandling.mutex);
 
     while (running) {
-    	//first teardown -> also if rematch is needed
+        //first teardown -> also if rematch is needed
         pstm_teardownTopicSenders(manager);
         pstm_teardownTopicReceivers(manager);
 
@@ -849,7 +899,7 @@ static void* pstm_psaHandlingThread(void *data) {
         pstm_setupTopicSenders(manager);
         pstm_setupTopicReceivers(manager);
 
-		pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
+        pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
 
         celixThreadMutex_lock(&manager->psaHandling.mutex);
         celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS, 0L);
@@ -860,92 +910,141 @@ static void* pstm_psaHandlingThread(void *data) {
 }
 
 
-celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
-	pubsub_topology_manager_t *manager = handle;
-	//TODO add support for searching based on scope and topic
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+    pubsub_topology_manager_t *manager = handle;
+    //TODO add support for searching based on scope and topic
 
-	fprintf(os, "\n");
+    fprintf(os, "\n");
 
-	fprintf(os, "Discovered Endpoints: \n");
-	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+    fprintf(os, "Discovered Endpoints: \n");
+    celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
         const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
         const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
         const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
-		const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
-		const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
-		const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-		const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-		fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
+        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+        const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
         fprintf(os, "   |- container name = %s\n", cn);
         fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
         fprintf(os, "   |- type           = %s\n", type);
-		fprintf(os, "   |- scope          = %s\n", scope);
-		fprintf(os, "   |- topic          = %s\n", topic);
-		fprintf(os, "   |- admin type     = %s\n", adminType);
-		fprintf(os, "   |- serializer     = %s\n", serType);
-		if (manager->verbose) {
-			fprintf(os, "   |- psa svc id     = %li\n", discovered->selectedPsaSvcId);
-			fprintf(os, "   |- usage count    = %i\n", discovered->usageCount);
-		}
-	}
-	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-	fprintf(os,"\n");
-
-
-	fprintf(os, "Active Topic Senders:\n");
-	celixThreadMutex_lock(&manager->topicSenders.mutex);
-	iter = hashMapIterator_construct(manager->topicSenders.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry->endpoint == NULL) {
-			continue;
-		}
-		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
-		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-		fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
-		fprintf(os, "   |- scope       = %s\n", entry->scope);
-		fprintf(os, "   |- topic       = %s\n", entry->topic);
-		fprintf(os, "   |- admin type  = %s\n", adminType);
-		fprintf(os, "   |- serializer  = %s\n", serType);
-		if (manager->verbose) {
-			fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
-			fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
-			fprintf(os, "   |- usage count = %i\n", entry->usageCount);
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicSenders.mutex);
-	fprintf(os,"\n");
-
-	fprintf(os, "Active Topic Receivers:\n");
-	celixThreadMutex_lock(&manager->topicReceivers.mutex);
-	iter = hashMapIterator_construct(manager->topicReceivers.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-		if (entry->endpoint == NULL) {
-			continue;
-		}
-		const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
-		const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-		const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-		fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
-		fprintf(os, "   |- scope       = %s\n", entry->scope);
-		fprintf(os, "   |- topic       = %s\n", entry->topic);
-		fprintf(os, "   |- admin type  = %s\n", adminType);
-		fprintf(os, "   |- serializer  = %s\n", serType);
-		if (manager->verbose) {
-			fprintf(os, "    |- psa svc id  = %li\n", entry->selectedPsaSvcId);
-			fprintf(os, "    |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
-			fprintf(os, "    |- usage count = %i\n", entry->usageCount);
-		}
-	}
-	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-	fprintf(os,"\n");
-
-	fprintf(os, "TODO pending topic senders/receivers\n");
-
-	return CELIX_SUCCESS;
+        fprintf(os, "   |- scope          = %s\n", scope);
+        fprintf(os, "   |- topic          = %s\n", topic);
+        fprintf(os, "   |- admin type     = %s\n", adminType);
+        fprintf(os, "   |- serializer     = %s\n", serType);
+        if (manager->verbose) {
+            fprintf(os, "   |- psa svc id     = %li\n", discovered->selectedPsaSvcId);
+            fprintf(os, "   |- usage count    = %i\n", discovered->usageCount);
+        }
+    }
+    celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+    fprintf(os, "\n");
+
+
+    fprintf(os, "Active Topic Senders:\n");
+    int countPendingSenders = 0;
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->endpoint == NULL) {
+            ++countPendingSenders;
+            continue;
+        }
+        const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
+        fprintf(os, "   |- scope       = %s\n", entry->scope);
+        fprintf(os, "   |- topic       = %s\n", entry->topic);
+        fprintf(os, "   |- admin type  = %s\n", adminType);
+        fprintf(os, "   |- serializer  = %s\n", serType);
+        if (manager->verbose) {
+            fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+            fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+            fprintf(os, "   |- usage count = %i\n", entry->usageCount);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+    fprintf(os, "\n");
+
+    fprintf(os, "Active Topic Receivers:\n");
+    int countPendingReceivers = 0;
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->endpoint == NULL) {
+            ++countPendingReceivers;
+            continue;
+        }
+        const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
+        fprintf(os, "   |- scope       = %s\n", entry->scope);
+        fprintf(os, "   |- topic       = %s\n", entry->topic);
+        fprintf(os, "   |- admin type  = %s\n", adminType);
+        fprintf(os, "   |- serializer  = %s\n", serType);
+        if (manager->verbose) {
+            fprintf(os, "    |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+            fprintf(os, "    |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+            fprintf(os, "    |- usage count = %i\n", entry->usageCount);
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+    fprintf(os, "\n");
+
+    if (countPendingSenders > 0) {
+        fprintf(os, "Pending Topic Senders:\n");
+        celixThreadMutex_lock(&manager->topicSenders.mutex);
+        iter = hashMapIterator_construct(manager->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (entry->endpoint == NULL) {
+                fprintf(os, "|- Pending Topic Sender for scope/topic %s/%s:\n", entry->scope, entry->topic);
+                const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
+                const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
+                const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+                fprintf(os, "    |- requested qos        = %s\n", requestedQos);
+                fprintf(os, "    |- requested config     = %s\n", requestedConfig);
+                fprintf(os, "    |- requested serializer = %s\n", requestedSer);
+                if (manager->verbose) {
+                    fprintf(os, "    |- usage count          = %i\n", entry->usageCount);
+                }
+            }
+        }
+        celixThreadMutex_unlock(&manager->topicSenders.mutex);
+        fprintf(os, "\n");
+    }
+
+    if (countPendingReceivers > 0) {
+        fprintf(os, "Pending Topic Receivers:\n");
+        celixThreadMutex_lock(&manager->topicReceivers.mutex);
+        iter = hashMapIterator_construct(manager->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (entry->endpoint == NULL) {
+                fprintf(os, "|- Topic Receiver for scope/topic %s/%s:\n", entry->scope, entry->topic);
+                const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
+                const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
+                const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+                fprintf(os, "    |- requested qos        = %s\n", requestedQos);
+                fprintf(os, "    |- requested config     = %s\n", requestedConfig);
+                fprintf(os, "    |- requested serializer = %s\n", requestedSer);
+                if (manager->verbose) {
+                    fprintf(os, "    |- usage count          = %i\n", entry->usageCount);
+                }
+            }
+        }
+        celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+        fprintf(os, "\n");
+    }
+
+
+    return CELIX_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 39277fe..e66831e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -94,6 +94,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 	long selectedPsaSvcId;
 	long selectedSerializerSvcId;
 	long bndId;
+	celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
 
 	//for sender entry
 	celix_filter_t *publisherFilter;
@@ -104,7 +105,6 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
-celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *manager);
 
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/libs/utils/src/properties.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index 1519ecc..3c7ba74 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -357,7 +357,10 @@ celix_properties_t* celix_properties_copy(const celix_properties_t *properties)
 }
 
 const char* celix_properties_get(const celix_properties_t *properties, const char *key, const char *defaultValue) {
-	const char* value = hashMap_get((hash_map_t*)properties, (void*)key);
+	const char* value = NULL;
+	if (properties != NULL) {
+		value = hashMap_get((hash_map_t*)properties, (void*)key);
+	}
 	return value == NULL ? defaultValue : value;
 }
 


[3/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

Posted by pn...@apache.org.
CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

Multipart support can come back, but then in seperated service (i.e. pubsub_multipart_subscriber / pubsub_multipart_publisher
Also
 - remove multipart examples
 - Add optional init function to the subscriber api, making it possible for subscriber to tweak the receiver thread
 - Initial support for static zmq bind,discover and connect urls to use for (among others) testing


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b2548c84
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b2548c84
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b2548c84

Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: b2548c84502e389b4e1565b9895e16459e9ef886
Parents: 68f69f8
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Nov 6 12:41:44 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Nov 6 12:41:44 2018 +0100

----------------------------------------------------------------------
 bundles/pubsub/examples/CMakeLists.txt          |   40 -
 .../pubsub/examples/mp_pubsub/CMakeLists.txt    |   23 -
 .../examples/mp_pubsub/common/include/ew.h      |   53 -
 .../examples/mp_pubsub/common/include/ide.h     |   49 -
 .../mp_pubsub/common/include/kinematics.h       |   55 -
 .../mp_pubsub/msg_descriptors/msg_ew.descriptor |    9 -
 .../msg_descriptors/msg_ide.descriptor          |    9 -
 .../msg_descriptors/msg_kinematics.descriptor   |   10 -
 .../examples/mp_pubsub/publisher/CMakeLists.txt |   43 -
 .../private/include/mp_publisher_private.h      |   58 -
 .../publisher/private/src/mp_pub_activator.c    |  144 --
 .../publisher/private/src/mp_publisher.c        |  160 --
 .../mp_pubsub/subscriber/CMakeLists.txt         |   43 -
 .../private/include/mp_subscriber_private.h     |   50 -
 .../subscriber/private/src/mp_sub_activator.c   |  108 --
 .../subscriber/private/src/mp_subscriber.c      |  119 --
 .../private/include/pubsub_subscriber_private.h |    2 +-
 .../subscriber/private/src/pubsub_subscriber.c  |    2 +-
 bundles/pubsub/mock/src/publisher_mock.cc       |   14 -
 .../pubsub_admin_udp_mc/src/psa_activator.c     |    6 +-
 .../src/pubsub_udpmc_admin.c                    |   12 +-
 .../src/pubsub_udpmc_admin.h                    |    8 +-
 .../src/pubsub_udpmc_topic_receiver.c           |    7 +-
 .../src/pubsub_udpmc_topic_sender.c             |   10 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c |    6 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.c     |   38 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.h     |   36 +-
 .../src/pubsub_zmq_topic_receiver.c             |  121 +-
 .../src/pubsub_zmq_topic_receiver.h             |    1 +
 .../src/pubsub_zmq_topic_sender.c               |   49 +-
 .../src/pubsub_zmq_topic_sender.h               |    1 +
 .../pubsub_api/include/pubsub/publisher.h       |   21 +-
 .../pubsub_api/include/pubsub/subscriber.h      |   20 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c |    2 +-
 .../src/pubsub_discovery_impl.c                 |    2 +-
 .../src/pubsub_discovery_impl.h                 |    2 +-
 .../pubsub/pubsub_spi/include/pubsub_admin.h    |   18 +-
 .../pubsub_spi/include/pubsub_listeners.h       |    7 +-
 .../pubsub/pubsub_spi/include/pubsub_utils.h    |    2 +
 .../pubsub/pubsub_spi/src/pubsub_utils_match.c  |   12 +-
 .../src/pubsub_topology_manager.c               | 1407 ++++++++++--------
 .../src/pubsub_topology_manager.h               |    2 +-
 libs/utils/src/properties.c                     |    5 +-
 43 files changed, 1009 insertions(+), 1777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..2e43e6d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -16,7 +16,6 @@
 # under the License.
 
 add_subdirectory(pubsub)
-add_subdirectory(mp_pubsub)
 
 find_program(ETCD_CMD NAMES etcd)
 find_program(XTERM_CMD NAMES xterm)
@@ -198,33 +197,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     )
     target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
-    # ZMQ Multipart
-    add_celix_container("pubsub_mp_subscriber_zmq"
-            GROUP "pubsub"
-            BUNDLES
-            Celix::shell
-            Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
-            org.apache.celix.pubsub_subscriber.MpSubscriber
-            )
-    target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
-    add_celix_container("pubsub_mp_publisher_zmq"
-            GROUP "pubsub"
-            BUNDLES
-            Celix::shell
-            Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
-            org.apache.celix.pubsub_publisher.MpPublisher
-            )
-    target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
     if (ETCD_CMD AND XTERM_CMD)
         #Runtime starting two bundles using both zmq and upd mc pubsub
         add_celix_runtime(pubsub_rt_zmq_udpmc_combi
@@ -251,18 +223,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 etcd
             USE_TERM
         )
-
-        #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq
-        add_celix_runtime(pubsub_rt_multipart_zmq
-            NAME zmq_multipart
-            GROUP pubsub
-            CONTAINERS
-                pubsub_mp_subscriber_zmq
-                pubsub_mp_publisher_zmq
-            COMMANDS
-                etcd
-            USE_TERM
-        )
     endif ()
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
deleted file mode 100644
index c828832..0000000
--- a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(subscriber)
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h b/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
deleted file mode 100644
index 81ca5f3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ew.h
- *
- *  \date       Jan 15, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef EW_H_
-#define EW_H_
-
-#define MIN_AREA	50.0F
-#define MAX_AREA	15000.0F
-
-#define MSG_EW_NAME	"ew" //Has to match the message name in the msg descriptor!
-
-typedef enum color{
-	GREEN,
-	BLUE,
-	RED,
-	BLACK,
-	WHITE,
-	LAST_COLOR
-} color_e;
-
-const char* color_tostring[] = {"GREEN","BLUE","RED","BLACK","WHITE"};
-
-struct ew_data{
-	double area;
-	color_e color;
-};
-
-typedef struct ew_data* ew_data_pt;
-
-#endif /* EW_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h b/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
deleted file mode 100644
index 2b9588d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ide.h
- *
- *  \date       Jan 15, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef IDE_H_
-#define IDE_H_
-
-#define MSG_IDE_NAME	"ide" //Has to match the message name in the msg descriptor!
-
-typedef enum shape{
-	SQUARE,
-	CIRCLE,
-	TRIANGLE,
-	RECTANGLE,
-	HEXAGON,
-	LAST_SHAPE
-} shape_e;
-
-const char* shape_tostring[] = {"SQUARE","CIRCLE","TRIANGLE","RECTANGLE","HEXAGON"};
-
-struct ide{
-	shape_e shape;
-};
-
-typedef struct ide* ide_pt;
-
-#endif /* IDE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h b/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
deleted file mode 100644
index 5601509..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * kinematics.h
- *
- *  \date       Nov 12, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef KINEMATICS_H_
-#define KINEMATICS_H_
-
-#define MIN_LAT	-90.0F
-#define MAX_LAT	 90.0F
-#define MIN_LON	-180.0F
-#define MAX_LON	 180.0F
-
-#define MIN_OCCUR 1
-#define MAX_OCCUR 5
-
-#define MSG_KINEMATICS_NAME	"kinematics" //Has to match the message name in the msg descriptor!
-
-struct position{
-	double lat;
-	double lon;
-};
-
-typedef struct position position_t;
-
-struct kinematics{
-	position_t position;
-	int occurrences;
-};
-
-typedef struct kinematics* kinematics_pt;
-
-
-#endif /* KINEMATICS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
deleted file mode 100644
index 7eb8c29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ew
-version=1.0.0
-:annotations
-classname=org.example.Ew
-:types
-:message
-{Di area color}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
deleted file mode 100644
index f26286d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ide
-version=1.0.0
-:annotations
-classname=org.example.Ide
-:types
-:message
-{i shape}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
deleted file mode 100644
index 447b645..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
+++ /dev/null
@@ -1,10 +0,0 @@
-:header
-type=message
-name=kinematics
-version=1.0.0
-:annotations
-classname=org.example.Kinematics
-:types
-position={DD lat long}
-:message
-{lposition;N position occurrences}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
deleted file mode 100644
index 76c9bce..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_publisher.MpPublisher
-    SYMBOLIC_NAME "apache_celix_pubsub_mp_publisher"
-    VERSION "1.0.0"
-    SOURCES 
-    	private/src/mp_pub_activator.c
-    	private/src/mp_publisher.c
-)
-target_link_libraries(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE private/include)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-	DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
-    DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
-    DESTINATION "META-INF/keys/subscriber"
-)

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h b/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
deleted file mode 100644
index 9c227fd..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher_private.h
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef MP_PUBLISHER_PRIVATE_H_
-#define MP_PUBLISHER_PRIVATE_H_
-
-#include <celixbool.h>
-
-#include "pubsub/publisher.h"
-
-struct pubsub_sender {
-	array_list_pt trackers;
-	const char *ident;
-	long bundleId;
-};
-
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
-	pubsub_publisher_pt service;
-	pubsub_sender_pt publisher;
-} *send_thread_struct_pt;
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId);
-
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
-
-void publisher_destroy(pubsub_sender_pt client);
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
-
-
-#endif /* MP_PUBLISHER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
deleted file mode 100644
index 0b6041d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_pub_activator.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <sys/cdefs.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
-
-#include "mp_publisher_private.h"
-
-static const char * PUB_TOPICS[] = {
-		"multipart",
-		NULL
-};
-
-
-struct publisherActivator {
-	pubsub_sender_pt client;
-	array_list_pt trackerList;//List<service_tracker_pt>
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	struct publisherActivator * act = malloc(sizeof(*act));
-
-	const char* fwUUID = NULL;
-
-	bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID == NULL){
-		printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
-		status = CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-
-	if (status == CELIX_SUCCESS){
-		bundle_pt bundle = NULL;
-		long bundleId = 0;
-		bundleContext_getBundle(context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		arrayList_create(&(act->trackerList));
-		act->client = publisher_create(act->trackerList,fwUUID,bundleId);
-		*userData = act;
-	} else {
-		free(act);
-	}
-
-	return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-
-	int i;
-
-	char filter[128];
-	for(i=0; PUB_TOPICS[i] != NULL; i++){
-		const char* topic = PUB_TOPICS[i];
-
-		bundle_pt bundle = NULL;
-		long bundleId = 0;
-		bundleContext_getBundle(context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		service_tracker_pt tracker = NULL;
-		memset(filter,0,128);
-
-		snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC,topic);
-
-		service_tracker_customizer_pt customizer = NULL;
-
-		serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
-		serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
-		arrayList_add(act->trackerList,tracker);
-	}
-
-	publisher_start(act->client);
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_open(tracker);
-	}
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_close(tracker);
-	}
-
-	publisher_stop(act->client);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt  __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_destroy(tracker);
-	}
-
-	publisher_destroy(act->client);
-	arrayList_destroy(act->trackerList);
-
-	free(act);
-
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
deleted file mode 100644
index f47be29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "service_tracker.h"
-#include "celix_threads.h"
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_publisher_private.h"
-
-
-static bool stop=false;
-static celix_thread_t tid;
-
-static double randDouble(double min, double max){
-	double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
-	return ret;
-}
-
-static unsigned int randInt(unsigned int min, unsigned int max){
-	double scaled = ((double)random())/((double)RAND_MAX);
-	return (max - min +1)*scaled + min;
-}
-
-static void* send_thread(void* arg){
-
-	send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
-	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
-
-	unsigned int kin_msg_id = 0;
-	unsigned int ide_msg_id = 0;
-	unsigned int ew_msg_id = 0;
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
-		return NULL;
-	}
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
-		return NULL;
-	}
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
-		return NULL;
-	}
-
-	kinematics_pt kin_data = calloc(1,sizeof(*kin_data));
-	ide_pt ide_data = calloc(1,sizeof(*ide_data));
-	ew_data_pt ew_data = calloc(1,sizeof(*ew_data));
-
-	unsigned int counter = 1;
-
-	while(stop==false){
-		kin_data->position.lat = randDouble(MIN_LAT,MAX_LAT);
-		kin_data->position.lon = randDouble(MIN_LON,MAX_LON);
-		kin_data->occurrences = randInt(MIN_OCCUR,MAX_OCCUR);
-		publish_svc->sendMultipart(publish_svc->handle,kin_msg_id,kin_data, PUBSUB_PUBLISHER_FIRST_MSG);
-		printf("Track#%u kin_data: pos=[%f, %f] occurrences=%d\n",counter,kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-
-		ide_data->shape = (shape_e)randInt(0,LAST_SHAPE-1);
-		publish_svc->sendMultipart(publish_svc->handle,ide_msg_id,ide_data, PUBSUB_PUBLISHER_PART_MSG);
-		printf("Track#%u ide_data: shape=%s\n",counter,shape_tostring[(int)ide_data->shape]);
-
-		ew_data->area = randDouble(MIN_AREA,MAX_AREA);
-		ew_data->color = (color_e)randInt(0,LAST_COLOR-1);
-		publish_svc->sendMultipart(publish_svc->handle,ew_msg_id,ew_data, PUBSUB_PUBLISHER_LAST_MSG);
-		printf("Track#%u ew_data: area=%f color=%s\n",counter,ew_data->area,color_tostring[(int)ew_data->color]);
-
-		printf("\n");
-		sleep(2);
-		counter++;
-	}
-
-	free(ew_data);
-	free(ide_data);
-	free(kin_data);
-	free(st_struct);
-
-	return NULL;
-
-}
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
-	pubsub_sender_pt publisher = malloc(sizeof(*publisher));
-
-	publisher->trackers = trackers;
-	publisher->ident = ident;
-	publisher->bundleId = bundleId;
-
-	return publisher;
-}
-
-void publisher_start(pubsub_sender_pt client) {
-	printf("MP_PUBLISHER: starting up...\n");
-}
-
-void publisher_stop(pubsub_sender_pt client) {
-	printf("MP_PUBLISHER: stopping...\n");
-}
-
-void publisher_destroy(pubsub_sender_pt client) {
-	client->trackers = NULL;
-	client->ident = NULL;
-	free(client);
-}
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
-	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
-	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
-	printf("MP_PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
-	send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
-	data->service = publish_svc;
-	data->publisher = manager;
-
-	celixThread_create(&tid,NULL,send_thread,(void*)data);
-	return CELIX_SUCCESS;
-}
-
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
-	//publish_service_pt publish_svc = (publish_service_pt)service;
-	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-	printf("MP_PUBLISHER: publish service unexported (%s)!\n",manager->ident);
-	stop=true;
-	celixThread_join(tid,NULL);
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
deleted file mode 100644
index 444764e..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
-    SYMBOLIC_NAME "apache_celix_pubsub_mp_subscriber"
-    VERSION "1.0.0"
-    SOURCES 
-		private/src/mp_sub_activator.c
-		private/src/mp_subscriber.c
-)
-target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE private/include)
-
-celix_bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-	DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
-		${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
-    DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
-    DESTINATION "META-INF/keys/publisher"
-)

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h b/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
deleted file mode 100644
index 2d582b3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber_private.h
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
-#define PUBSUB_SUBSCRIBER_PRIVATE_H_
-
-
-#include <string.h>
-
-#include "celixbool.h"
-
-#include "pubsub/subscriber.h"
-
-struct pubsub_receiver {
-	char * name;
-};
-
-typedef struct pubsub_receiver* pubsub_receiver_pt;
-
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
-
-#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
deleted file mode 100644
index 591a395..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_sub_activator.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "pubsub/subscriber.h"
-#include "bundle_activator.h"
-
-#include "mp_subscriber_private.h"
-
-#define SUB_NAME "multipart"
-static const char * SUB_TOPICS[] = {
-		"multipart",
-		NULL
-};
-
-struct subscriberActivator {
-	array_list_pt registrationList; //List<service_registration_pt>
-	pubsub_subscriber_t* subsvc;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator));
-	*userData = act;
-	arrayList_create(&(act->registrationList));
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-
-	pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
-	pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
-	subsvc->handle = sub;
-	subsvc->receive = pubsub_subscriber_recv;
-
-	act->subsvc = subsvc;
-
-	int i;
-	for(i=0; SUB_TOPICS[i] != NULL; i++){
-		const char* topic = SUB_TOPICS[i];
-
-		properties_pt props = properties_create();
-		properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
-		service_registration_pt reg = NULL;
-		bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
-		arrayList_add(act->registrationList,reg);
-	}
-
-	subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-	int i;
-	for(i=0; i<arrayList_size(act->registrationList);i++){
-		service_registration_pt reg = arrayList_get(act->registrationList,i);
-		serviceRegistration_unregister(reg);
-
-	}
-
-	subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-	act->subsvc->receive = NULL;
-	subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
-	act->subsvc->handle = NULL;
-	free(act->subsvc);
-	act->subsvc = NULL;
-
-	arrayList_destroy(act->registrationList);
-	free(act);
-
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
deleted file mode 100644
index a5ad03a..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_subscriber_private.h"
-
-pubsub_receiver_pt subscriber_create(char* topics) {
-	pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
-	sub->name = strdup(topics);
-	return sub;
-}
-
-
-void subscriber_start(pubsub_receiver_pt subscriber){
-	printf("MP_SUBSCRIBER: starting up...\n");
-}
-
-void subscriber_stop(pubsub_receiver_pt subscriber){
-	printf("MP_SUBSCRIBER: stopping...\n");
-}
-
-void subscriber_destroy(pubsub_receiver_pt subscriber){
-	if(subscriber->name!=NULL){
-		free(subscriber->name);
-	}
-	subscriber->name=NULL;
-	free(subscriber);
-}
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
-
-	unsigned int kin_msg_id = 0;
-	unsigned int ide_msg_id = 0;
-	unsigned int ew_msg_id = 0;
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
-		return -1;
-	}
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
-		return -1;
-	}
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
-		return -1;
-	}
-
-	if(msgTypeId!=kin_msg_id){
-		printf("MP_SUBSCRIBER: Multipart Message started with wrong message (expected %u, got %u)\n",msgTypeId,kin_msg_id);
-		return -1;
-	}
-
-	kinematics_pt kin_data = (kinematics_pt)msg;
-
-	void* ide_msg = NULL;
-	callbacks->getMultipart(callbacks->handle,ide_msg_id,false,&ide_msg);
-
-	void* ew_msg = NULL;
-	callbacks->getMultipart(callbacks->handle,ew_msg_id,false,&ew_msg);
-
-	if(kin_data==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_KINEMATICS_NAME);
-	}
-	else{
-		printf("kin_data: pos=[%f, %f] occurrences=%d\n",kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-	}
-
-	if(ide_msg==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_IDE_NAME);
-	}
-	else{
-		ide_pt ide_data = (ide_pt)ide_msg;
-		printf("ide_data: shape=%s\n",shape_tostring[(int)ide_data->shape]);
-	}
-
-	if(ew_msg==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_EW_NAME);
-	}
-	else{
-		ew_data_pt ew_data = (ew_data_pt)ew_msg;
-		printf("ew_data: area=%f color=%s\n",ew_data->area,color_tostring[(int)ew_data->color]);
-	}
-
-	printf("\n");
-
-	return 0;
-
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 00ca9b4..291a6aa 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_pt client);
 void subscriber_stop(pubsub_receiver_pt client);
 void subscriber_destroy(pubsub_receiver_pt client);
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
 
 
 #endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index a137253..7cfbedb 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -53,7 +53,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
 	free(subscriber);
 }
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
 
 	location_t place = (location_t)msg;
 	int nrchars = 25;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/mock/src/publisher_mock.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index e8902a8..1bdae98 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -45,24 +45,10 @@ static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, cons
 }
 
 /*============================================================================
-  MOCK - mock function for pubsub_publisher->sendMultipart
-  ============================================================================*/
-static int pubsub__publisherMock_sendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags) {
-    return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
-        .actualCall(PUBSUB_PUBLISHERMOCK_SEND_MULTIPART_METHOD)
-        .withPointerParameter("handle", handle)
-        .withParameter("msgTypeId", msgTypeId)
-        .withPointerParameter("msg", (void*)msg)
-        .withParameter("flags", flags)
-        .returnIntValue();
-}
-
-/*============================================================================
   MOCK - mock setup for publisher service
   ============================================================================*/
 void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle) {
     srv->handle = handle;
     srv->localMsgTypeIdForMsgType = pubsub__publisherMock_localMsgTypeIdForMsgType;
     srv->send = pubsub__publisherMock_send;
-    srv->sendMultipart = pubsub__publisherMock_sendMultipart;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index c838899..63de809 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -71,13 +71,13 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 		psaSvc->handle = act->admin;
 		psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
 		psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
-		psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+		psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
 		psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
 		psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
 		psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
 		psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
-		psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
-		psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
+		psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
+		psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
 
 		celix_properties_t *props = celix_properties_create();
 		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 6d06ee6..73fab20 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -244,23 +244,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     free(psa);
 }
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -278,7 +278,7 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -358,7 +358,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 02ebd44..17b8957 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -40,14 +40,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
 celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index efebf7c..a2f37d4 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -396,13 +396,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
 
                 if (status == CELIX_SUCCESS) {
                     bool release = true;
-                    pubsub_multipart_callbacks_t mp_callbacks;
-                    mp_callbacks.handle = receiver;
-                    mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
-                    mp_callbacks.getMultipart = NULL;
-
                     pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
 
                     if(release){
                         msgSer->freeMsg(msgSer,msgInst);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 59c809c..ddc6172 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -79,7 +79,7 @@ typedef struct pubsub_msg{
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg);
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
@@ -205,7 +205,6 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
             entry->service.send = psa_udpmc_topicPublicationSend;
-            entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
             hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
             svc = &entry->service;
         } else {
@@ -277,7 +276,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
             msg->payloadSize = serializedOutputLen;
 
 
-            if (psa_udpmc_sendMsg(entry, msg, true, NULL) == false) {
+            if (psa_udpmc_sendMsg(entry, msg) == false) {
                 status = -1;
             }
             free(msg);
@@ -306,7 +305,7 @@ static void delay_first_send_for_late_joiners(){
     }
 }
 
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg) {
     const int iovec_len = 3; // header + size + payload
     bool ret = true;
 
@@ -325,9 +324,6 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_m
         ret = false;
     }
 
-    if(releaseCallback) {
-        releaseCallback->release(msg->payload, entry);
-    }
     return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 01547cc..58f093a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -70,13 +70,13 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 		psaSvc->handle = act->admin;
 		psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
 		psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
-		psaSvc->matchEndpoint = pubsub_zmqAdmin_matchEndpoint;
+		psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
 		psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
 		psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
 		psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
 		psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
-		psaSvc->addEndpoint = pubsub_zmqAdmin_addEndpoint;
-		psaSvc->removeEndpoint = pubsub_zmqAdmin_removeEndpoint;
+		psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
+		psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
 
 		celix_properties_t *props = celix_properties_create();
 		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26..0192bb4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -308,30 +308,30 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
     celixThreadMutex_unlock(&psa->serializers.mutex);
 }
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
@@ -342,7 +342,7 @@ celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_propertie
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -353,6 +353,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
 
     celix_properties_t *newEndpoint = NULL;
 
+    const char * staticBindUrl = topicProperties != NULL ?
+            celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_BIND_URL, NULL) : NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
     celixThreadMutex_lock(&psa->serializers.mutex);
@@ -361,8 +363,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     if (sender == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
-                                                  psa->basePort, psa->maxPort);
+            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+                    psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
@@ -370,6 +372,15 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                 serType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
+
+            //if configured use a static discover url
+            const char *staticDiscUrl = topicProperties != NULL ?
+                    celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL) : NULL;
+            if (staticDiscUrl != NULL) {
+                celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+            }
+            celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -423,11 +434,14 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
 
+    const char *staticConnectUrls = topicProperties != NULL ?
+            celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL) : NULL;
+
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -435,7 +449,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
         } else {
             L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
         }
@@ -530,7 +544,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -581,7 +595,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 180a9db..304ad11 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -35,23 +35,45 @@
 
 #define PUBSUB_ZMQ_DEFAULT_IP       "127.0.0.1"
 
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL       "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL       "zmq.static.bind.url"
+
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED       "zmq.static.configured"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS    "zmq.static.connect.url"
+
+
 typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
 
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
 
 void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index b348fdc..bb190cc 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -66,38 +66,46 @@ struct pubsub_zmq_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+        bool allConnected; //true if all requestedConnectection are connected
     } requestedConnections;
 
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+        bool allInitialized;
     } subscribers;
 };
 
 typedef struct psa_zmq_requested_connection_entry {
     char *url;
     bool connected;
+    bool statically; //true if the connection is statically configured through the topic properties.
 } psa_zmq_requested_connection_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
     int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     pubsub_subscriber_t *svc;
+    bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
 
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void* psa_zmq_recvThread(void * data);
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+
 
 
 pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
                                                               log_helper_t *logHelper,
                                                               const char *scope,
-                                                                const char *topic,
-                                                                long serializerSvcId,
-                                                                pubsub_serializer_service_t *serializer) {
+                                                              const char *topic,
+                                                              const char *staticConnectUrls,
+                                                              long serializerSvcId,
+                                                              pubsub_serializer_service_t *serializer) {
     pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
@@ -185,6 +193,22 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
+    if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+        char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+        char* url;
+        char* save = urlsCopy;
+
+        while ((url = strtok_r(save, " ", &save))) {
+            psa_zmq_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+            entry->statically = true;
+            entry->connected = false;
+            entry->url = strndup(url, 1024*1024);
+            hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+            receiver->requestedConnections.allConnected = false;
+        }
+        free(urlsCopy);
+    }
+
     //track subscribers
     if (receiver->zmqSocket != NULL ) {
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -280,10 +304,12 @@ void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiv
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
         if (entry->connected) {
-            celix_arrayList_add(connectedUrls, strndup(entry->url, 1024));
+            celix_arrayList_add(connectedUrls, url);
         } else {
-            celix_arrayList_add(unconnectedUrls, strndup(entry->url, 1024));
+            celix_arrayList_add(unconnectedUrls, url);
         }
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -301,16 +327,13 @@ void pubsub_zmqTopicReceiver_connectTo(
         entry = calloc(1, sizeof(*entry));
         entry->url = strndup(url, 1024*1024);
         entry->connected = false;
+        entry->statically = false;
         hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
-    }
-    if (!entry->connected) {
-        if (zsock_connect(receiver->zmqSocket, "%s", url) == 0) {
-            entry->connected = true;
-        } else {
-            L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", url, strerror(errno));
-        }
+        receiver->requestedConnections.allConnected = false;
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    psa_zmq_connectToAllRequestedConnections(receiver);
 }
 
 void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
@@ -351,6 +374,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         entry = calloc(1, sizeof(*entry));
         entry->usageCount = 1;
         entry->svc = svc;
+        entry->initialized = false;
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -396,7 +420,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
             if(status == CELIX_SUCCESS) {
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
@@ -428,7 +452,23 @@ static void* psa_zmq_recvThread(void * data) {
     bool running = receiver->recvThread.running;
     celixThreadMutex_unlock(&receiver->recvThread.mutex);
 
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    bool allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    bool allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
     while (running) {
+        if (!allConnected) {
+            psa_zmq_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_zmq_initializeAllSubscribers(receiver);
+        }
+
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
         if (zmsg != NULL) {
             if (zmsg_size(zmsg) != 3) {
@@ -458,7 +498,62 @@ static void* psa_zmq_recvThread(void * data) {
         celixThreadMutex_lock(&receiver->recvThread.mutex);
         running = receiver->recvThread.running;
         celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
     } // while
 
     return NULL;
 }
+
+
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool allConnected = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->connected){
+                if (zsock_connect(receiver->zmqSocket, "%s", entry->url) == 0) {
+                    entry->connected = true;
+                } else {
+                    L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno));
+                    allConnected = false;
+                }
+            }
+        }
+        receiver->requestedConnections.allConnected = allConnected;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool allInitialized = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->initialized) {
+                int rc = 0;
+                if (entry->svc != NULL && entry->svc->init != NULL) {
+                    rc = entry->svc->init(entry->svc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
+                }
+            }
+        }
+        receiver->subscribers.allInitialized = allInitialized;
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 9049153..d093bfd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,6 +27,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         log_helper_t *logHelper,
         const char *scope, 
         const char *topic,
+        const char *staticConnectUrls,
         long serializerSvcId,
         pubsub_serializer_service_t *serializer);
 void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index f6221a2..9ad7783 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -95,6 +95,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
         const char *bindIP,
+        const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort) {
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
@@ -172,29 +173,38 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
 	    }
 #endif
 
-        int rv = -1, retry=0;
-        while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
-            /* Randomized part due to same bundle publishing on different topics */
-            unsigned int port = rand_range(basePort,maxPort);
+        if (staticBindUrl != NULL) {
+            int rv = zsock_bind (socket, "%s", staticBindUrl);
+            if (rv == -1) {
+                L_WARN("Error for zmq_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno));
+            } else {
+                sender->url = strndup(staticBindUrl, 1024*1024);
+            }
+        } else {
 
-            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
-            char *url = calloc(len, sizeof(char*));
-            snprintf(url, len, "tcp://%s:%u", bindIP, port);
+            int retry = 0;
+            while (sender->url == NULL && retry < ZMQ_BIND_MAX_RETRY) {
+                /* Randomized part due to same bundle publishing on different topics */
+                unsigned int port = rand_range(basePort, maxPort);
 
-            len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
-            char *bindUrl = calloc(len, sizeof(char));
-            snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+                char *url = NULL;
+                asprintf(&url, "tcp://%s:%u", bindIP, port);
 
-            rv = zsock_bind (socket, "%s", bindUrl);
-            if (rv == -1) {
-                perror("Error for zmq_bind");
-                free(url);
-            } else {
-                sender->url = url;
-                sender->zmq.socket = socket;
+                char *bindUrl = NULL;
+                asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+
+
+                int rv = zsock_bind(socket, "%s", bindUrl);
+                if (rv == -1) {
+                    L_WARN("Error for zmq_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+                    free(url);
+                } else {
+                    sender->url = url;
+                    sender->zmq.socket = socket;
+                }
+                retry++;
+                free(bindUrl);
             }
-            retry++;
-            free(bindUrl);
         }
     }
 
@@ -305,7 +315,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
             entry->service.send = psa_zmq_topicPublicationSend;
-            entry->service.sendMultipart = NULL; //not supported TODO remove
             hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
         } else {
             L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index e537edd..af63870 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -31,6 +31,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
         const char *bindIP,
+        const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort);
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);


[2/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

Posted by pn...@apache.org.
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 9f7f3b6..fb8c5f6 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,7 +30,7 @@
 #include <stdlib.h>
 
 #define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION	    "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION	    "3.0.0"
  
 //properties
 #define PUBSUB_PUBLISHER_TOPIC                  "topic"
@@ -38,17 +38,7 @@
 #define PUBSUB_PUBLISHER_CONFIG                 "pubsub.config"
  
 #define PUBSUB_PUBLISHER_SCOPE_DEFAULT			"default"
-//flags
-#define PUBSUB_PUBLISHER_FIRST_MSG  01
-#define PUBSUB_PUBLISHER_PART_MSG   02
-#define PUBSUB_PUBLISHER_LAST_MSG   04
 
-struct pubsub_release_callback_struct {
-    void *handle;
-    void (*release)(char *buf, void *handle);
-};
-typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
-typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
  
  
 struct pubsub_publisher {
@@ -71,15 +61,6 @@ struct pubsub_publisher {
      */
     int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
  
-  
-    /**
-     * sendMultipart is a async function, but the msg can be safely deleted after send returns.
-     * The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG
-     * The last message of a multipart message must have the flag PUBLISHER_LAST_MSG
-     * Returns 0 on success.
-     */
-    int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags);
- 
 };
 typedef struct pubsub_publisher pubsub_publisher_t;
 typedef struct pubsub_publisher* pubsub_publisher_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index ca6d4d1..7a0d85b 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -30,7 +30,7 @@
 #include <stdbool.h>
 
 #define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
-#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "3.0.0"
  
 //properties
 #define PUBSUB_SUBSCRIBER_TOPIC                "topic"
@@ -38,17 +38,15 @@
 #define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
 
 #define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
- 
-struct pubsub_multipart_callbacks_struct {
-    void *handle;
-    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
-    int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
-};
-typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
-typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;
- 
+
 struct pubsub_subscriber_struct {
     void *handle;
+
+    /**
+     * Called to initialize the subscriber with the receiver thread.
+     * Can be used to tweak the receiver thread attributes
+     */
+    int (*init)(void *handle);
      
     /**
      * When a new message for a topic is available the receive will be called.
@@ -64,7 +62,7 @@ struct pubsub_subscriber_struct {
      *
      * this method can be  NULL.
      */
-    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
 
 };
 typedef struct pubsub_subscriber_struct pubsub_subscriber_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 56d8997..06e3c56 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -67,7 +67,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
 
 	act->listenerSvc.handle = act->pubsub_discovery;
 	act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
-	act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
+	act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
 
 	//register shell command service
 	//register shell command

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index de708ff..673bdfd 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -422,7 +422,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
 
     return status;
 }
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_discovery_t *disc = handle;
     celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index b2726fb..269fe5a 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -95,7 +95,7 @@ void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc,
 void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 
 celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint);
 
 celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 7f1cc73..64abe24 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,8 +32,8 @@
 #include "celix_filter.h"
 
 #define PUBSUB_ADMIN_SERVICE_NAME		"pubsub_admin"
-#define PUBSUB_ADMIN_SERVICE_VERSION	"2.0.0"
-#define PUBSUB_ADMIN_SERVICE_RANGE		"[2,3)"
+#define PUBSUB_ADMIN_SERVICE_VERSION	"3.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE		"[3,4)"
 
 //expected service properties
 #define PUBSUB_ADMIN_SERVICE_TYPE		"psa_type"
@@ -44,20 +44,20 @@
 struct pubsub_admin_service {
 	void *handle;
 
-	celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-	celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-	celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
+	celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
+	celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+	celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
 
 	//note endpoint is owned by caller
-	celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+	celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 	celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
 
 	//note endpoint is owned by caller
-	celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+	celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 	celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
 
-	celix_status_t (*addEndpoint)(void *handle, const celix_properties_t *endpoint);
-	celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *endpoint);
+	celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+	celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
 };
 
 typedef struct pubsub_admin_service pubsub_admin_service_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index 7e4bfec..be4e569 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -33,15 +33,12 @@ typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_li
 
 
 
-//Informs the discovery admins to publish info into the network
+//Informs the pubsub discoveries to announce/revoke endpoint
 struct pubsub_announce_endpoint_listener {
 	void *handle;
 
 	celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
-	celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *properties);
-
-	//getCurrentSubscriberEndPoints
-	//getCurrentPublisherEndPoints
+	celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
 };
 
 typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 66cc44a..9d707f3 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -48,6 +48,7 @@ double pubsub_utils_matchPublisher(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        celix_properties_t **outTopicProperties,
         long *outSerializerSvcId);
 
 double pubsub_utils_matchSubscriber(
@@ -58,6 +59,7 @@ double pubsub_utils_matchSubscriber(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        celix_properties_t **outTopicProperties,
         long *outSerializerSvcId);
 
 bool pubsub_utils_matchEndpoint(

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index dc5c35e..42c141f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -107,6 +107,7 @@ double pubsub_utils_matchPublisher(
 		double sampleScore,
 		double controlScore,
 		double defaultScore,
+		celix_properties_t **outTopicProperties,
 		long *outSerializerSvcId) {
 
 	celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
@@ -132,8 +133,10 @@ double pubsub_utils_matchPublisher(
 		*outSerializerSvcId = serializerSvcId;
 	}
 
-	if (ep != NULL) {
-		celix_properties_destroy(ep); //TODO improve function to that tmp endpoint is not needed -> parse filter
+	if (outTopicProperties != NULL) {
+		*outTopicProperties = ep;
+	} else if (ep != NULL) {
+		celix_properties_destroy(ep);
 	}
 
 	return score;
@@ -159,6 +162,7 @@ double pubsub_utils_matchSubscriber(
 		double sampleScore,
 		double controlScore,
 		double defaultScore,
+		celix_properties_t **outTopicProperties,
 		long *outSerializerSvcId) {
 
 	pubsub_get_topic_properties_data_t data;
@@ -188,7 +192,9 @@ double pubsub_utils_matchSubscriber(
 		*outSerializerSvcId = serializerSvcId;
 	}
 
-	if (ep != NULL) {
+	if (outTopicProperties != NULL) {
+		*outTopicProperties = ep;
+	} else if (ep != NULL) {
 		celix_properties_destroy(ep);
 	}