You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:27 UTC

[10/34] celix git commit: CELIX-454: More PubSub. The UDPMC is now somewhat working again, but still needs some testing.

CELIX-454: More PubSub. The UDPMC is now somewhat working again, but still needs some testing.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2eb219e3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2eb219e3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2eb219e3

Branch: refs/heads/develop
Commit: 2eb219e393cfafd47a99739da3e56e0ab882799a
Parents: 69596cf
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Wed Sep 26 21:13:50 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Wed Sep 26 21:13:50 2018 +0200

----------------------------------------------------------------------
 .../pubsub/pubsub_admin_udp_mc/CMakeLists.txt   |   2 +
 .../pubsub_admin_udp_mc/src/psa_activator.c     |   6 +-
 .../src/pubsub_udpmc_admin.c                    | 295 +++++++++-
 .../src/pubsub_udpmc_admin.h                    |  47 +-
 .../src/pubsub_udpmc_common.c                   |  40 ++
 .../src/pubsub_udpmc_common.h                   |  39 ++
 .../src/pubsub_udpmc_topic_receiver.c           | 450 +++++++++++++++
 .../src/pubsub_udpmc_topic_receiver.h           |  45 ++
 .../src/pubsub_udpmc_topic_sender.c             | 305 +++++++++-
 .../src/pubsub_udpmc_topic_sender.h             |  10 +-
 .../pubsub_admin_udp_mc/src/topic_publication.c |   2 +-
 .../pubsub_admin_zmq/src/topic_subscription.c   |   2 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c |  19 +-
 .../src/pubsub_discovery_impl.c                 |  32 +-
 .../src/pubsub_discovery_impl.h                 |   8 +-
 .../pubsub/pubsub_spi/include/pubsub_admin.h    |   2 +-
 .../pubsub/pubsub_spi/include/pubsub_common.h   |   2 +-
 .../pubsub/pubsub_spi/include/pubsub_utils.h    |   5 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c |   9 +-
 .../pubsub/pubsub_spi/src/pubsub_utils_match.c  |  33 +-
 .../src/pubsub_topology_manager.c               | 565 +++++++++++--------
 .../src/pubsub_topology_manager.h               |  15 +-
 libs/framework/include/celix_bundle_context.h   |   2 +-
 23 files changed, 1576 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 94ce5cc..3f74376 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -25,6 +25,8 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast
         src/psa_activator.c
 		src/pubsub_udpmc_admin.c
 		src/pubsub_udpmc_topic_sender.c
+		src/pubsub_udpmc_topic_receiver.c
+		src/pubsub_udpmc_common.c
         #src/pubsub_admin_impl.c
         #src/topic_subscription.c
         #src/topic_publication.c

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index 406720e..682efc5 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -59,8 +59,8 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 		psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
 		psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
 		psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
-		psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever;
-		psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever;
+		psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReceiver;
+		psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReceiver;
 		psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
 		psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
 
@@ -78,7 +78,7 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
 		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
 		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
-		celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
 	}
 
 	return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 650e439..3e06334 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
 
 #include <memory.h>
 #include <sys/socket.h>
@@ -11,8 +29,11 @@
 #include "pubsub_udpmc_admin.h"
 #include "pubsub_psa_udpmc_constants.h"
 #include "pubsub_udpmc_topic_sender.h"
+#include "pubsub_udpmc_topic_receiver.h"
 
 #define PUBSUB_UDPMC_MC_IP_DEFAULT                     "224.100.1.1"
+#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY                "udpmc.socket_address"
+#define PUBSUB_UDPMC_SOCKET_PORT_KEY                   "udpmc.socker_port"
 
 #define LOG_DEBUG(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -23,8 +44,46 @@
 #define LOG_ERROR(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
+struct pubsub_udpmc_admin {
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    char *ifIpAddress; // The local interface which is used for multicast communication
+    char *mcIpAddress; // The multicast IP address
+    int sendSocket; // Handed to every topic sender created by this admin
+    double qosSampleScore;
+    double qosControlScore;
+    double defaultScore;
+    bool verbose;
+    const char *fwUUID; // Framework UUID used when creating publisher/subscriber endpoints
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_sender_t*
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_receiver_t*
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (copy of the discovered endpoint)
+    } discoveredEndpoints;
+
+};
+
+typedef struct psa_udpmc_connected_endpoint {
+    void *sender; //set if the connected endpoint is a subscriber. TODO: use a concrete type
+    void *receiver; //set if the connected endpoint is a publisher. TODO: use a concrete type
+    char *endpointUUID;
+} psa_udpmc_connected_endpoint_t;
+
 
 static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+
 
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
     pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
@@ -115,8 +174,8 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
     celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
     psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-    celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL);
-    psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
     return psa;
 }
@@ -128,14 +187,38 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
 
     //note assuming al psa register services and service tracker are removed.
 
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        pubsub_udpmcTopicSender_destroy(sender);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+        pubsub_udpmcTopicReceiver_destroy(recv);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+        celix_properties_destroy(ep);
+    }
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
 
     celixThreadMutex_destroy(&psa->topicReceivers.mutex);
     hashMap_destroy(psa->topicReceivers.map, true, false);
 
-    celixThreadMutex_destroy(&psa->connectedEndpoints.mutex);
-    hashMap_destroy(psa->connectedEndpoints.map, true, false);
+    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+    hashMap_destroy(psa->discoveredEndpoints.map, true, false);
 
     free(psa->mcIpAddress);
     free(psa->ifIpAddress);
@@ -165,14 +248,13 @@ celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderB
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
     pubsub_udpmc_admin_t *psa = handle;
     LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    double score = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, NULL);
-    if (outScore != NULL) {
-        *outScore = score;
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL);
+    if (outMatch != NULL) { //outMatch is optional; the result is dropped when the caller passes NULL
+        *outMatch = match;
     }
     return status;
 }
@@ -192,11 +274,18 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId);
+        sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, psa->sendSocket, psa->mcIpAddress);
         const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
         const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
         newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
                 PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+        celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
+        celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
+        //if available also set container name
+        const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+        if (cn != NULL) {
+            celix_properties_set(newEndpoint, "container_name", cn);
+        }
         bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
         if (!valid) {
             LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
@@ -205,7 +294,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
             free(key);
         } else {
             hashMap_put(psa->topicSenders.map, key, sender);
-            //TODO connect endpoints to sender
+            //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
         }
     } else {
         free(key);
@@ -235,7 +324,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
         char *mapKey = hashMapEntry_getKey(entry);
         pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
         free(mapKey);
-        //TODO disconnect endpoints to sender
+        //TODO disconnect endpoints to sender. note is this needed for a udpmc topic sender?
         pubsub_udpmcTopicSender_destroy(sender);
     } else {
         LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
@@ -246,30 +335,175 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. scope/topic: %s/%s", scope, topic);
+    //Creates a TopicReceiver for scope/topic (unless one exists) and reports its subscriber endpoint through outSubscriberEndpoint.
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+    if (receiver == NULL) {
+        receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId);
+        const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
+        const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+        newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                            PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+
+        //if available also set container name
+        const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+        if (cn != NULL) {
+            celix_properties_set(newEndpoint, "container_name", cn);
+        }
+        bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
+        if (!valid) {
+            LOG_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
+            celix_properties_destroy(newEndpoint);
+            newEndpoint = NULL; //destroyed above; must not be handed out through outSubscriberEndpoint below
+            pubsub_udpmcTopicReceiver_destroy(receiver);
+            free(key);
+        } else {
+            hashMap_put(psa->topicReceivers.map, key, receiver); //the map takes over ownership of key
+
+            celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+            hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+                pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
+            celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+        }
+    } else {
+        free(key);
+        LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+        *outSubscriberEndpoint = newEndpoint;
+    }
+
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic);
+    //Removes and destroys the TopicReceiver registered for scope/topic, if there is one.
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    free(key); //lookup key only; the key stored in the map is freed separately below
+    if (entry != NULL) {
+        char *receiverKey = hashMapEntry_getKey(entry);
+        pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+        hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+        free(receiverKey);
+        pubsub_udpmcTopicReceiver_destroy(receiver);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex); //release the lock taken above
+
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note: callers in this file hold topicReceivers.mutex; setupTopicReceiver additionally holds discoveredEndpoints.mutex
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+    const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+
+    if (type == NULL || sockAdress == NULL || sockPort < 0) {
+        fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL && type != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 && /* 1024 * 1024 only caps the number of compared characters */
+            strncmp(eTopic, topic, 1024 * 1024) == 0 &&
+            strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { /* prefix comparison on type */
+            pubsub_udpmcTopicReceiver_connectTo(receiver, sockAdress, sockPort); //only publisher endpoints with the receiver's scope and topic are connected
+        }
+    }
+
+    return status;
+}
+
 celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint");
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); //return status is ignored; non-matching receivers are skipped by the callee
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    //remember the endpoint, so that receivers created later (setupTopicReceiver) can still be connected to it
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celix_properties_t *cpy = celix_properties_copy(endpoint); //the map stores its own copy of the endpoint properties
+    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+    hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy); //the key is the uuid string obtained from cpy itself
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
 
+
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note: removeEndpoint calls this with topicReceivers.mutex held
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+    const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+
+    if (type == NULL || sockAdress == NULL || sockPort < 0) {
+        fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL && type != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 && /* 1024 * 1024 only caps the number of compared characters */
+            strncmp(eTopic, topic, 1024 * 1024) == 0 &&
+            strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { /* prefix comparison on type */
+            pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort); //same matching rule as used when connecting
+        }
+    }
+
+    return status;
+}
+
 celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_udpmc_admin_t *psa = handle;
-    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint");
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint); //return status is ignored
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    //drop the copy that addEndpoint stored for this endpoint uuid
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    if (found != NULL) {
+        celix_properties_destroy(found); //addEndpoint used a string inside this copy as map key, so the key goes with it
+    }
+
     celix_status_t  status = CELIX_SUCCESS;
     return status;
 }
@@ -278,7 +512,8 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
     pubsub_udpmc_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
-    fprintf(out, "\nTopic Senders:\n");
+    fprintf(out, "\n");
+    fprintf(out, "Topic Senders:\n");
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -287,16 +522,32 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
         const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
         const char *scope = pubsub_udpmcTopicSender_scope(sender);
         const char *topic = pubsub_udpmcTopicSender_topic(sender);
-        const char *url = pubsub_udpmcTopicSender_socketAddress(sender);
+        const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
+        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+        fprintf(out, "   |- psa type        = %s\n", psaType);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- socket address  = %s\n", sockAddr);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    fprintf(out, "\n");
+    fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
+        const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+        const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+        const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
         fprintf(out, "   |- psa type        = %s\n", psaType);
         fprintf(out, "   |- serializer type = %s\n", serType);
-        fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     fprintf(out, "\n");
 
-    //TODO topic receivers
+    //TODO topic receivers/senders connection count
 
     return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 011d272..11c7e13 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -23,8 +23,9 @@
 #include "celix_api.h"
 #include "log_helper.h"
 
-#define PUBSUB_UDPMC_ADMIN_TYPE                     "udpmc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"pubsub.udpmc.socket_address"
+#define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"udpmc.socket_address"
+#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socker_port"
 
 #define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
 #define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
@@ -34,54 +35,20 @@
 #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
 #define PUBSUB_UDPMC_VERBOSE_DEFAULT                "true"
 
-
-typedef struct pubsub_udpmc_admin {
-    celix_bundle_context_t *ctx;
-    log_helper_t *log;
-    char* ifIpAddress; // The local interface which is used for multicast communication
-    char* mcIpAddress; // The multicast IP address
-    int sendSocket;
-    double qosSampleScore;
-    double qosControlScore;
-    double defaultScore;
-    bool verbose;
-    const char *fwUUID;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
-    } topicSenders;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
-    } topicReceivers;
-
-    struct {
-        celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_entry_t
-    } connectedEndpoints;
-
-} pubsub_udpmc_admin_t;
-
-typedef struct psa_udpmc_connected_endpoint {
-    void *sender; //if connected endpoint is subscriber. todo type
-    void *receiver; //if connected endpoint is publisher. TODO type
-    char *endpointUUID;
-} psa_udpmc_connected_endpoint_t;
+typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
 
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
 
 celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
 celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
 celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
-celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
 celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
new file mode 100644
index 0000000..a039e6c
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
@@ -0,0 +1,40 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_udpmc_common.h"
+
+int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+    *msgTypeId = utils_stringHash(msgType); /* the id is the string hash of the message type name */
+    return 0; /* always reports 0 */
+}
+
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr) {
+    bool check=false;
+    int major=0,minor=0;
+
+    if(msgVersion!=NULL){ /* Without a message version the result stays false */
+        version_getMajor(msgVersion,&major);
+        version_getMinor(msgVersion,&minor);
+        if(hdr->major==((unsigned char)major)){ /* A different major means incompatible */
+            check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider's minor is equal or greater (i.e. a compatible update) */
+        }
+    }
+
+    return check;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
new file mode 100644
index 0000000..e49aaa7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_UDPMC_COMMON_H
+#define CELIX_PUBSUB_UDPMC_COMMON_H
+
+#include <utils.h>
+
+#include "version.h"
+#include "pubsub_common.h"
+
+typedef struct pubsub_udp_msg {
+    struct pubsub_msg_header header; // message header; the topic receiver reads its type, major and minor fields
+    unsigned int payloadSize; // NOTE(review): presumably the number of bytes in payload - confirm against the sender
+    char payload[]; // flexible array member; the topic receiver hands it to the message deserializer
+} pubsub_udp_msg_t;
+// Stores utils_stringHash(msgType) in *msgTypeId and returns 0; the handle argument is unused.
+int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
+// True when hdr carries the same major as msgVersion and an equal or greater minor; false when msgVersion is NULL.
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr);
+
+
+#endif //CELIX_PUBSUB_UDPMC_COMMON_H

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
new file mode 100644
index 0000000..f3e320a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -0,0 +1,450 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <memory.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/epoll.h>
+
+#include <pubsub_serializer.h>
+#include <pubsub/subscriber.h>
+#include <pubsub_constants.h>
+#include <pubsub_endpoint.h>
+
+#include "pubsub_udpmc_topic_receiver.h"
+#include "pubsub_psa_udpmc_constants.h"
+#include "large_udp.h"
+#include "pubsub_udpmc_common.h"
+
+#define MAX_EPOLL_EVENTS        10
+#define RECV_THREAD_TIMEOUT     5
+#define UDP_BUFFER_SIZE         65535
+#define MAX_UDP_SESSIONS        16
+
+struct pubsub_updmc_topic_receiver {
+    celix_bundle_context_t *ctx; // bundle context used to start and stop the service trackers
+    char *scope; // heap copy of the scope passed to create
+    char *topic; // heap copy of the topic passed to create
+    char* ifIpAddress; // heap copy of the interface IP used when joining multicast groups
+    largeUdp_pt largeUdpHandle; // large_udp handle the receive thread reads messages through
+    int topicEpollFd; // epoll file descriptor where the receive sockets are registered.
+
+    //serializer svc
+    long serializerTrackerId; // tracker id for the serializer service selected by service.id
+    struct {
+
+        celix_thread_mutex_t mutex; //protects svc and props
+        pubsub_serializer_service_t *svc; // currently set serializer service, NULL when none is set
+        const celix_properties_t *props; // properties delivered together with svc
+    } serializer;
+
+    struct {
+        celix_thread_t thread; // runs psa_udpmc_recvThread
+        celix_thread_mutex_t mutex; // protects running
+        bool running; // set to false by destroy to make the thread leave its loop
+    } recvThread;
+
+    struct {
+        celix_thread_mutex_t mutex; // protects map
+        hash_map_t *map; //key = "<socketAddress>:<socketPort>" string, value = psa_udpmc_requested_connection_entry_t*
+    } requestedConnections;
+
+    long subscriberTrackerId; // tracker id for subscriber services filtered on the topic
+    struct {
+        celix_thread_mutex_t mutex; // protects map
+        hash_map_t *map; //key = bnd id, value = psa_udpmc_subscriber_entry_t*
+    } subscribers;
+};
+
+typedef struct psa_udpmc_requested_connection_entry {
+    char *key; // heap-allocated "<socketAddress>:<socketPort>" string, also used as the key in requestedConnections.map
+    char *socketAddress; // heap copy of the multicast address passed to connectTo
+    long socketPort; // port the receive socket is bound to
+    bool connected; // true once the socket was created, joined the group, was bound and was added to epoll
+    int recvSocket; // UDP socket descriptor created in connectTo
+} psa_udpmc_requested_connection_entry_t;
+
+typedef struct psa_udpmc_subscriber_entry {
+    int usageCount; // incremented per add callback and decremented per remove callback for the owning bundle
+    hash_map_t *msgTypes; //map from serializer svc, looked up by message type id; stays NULL when no serializer was set at creation
+    pubsub_subscriber_t *svc; // subscriber service from the add callback that created the entry
+} psa_udpmc_subscriber_entry_t;
+
+
+typedef struct pubsub_msg{ // NOTE(review): not referenced anywhere else in this receiver source - confirm whether it is still needed
+    pubsub_msg_header_t *header; // pointer to the message header
+    char* payload; // pointer to the message payload bytes
+    unsigned int payloadSize; // NOTE(review): presumably the payload length in bytes - confirm
+} pubsub_msg_t;
+// Forward declarations: service tracker callbacks, per-message dispatch and the receive thread entry point.
+static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
+static void* psa_udpmc_recvThread(void * data);
+
+
+/**
+ * Creates a UDP multicast topic receiver and starts its receive thread.
+ *
+ * The receiver keeps private copies of scope, topic and ifIP, creates an epoll
+ * instance for its receive sockets, starts a tracker for the serializer
+ * service with the given service id, starts a tracker for subscriber services
+ * on the topic, and finally launches the receive thread.
+ *
+ * @param ctx             Bundle context used for the service trackers.
+ * @param scope           Scope of the topic (copied).
+ * @param topic           Topic name (copied).
+ * @param ifIP            IP address of the interface used to join multicast groups (copied).
+ * @param serializerSvcId Service id of the serializer service to track.
+ * @return The new receiver, or NULL when it could not be allocated. Release
+ *         it with pubsub_udpmcTopicReceiver_destroy().
+ */
+pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                                const char *scope,
+                                                                const char *topic,
+                                                                const char *ifIP,
+                                                                long serializerSvcId) {
+    pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    if (receiver == NULL) {
+        fprintf(stderr, "[PSA UDPMC] Cannot allocate TopicReceiver for %s/%s\n", scope, topic);
+        return NULL;
+    }
+
+    receiver->ctx = ctx;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+    receiver->ifIpAddress = strndup(ifIP, 1024 * 1024);
+    receiver->recvThread.running = true;
+    receiver->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
+    receiver->topicEpollFd = epoll_create1(0);
+    if (receiver->topicEpollFd < 0) {
+        //without an epoll instance no connection can be registered; report it instead of failing silently
+        fprintf(stderr, "[PSA UDPMC] Cannot create epoll instance for TopicReceiver %s/%s. (%s)\n", scope, topic, strerror(errno));
+    }
+
+    celixThreadMutex_create(&receiver->serializer.mutex, NULL);
+    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+    celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
+
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    //track serializer svc based on the provided serializerSvcId
+    {
+        char filter[64];
+        snprintf(filter, sizeof(filter), "(service.id=%li)", serializerSvcId);
+
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = receiver;
+        opts.setWithProperties = pubsub_udpmcTopicReceiver_setSerializer;
+        receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //track subscribers (filtered on topic; the scope is checked in the add callback)
+    {
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        char buf[size+1];
+        snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+        opts.filter.filter = buf;
+        opts.callbackHandle = receiver;
+        opts.addWithOwner = pubsub_udpmcTopicReceiver_addSubscriber;
+        opts.removeWithOwner = pubsub_udpmcTopicReceiver_removeSubscriber;
+
+        receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //started last so the thread only ever sees a fully initialised receiver
+    celixThread_create(&receiver->recvThread.thread, NULL, psa_udpmc_recvThread, receiver);
+
+    return receiver;
+}
+
+/**
+ * Stops and releases a topic receiver created with
+ * pubsub_udpmcTopicReceiver_create().
+ *
+ * The receive thread is stopped and joined, both service trackers are
+ * stopped, remaining subscriber and connection entries are released (open
+ * receive sockets and the epoll descriptor are closed) and all memory owned
+ * by the receiver is freed.
+ *
+ * @param receiver Receiver to destroy; NULL is accepted and ignored.
+ */
+void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+    if (receiver == NULL) {
+        return;
+    }
+
+    //stop the receive thread first so nothing iterates the maps while they are torn down
+    celixThreadMutex_lock(&receiver->recvThread.mutex);
+    receiver->recvThread.running = false;
+    celixThreadMutex_unlock(&receiver->recvThread.mutex);
+    celixThread_join(receiver->recvThread.thread, NULL);
+
+    //stop the subscriber tracker before the serializer tracker, so that any remove
+    //callbacks triggered by stopping it can still reach the serializer service
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    //release subscriber entries that are still present (same lock order as the add/remove callbacks)
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    celixThreadMutex_lock(&receiver->serializer.mutex);
+    hash_map_iterator_t subIter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&subIter)) {
+        psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&subIter);
+        if (entry == NULL) {
+            continue;
+        }
+        if (entry->msgTypes != NULL && receiver->serializer.svc != NULL) {
+            receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
+        }
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->serializer.mutex);
+    hashMap_destroy(receiver->subscribers.map, false, false);
+    receiver->subscribers.map = NULL;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
+
+    //release requested connections; only entries marked connected own an open socket
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    hash_map_iterator_t connIter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&connIter)) {
+        psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&connIter);
+        if (entry == NULL) {
+            continue;
+        }
+        if (entry->connected) {
+            close(entry->recvSocket);
+        }
+        free(entry->key);
+        free(entry->socketAddress);
+        free(entry);
+    }
+    hashMap_destroy(receiver->requestedConnections.map, false, false);
+    receiver->requestedConnections.map = NULL;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    if (receiver->topicEpollFd >= 0) {
+        close(receiver->topicEpollFd);
+    }
+
+    celixThreadMutex_destroy(&receiver->serializer.mutex);
+    celixThreadMutex_destroy(&receiver->subscribers.mutex);
+    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+    celixThreadMutex_destroy(&receiver->recvThread.mutex);
+
+    largeUdp_destroy(receiver->largeUdpHandle);
+
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver->ifIpAddress);
+    free(receiver);
+}
+// Returns the constant PSA_UDPMC_PUBSUB_ADMIN_TYPE; the receiver argument is not used.
+const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
+    return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+}
+
+/**
+ * Returns the serializer type of the currently set serializer service.
+ *
+ * The value is read from the serializer's service properties under
+ * PUBSUB_SERIALIZER_TYPE_KEY while holding the serializer mutex.
+ *
+ * @param receiver The topic receiver.
+ * @return The serializer type, or NULL when no serializer properties are set
+ *         or the property is missing.
+ */
+const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->serializer.mutex);
+    const celix_properties_t *serProps = receiver->serializer.props;
+    const char *serType = (serProps == NULL) ? NULL : celix_properties_get(serProps, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    celixThreadMutex_unlock(&receiver->serializer.mutex);
+
+    return serType;
+}
+// Returns the receiver's own copy of the scope that was made in create.
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+    return receiver->scope;
+}
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+    return receiver->topic; // the receiver's own copy of the topic that was made in create
+}
+
+/**
+ * Requests a connection to the multicast group socketAddress:socketPort.
+ *
+ * The request is remembered under the key "<socketAddress>:<socketPort>".
+ * When the entry is not connected yet, a UDP socket is created, joined to the
+ * multicast group on the receiver's interface, bound to the port and added to
+ * the receiver's epoll set. If any step fails the socket is closed again and
+ * the entry stays unconnected, so a later call can retry.
+ *
+ * @param receiver      The topic receiver.
+ * @param socketAddress Multicast address to join.
+ * @param socketPort    UDP port to bind to.
+ */
+void pubsub_udpmcTopicReceiver_connectTo(
+        pubsub_updmc_topic_receiver_t *receiver,
+        const char *socketAddress,
+        long socketPort) {
+    printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
+
+    int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
+    char *key = calloc((size_t)len+1, sizeof(char));
+    if (key == NULL) {
+        fprintf(stderr, "[PSA UDPMC] Cannot allocate connection key for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+        return;
+    }
+    snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_udpmc_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            fprintf(stderr, "[PSA UDPMC] Cannot allocate connection entry for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+            free(key);
+            celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+            return;
+        }
+        entry->key = key; //ownership of key moves to the entry
+        entry->socketAddress = strndup(socketAddress, 1024 * 1024);
+        entry->socketPort = socketPort;
+        entry->connected = false;
+        entry->recvSocket = -1;
+        hashMap_put(receiver->requestedConnections.map, (void*)entry->key, entry);
+    } else {
+        free(key);
+    }
+
+    if (!entry->connected) {
+        int rc = 0;
+        int fd = socket(AF_INET, SOCK_DGRAM, 0);
+        if (fd >= 0) {
+            int reuse = 1;
+            rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
+        }
+        if (fd >= 0 && rc >= 0) {
+            struct ip_mreq mc_addr;
+            memset(&mc_addr, 0, sizeof(mc_addr));
+            mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
+            mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
+            printf("Adding MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
+            rc = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
+        }
+        if (fd >= 0 && rc >= 0) {
+            struct sockaddr_in mcListenAddr;
+            memset(&mcListenAddr, 0, sizeof(mcListenAddr));
+            mcListenAddr.sin_family = AF_INET;
+            mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+            mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
+            rc = bind(fd, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
+        }
+        if (fd >= 0 && rc >= 0) {
+            struct epoll_event ev;
+            memset(&ev, 0, sizeof(ev));
+            ev.events = EPOLLIN;
+            ev.data.fd = fd;
+            rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, fd, &ev);
+        }
+
+        if (fd < 0 || rc < 0) {
+            int savedErrno = errno; //close() below may overwrite errno
+            if (fd >= 0) {
+                close(fd); //do not leak the descriptor; a later connectTo creates a fresh socket
+            }
+            entry->recvSocket = -1;
+            fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(savedErrno));
+        } else {
+            entry->recvSocket = fd;
+            entry->connected = true;
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+/**
+ * Drops the requested connection to socketAddress:socketPort.
+ *
+ * The entry stored under "<socketAddress>:<socketPort>" is removed from the
+ * requested connections. When it was connected, its socket is removed from
+ * the epoll set and closed. Unknown connections are ignored.
+ *
+ * @param receiver      The topic receiver.
+ * @param socketAddress Multicast address of the connection.
+ * @param socketPort    UDP port of the connection.
+ */
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
+    printf("[PSA UDPMC] TopicReceiver %s/%s disconnect from socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
+
+    int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
+    char *key = calloc((size_t)len+1, sizeof(char));
+    if (key == NULL) {
+        fprintf(stderr, "[PSA UDPMC] Cannot allocate connection key for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+        return;
+    }
+    snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_udpmc_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, key);
+    free(key);
+    if (entry != NULL) {
+        if (entry->connected) {
+            struct epoll_event ev;
+            memset(&ev, 0, sizeof(ev));
+            int rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_DEL, entry->recvSocket, &ev);
+            if (rc < 0) {
+                fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver %s/%s from %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno));
+            }
+            //the entry owns the socket; close it so the descriptor is not leaked
+            close(entry->recvSocket);
+        }
+        free(entry->key);
+        free(entry->socketAddress);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+/**
+ * Serializer tracker callback (registered as setWithProperties in create).
+ *
+ * Stores the given serializer service and its properties in the receiver
+ * under the serializer mutex. A NULL svc is stored as-is.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The serializer service, may be NULL.
+ * @param props  Service properties belonging to svc.
+ */
+static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_updmc_topic_receiver_t *self = handle;
+
+    //TODO -> no serializer (svc == NULL) -> remove all publishers
+
+    celixThreadMutex_lock(&self->serializer.mutex);
+    self->serializer.props = props;
+    self->serializer.svc = svc;
+    celixThreadMutex_unlock(&self->serializer.mutex);
+}
+
+/**
+ * Returns true when the scope in the subscriber's service properties
+ * (PUBSUB_SUBSCRIBER_SCOPE, "default" when absent) is exactly the receiver's scope.
+ */
+static bool psa_udpmc_subscriberScopeMatches(pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *props) {
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    //full comparison: a prefix comparison would also accept e.g. "defaultX" for scope "default"
+    return strcmp(subScope, receiver->scope) == 0;
+}
+
+/**
+ * Subscriber tracker callback for an added subscriber service.
+ *
+ * Subscribers with a different scope are ignored. Otherwise the entry of the
+ * owning bundle gets its usage count increased, or a new entry is created
+ * with a serializer map obtained from the current serializer service.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The subscriber service.
+ * @param props  Service properties of the subscriber.
+ * @param bnd    Bundle owning the subscriber service.
+ */
+static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_updmc_topic_receiver_t *receiver = handle;
+
+    if (!psa_udpmc_subscriberScopeMatches(receiver, props)) {
+        //not the same scope. ignore
+        return;
+    }
+
+    long bndId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+    } else {
+        //new create entry
+        entry = calloc(1, sizeof(*entry));
+        entry->usageCount = 1;
+        entry->svc = svc;
+
+        celixThreadMutex_lock(&receiver->serializer.mutex);
+        if (receiver->serializer.svc != NULL) {
+            receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        } else {
+            fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+        }
+        celixThreadMutex_unlock(&receiver->serializer.mutex);
+
+        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Subscriber tracker callback for a removed subscriber service.
+ *
+ * Mirrors the add callback: subscribers with a different scope were never
+ * counted and are ignored here as well. Otherwise the usage count of the
+ * owning bundle's entry is decreased and the entry is released when the count
+ * drops to zero.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The subscriber service (unused).
+ * @param props  Service properties of the subscriber.
+ * @param bnd    Bundle owning the subscriber service.
+ */
+static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_updmc_topic_receiver_t *receiver = handle;
+
+    if (!psa_udpmc_subscriberScopeMatches(receiver, props)) {
+        //ignored by the add callback, so it must not lower the usage count either
+        return;
+    }
+
+    long bndId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount -= 1;
+    }
+    if (entry != NULL && entry->usageCount <= 0) {
+        //remove entry
+        hashMap_remove(receiver->subscribers.map, (void*)bndId);
+        if (entry->msgTypes != NULL) {
+            celixThreadMutex_lock(&receiver->serializer.mutex);
+            if (receiver->serializer.svc != NULL) {
+                receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
+            } else {
+                fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+            }
+            celixThreadMutex_unlock(&receiver->serializer.mutex);
+        }
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Entry point of the receive thread.
+ *
+ * Until recvThread.running is cleared, the thread waits (with a timeout of
+ * RECV_THREAD_TIMEOUT seconds) on the receiver's epoll descriptor, reads every
+ * message that large_udp reports as available and passes it to
+ * psa_udpmc_processMsg().
+ *
+ * @param data The topic receiver.
+ * @return Always NULL.
+ */
+static void* psa_udpmc_recvThread(void * data) {
+    pubsub_updmc_topic_receiver_t *self = data;
+    struct epoll_event ready[MAX_EPOLL_EVENTS];
+
+    for (;;) {
+        //the running flag is shared with destroy, so read it under its mutex
+        celixThreadMutex_lock(&self->recvThread.mutex);
+        bool keepRunning = self->recvThread.running;
+        celixThreadMutex_unlock(&self->recvThread.mutex);
+        if (!keepRunning) {
+            break;
+        }
+
+        int readyCount = epoll_wait(self->topicEpollFd, ready, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+        for (int evIdx = 0; evIdx < readyCount; ++evIdx) {
+            unsigned int index;
+            unsigned int size;
+            if (largeUdp_dataAvailable(self->largeUdpHandle, ready[evIdx].data.fd, &index, &size) != true) {
+                continue;
+            }
+
+            // Handle data
+            pubsub_udp_msg_t *udpMsg = NULL;
+            if (largeUdp_read(self->largeUdpHandle, index, (void**)&udpMsg, size) == 0) {
+                psa_udpmc_processMsg(self, udpMsg);
+                free(udpMsg);
+            } else {
+                printf("[PSA_UDPMC]: ERROR largeUdp_read with index %d\n", index);
+            }
+        }
+    }
+
+    return NULL;
+}
+
+/**
+ * Delivers one received message to every registered subscriber entry.
+ *
+ * For each entry the message serializer is looked up by the header's type id,
+ * the header version is checked against the serializer's message version, the
+ * payload is deserialized and the subscriber's receive callback is invoked.
+ * The deserialized message is freed afterwards unless the subscriber cleared
+ * the release flag.
+ *
+ * @param receiver The topic receiver.
+ * @param msg      The received message.
+ */
+static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t it = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&it)) {
+        psa_udpmc_subscriber_entry_t *sub = hashMapIterator_nextValue(&it);
+
+        pubsub_msg_serializer_t *ser = NULL;
+        if (sub->msgTypes != NULL) {
+            ser = hashMap_get(sub->msgTypes, (void *) (uintptr_t) msg->header.type);
+        }
+        if (ser == NULL) {
+            printf("[PSA_UDPMC] Serializer not available for message %d.\n",msg->header.type);
+            continue;
+        }
+
+        if (!psa_udpmc_checkVersion(ser->msgVersion, &msg->header)) {
+            int major=0,minor=0;
+            version_getMajor(ser->msgVersion,&major);
+            version_getMinor(ser->msgVersion,&minor);
+            printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                   ser->msgName,major,minor,msg->header.major,msg->header.minor);
+            continue;
+        }
+
+        void *msgInst = NULL;
+        celix_status_t status = ser->deserialize(ser, (const void *) msg->payload, 0, &msgInst);
+        if (status != CELIX_SUCCESS) {
+            printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",ser->msgName);
+            continue;
+        }
+
+        //the subscriber may clear release to take over ownership of msgInst
+        bool release = true;
+        pubsub_multipart_callbacks_t mp_callbacks;
+        mp_callbacks.handle = receiver;
+        mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+        mp_callbacks.getMultipart = NULL;
+
+        pubsub_subscriber_t *subSvc = sub->svc;
+        subSvc->receive(subSvc->handle, ser->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+
+        if (release) {
+            ser->freeMsg(ser,msgInst);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
new file mode 100644
index 0000000..ee2c113
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
+
+#include "celix_bundle_context.h"
+// Opaque handle of a UDP multicast topic receiver; the struct is defined in the corresponding .c file.
+typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t;
+// create() copies scope, topic and ifIP and tracks the serializer with service id serializerSvcId; destroy() accepts NULL.
+pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, 
+        const char *scope, 
+        const char *topic, 
+        const char *ifIP, 
+        long serializerSvcId);
+void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver);
+// Accessors; serializerType() returns NULL while no serializer properties are set.
+const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_updmc_topic_receiver_t *receiver); // NOTE(review): no definition of this function appears in pubsub_udpmc_topic_receiver.c as added by this commit - confirm or remove
+// connectTo()/disconnectFrom() add or drop the connection identified by socketAddress and socketPort.
+void pubsub_udpmcTopicReceiver_connectTo(
+        pubsub_updmc_topic_receiver_t *receiver,
+        const char *socketAddress,
+        long socketPort);
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
+
+#endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 5553d3b..4a6d027 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -1,17 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
 #include <pubsub_serializer.h>
 #include <stdlib.h>
 #include <memory.h>
 #include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <pubsub_common.h>
+#include <zconf.h>
+#include <arpa/inet.h>
 #include "pubsub_udpmc_topic_sender.h"
 #include "pubsub_psa_udpmc_constants.h"
+#include "large_udp.h"
+#include "pubsub_udpmc_common.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+
+//TODO make configurable
+#define UDP_BASE_PORT	                        49152
+#define UDP_MAX_PORT	                        65000
 
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
 
 struct pubsub_updmc_topic_sender {
     celix_bundle_context_t *ctx;
     char *scope;
     char *topic;
     char *socketAddress;
+    long socketPort;
+
+    int sendSocket;
+    struct sockaddr_in destAddr;
 
     long serTrackerId;
     struct {
@@ -19,26 +54,95 @@ struct pubsub_updmc_topic_sender {
         pubsub_serializer_service_t *svc;
         const celix_properties_t *props;
     } serializer;
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map;  //key = bndId, value = psa_udpmc_bounded_service_entry_t
+    } boundedServices;
 };
 
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) {
+typedef struct psa_udpmc_bounded_service_entry {
+    pubsub_updmc_topic_sender_t *parent; // sender that created this entry
+    pubsub_publisher_t service; // publisher service struct of this entry; its handle is set to the entry itself
+    long bndId; // id of the requesting bundle, also the key in boundedServices.map
+    hash_map_t *msgTypes; // serializer map filled through the serializer service's createSerializerMap
+    int getCount; // starts at 1 and is increased for every further getService call of the same bundle
+    largeUdp_pt largeUdpHandle; // created per entry with largeUdp_create(1)
+} psa_udpmc_bounded_service_entry_t;
+
+typedef struct pubsub_msg{ // message description taken by psa_udpmc_sendMsg
+    pubsub_msg_header_t *header; // pointer to the message header
+    char* payload; // pointer to the message payload bytes
+    unsigned int payloadSize; // NOTE(review): presumably the payload length in bytes - confirm
+} pubsub_msg_t;
+// Forward declarations: serializer tracker callback, publisher service factory callbacks, send helpers and rand_range.
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static unsigned int rand_range(unsigned int min, unsigned int max);
+/**
+ * Creates a UDPMC topic sender: copies scope and topic, stores sendSocket,
+ * fills destAddr from bindIP plus a port obtained from rand_range(), starts a
+ * tracker for the serializer service with service.id == serializerSvcId and
+ * registers a publisher service factory carrying the topic and scope as
+ * service properties. Allocation results are not checked.
+ */
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+        celix_bundle_context_t *ctx,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        int sendSocket,
+        const char *bindIP) {
     pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->scope = strndup(scope, 1024 * 1024);
     sender->topic = strndup(topic, 1024 * 1024);
 
     celixThreadMutex_create(&sender->serializer.mutex, NULL);
+    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    //setting up socket for UDPMC TopicSender
+    {
+        unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT); // NOTE(review): presumably a random port within [UDP_BASE_PORT, UDP_MAX_PORT]; rand_range is defined outside this view
+        sender->sendSocket = sendSocket; // the socket is supplied by the caller, not created here
+        sender->destAddr.sin_family = AF_INET;
+        sender->destAddr.sin_addr.s_addr = inet_addr(bindIP); // the destination address is taken from bindIP
+        sender->destAddr.sin_port = htons((uint16_t)port);
+
+        sender->socketAddress = strndup(bindIP, 1024);
+        sender->socketPort = port;
+    }
+
+    //track serializer svc based on the provided serializerSvcId
+    {
+        char filter[64];
+        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = sender;
+        opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
+        sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register publisher services using a service factory
+    {
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = psa_udpmc_getPublisherService;
+        sender->publisher.factory.ungetService = psa_udpmc_ungetPublisherService;
 
-    char filter[64];
-    snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
 
-    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-    opts.filter.filter = filter;
-    opts.filter.ignoreServiceLanguage = true;
-    opts.callbackHandle = sender;
-    opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
-    sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+        sender->publisher.svcId = celix_bundleContext_registerServiceFactory(ctx, &sender->publisher.factory, PUBSUB_PUBLISHER_SERVICE_NAME, props);
+    }
 
     return sender;
 }
@@ -46,11 +150,17 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context
 void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
     if (sender != NULL) {
         celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+        celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
 
         celixThreadMutex_destroy(&sender->serializer.mutex);
+        celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+        //TODO loop and cleanup? Nothing here releases the largeUdpHandle or msgTypes of entries that are still in the map.
+        hashMap_destroy(sender->boundedServices.map, false, true);
 
         free(sender->scope);
         free(sender->topic);
+        free(sender->socketAddress);
         free(sender);
     }
 }
@@ -81,8 +191,12 @@ const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *s
     return sender->socketAddress;
 }
 
+long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender) {
+    return sender->socketPort; // the port that was chosen in pubsub_udpmcTopicSender_create
+}
+
 void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
-    //TODO
+    //TODO subscriber count -> topic info (until then this function does nothing)
 }
 
 void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
@@ -92,8 +206,173 @@ void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender,
 static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_updmc_topic_sender_t *sender = handle;
     pubsub_serializer_service_t *ser = svc;
+    //Stores the given serializer service (NULL when no serializer is available) and its properties on the sender.
+    if (ser == NULL) {
+        //TODO -> no serializer -> remove all publishers (nothing is removed yet, the NULL is only stored below)
+    }
+    //svc and props are replaced together under serializer.mutex. NOTE(review): props is kept by pointer, not copied - confirm its lifetime.
     celixThreadMutex_lock(&sender->serializer.mutex);
     sender->serializer.svc = ser;
     sender->serializer.props = props;
     celixThreadMutex_unlock(&sender->serializer.mutex);
-}
\ No newline at end of file
+}
+
+/**
+ * Service factory 'get' callback: hands out the publisher service bound to the requesting bundle.
+ *
+ * One entry is kept per bundle id in boundedServices.map. A repeated request from the same
+ * bundle only increments the entry's getCount; the first request creates the entry, gives it
+ * its own largeUdp handle and - when a serializer is currently set - a msg serializer map.
+ * If no serializer map could be created the service is still handed out (an error is logged);
+ * sending through it then fails because no msg serializer can be looked up.
+ *
+ * @param handle           The topic sender (pubsub_updmc_topic_sender_t*).
+ * @param requestingBundle The bundle requesting the service; its id is used as map key.
+ * @return The publisher service of the entry, or NULL if the entry could not be allocated.
+ */
+static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_updmc_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+    void *result = NULL;
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            //out of memory: do not touch the map, the caller gets NULL
+            fprintf(stderr, "Error creating publisher service, cannot allocate memory for the bundle bound service entry\n");
+        } else {
+            entry->getCount = 1;
+            entry->parent = sender;
+            entry->bndId = bndId;
+            entry->largeUdpHandle = largeUdp_create(1);
+
+            //the serializer can be replaced concurrently, so use it only while holding its mutex
+            celixThreadMutex_lock(&sender->serializer.mutex);
+            celix_status_t rc = CELIX_SUCCESS;
+            if (sender->serializer.svc != NULL) {
+                rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+            }
+            if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+                //TODO destroy and return NULL?
+                fprintf(stderr, "Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+            }
+            celixThreadMutex_unlock(&sender->serializer.mutex);
+
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+            entry->service.send = psa_udpmc_topicPublicationSend;
+            entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
+
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+        }
+    }
+    if (entry != NULL) {
+        result = &entry->service;
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    return result;
+}
+
+/**
+ * Service factory 'unget' callback: releases one use of the publisher service by the requesting bundle.
+ *
+ * Decrements the getCount of the bundle's entry. When the count reaches zero the entry is
+ * removed from boundedServices.map and its msg serializer map (if one was created), its
+ * largeUdp handle and the entry itself are destroyed. An unknown bundle id is ignored.
+ *
+ * @param handle           The topic sender (pubsub_updmc_topic_sender_t*).
+ * @param requestingBundle The bundle releasing the service; its id is used as map key.
+ */
+static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_updmc_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount -= 1;
+    }
+    if (entry != NULL && entry->getCount == 0) {
+        //last user is gone: take the entry out of the map before releasing its resources
+        hashMap_remove(sender->boundedServices.map, (void*)bndId);
+
+        //an entry created while no serializer was available has no map; never pass NULL to the serializer
+        if (entry->msgTypes != NULL) {
+            celixThreadMutex_lock(&sender->serializer.mutex);
+            celix_status_t rc = CELIX_SUCCESS;
+            if (sender->serializer.svc != NULL) {
+                rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
+            }
+            if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+                fprintf(stderr, "Error destroying publisher service, serializer not available / cannot destroy msg serializer map\n");
+            }
+            celixThreadMutex_unlock(&sender->serializer.mutex);
+        }
+
+        largeUdp_destroy(entry->largeUdpHandle);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+/**
+ * Publisher 'send' implementation: serializes inMsg and sends it over the sender's socket.
+ *
+ * The msg serializer is looked up by msgTypeId in the entry's msg serializer map. The header
+ * carries the sender's topic, the msg type id and - when the serializer has a version - its
+ * major/minor version. Header and message envelope only live for the duration of this call
+ * and are therefore kept on the stack.
+ *
+ * @param handle    The bundle bound service entry (psa_udpmc_bounded_service_entry_t*).
+ * @param msgTypeId The local msg type id of inMsg.
+ * @param inMsg     The message to serialize and send.
+ * @return 0 on success, -1 if no msg serializer is available, serialization fails or sending fails.
+ */
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+    psa_udpmc_bounded_service_entry_t *entry = handle;
+
+    pubsub_msg_serializer_t* msgSer = NULL;
+    if (entry->msgTypes != NULL) {
+        msgSer = hashMap_get(entry->msgTypes, (void*)(intptr_t)(msgTypeId));
+    }
+    if (msgSer == NULL) {
+        printf("[PSA_UDPMC/TopicSender] No msg serializer available for msg type id %u\n", msgTypeId);
+        return -1;
+    }
+
+    pubsub_msg_header_t msg_hdr;
+    memset(&msg_hdr, 0, sizeof(msg_hdr)); //zeroed first, so the bounded copy below always stays NUL terminated
+    strncpy(msg_hdr.topic, entry->parent->topic, MAX_TOPIC_LEN-1);
+    msg_hdr.type = msgTypeId;
+    if (msgSer->msgVersion != NULL) {
+        int major = 0, minor = 0;
+        version_getMajor(msgSer->msgVersion, &major);
+        version_getMinor(msgSer->msgVersion, &minor);
+        msg_hdr.major = major;
+        msg_hdr.minor = minor;
+    }
+
+    void* serializedOutput = NULL;
+    size_t serializedOutputLen = 0;
+    if (msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen) != CELIX_SUCCESS) {
+        //do not put a half serialized (or missing) payload on the wire
+        fprintf(stderr, "[PSA_UDPMC/TopicSender] Cannot serialize msg with msg type id %u\n", msgTypeId);
+        free(serializedOutput);
+        return -1;
+    }
+
+    pubsub_msg_t msg;
+    memset(&msg, 0, sizeof(msg));
+    msg.header = &msg_hdr;
+    msg.payload = (char*)serializedOutput;
+    msg.payloadSize = serializedOutputLen;
+
+    int status = 0;
+    if (psa_udpmc_sendMsg(entry, &msg, true, NULL) == false) {
+        status = -1;
+    }
+    free(serializedOutput);
+    return status;
+}
+
+//Blocks the very first send for FIRST_SEND_DELAY_IN_SECONDS so that late joiners can get ready;
+//every later call returns immediately. The flag is a plain static without synchronization.
+static void delay_first_send_for_late_joiners(){
+    static bool firstSend = true;
+
+    if (!firstSend) {
+        return;
+    }
+
+    printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY_IN_SECONDS);
+    firstSend = false;
+}
+
+//Sends msg as three parts (header, payload size, payload) using the entry's largeUdp handle
+//and the parent's send socket / destination address. When a release callback is given it is
+//invoked for the payload afterwards, whether or not sending succeeded.
+//Returns true if the message was sent, false if largeUdp_sendmsg reported -1.
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+    struct iovec parts[] = {
+        { .iov_base = msg->header,       .iov_len = sizeof(*msg->header) },
+        { .iov_base = &msg->payloadSize, .iov_len = sizeof(msg->payloadSize) },
+        { .iov_base = msg->payload,      .iov_len = msg->payloadSize },
+    };
+    const int nrOfParts = (int)(sizeof(parts) / sizeof(parts[0])); // header + size + payload
+
+    delay_first_send_for_late_joiners();
+
+    int rc = largeUdp_sendmsg(entry->largeUdpHandle, entry->parent->sendSocket, parts, nrOfParts, 0, &entry->parent->destAddr, sizeof(entry->parent->destAddr));
+    bool sent = (rc != -1);
+    if (!sent) {
+        perror("send_pubsub_msg:sendSocket");
+    }
+
+    if (releaseCallback != NULL) {
+        releaseCallback->release(msg->payload, entry);
+    }
+    return sent;
+}
+
+//Returns a pseudo random value in the closed range [min, max] (expects min <= max).
+static unsigned int rand_range(unsigned int min, unsigned int max){
+    //random() yields values in [0, 2^31 - 1]; RAND_MAX belongs to rand() and may be smaller.
+    //Dividing by 2^31 keeps scaled strictly below 1.0, so the result can never become max + 1.
+    double scaled = ((double)random()) / 2147483648.0;
+    //range is computed in double so that max - min + 1 cannot wrap around in unsigned arithmetic
+    double range = (double)max - (double)min + 1.0;
+    return (unsigned int)(range * scaled + (double)min);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 38a9127..89e56ec 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -23,7 +23,13 @@
 
 typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t;
 
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, /*TODO rest args*/ const char *scope, const char *topic, long serializerSvcId);
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+        celix_bundle_context_t *ctx,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        int sendSocket,
+        const char *bindIP);
 void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender);
 
 const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender);
@@ -31,6 +37,8 @@ const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *
 const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender);
 const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender);
 const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender);
+long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender);
+
 //TODO connections etc
 
 void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index eea8460..2fc80ee 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -76,7 +76,7 @@ typedef struct publish_bundle_bound_service {
 
 
 typedef struct pubsub_msg{
-	pubsub_msg_header_pt header;
+	pubsub_msg_header_t *header;
 	char* payload;
 	unsigned int payloadSize;
 } pubsub_msg_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
index 46a1688..387935e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -110,7 +110,7 @@ typedef struct msg_map_entry{
 static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service);
 static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service);
 static void* zmq_recv_thread_func(void* arg);
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_t *hdr); //hdr type: pubsub_msg_header_pt was replaced by pubsub_msg_header_t (see pubsub_common.h)
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 8b2a2d4..81e467d 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -29,6 +29,7 @@
 #include "pubsub_common.h"
 #include "pubsub_listeners.h"
 #include "pubsub_discovery_impl.h"
+#include "../../../shell/shell/include/command.h"
 
 typedef struct psd_activator {
 	pubsub_discovery_t *pubsub_discovery;
@@ -38,6 +39,9 @@ typedef struct psd_activator {
 
 	pubsub_announce_endpoint_listener_t listenerSvc;
 	long listenerSvcId;
+
+	command_service_t cmdSvc;
+	long cmdSvcId;
 } psd_activator_t;
 
 static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
@@ -48,7 +52,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
 	status = pubsub_discovery_start(act->pubsub_discovery);
 
 	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-	opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
+	opts.filter.serviceName = PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE;
 	opts.callbackHandle = act->pubsub_discovery;
 	opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded;
 	opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved;
@@ -58,6 +62,18 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
 	act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
 	act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
 
+	//register shell command service
+	//register shell command
+	if (status == CELIX_SUCCESS) {
+		act->cmdSvc.handle = act->pubsub_discovery;
+		act->cmdSvc.executeCommand = pubsub_discovery_executeCommand;
+		celix_properties_t *props = celix_properties_create();
+		properties_set(props, OSGI_SHELL_COMMAND_NAME, "psd_etcd");
+		properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psd_etcd"); //TODO add search topic/scope option
+		properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Overview of discovered/announced endpoints from/to ETCD");
+		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+	}
+
 	if (status == CELIX_SUCCESS) {
 		act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL);
 	} else {
@@ -70,6 +86,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
 static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx) {
 	celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId);
 	celix_bundleContext_unregisterService(ctx, act->listenerSvcId);
+	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
 
 	celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
 	pubsub_discovery_destroy(act->pubsub_discovery);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 98d6be6..237f2f2 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -63,8 +63,11 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove
     celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL);
     celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL);
     celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->waitMutex, NULL);
-    celixThreadCondition_init(&(*ps_discovery)->waitCond, NULL);
+    pthread_mutex_init(&(*ps_discovery)->waitMutex, NULL);
+    pthread_condattr_t attr;
+    pthread_condattr_init(&attr);
+    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
+    pthread_cond_init(&(*ps_discovery)->waitCond, &attr);
     celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL);
     (*ps_discovery)->running = true;
 
@@ -102,8 +105,8 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
     celixThreadMutex_unlock(&ps_discovery->announcedEndpointsMutex);
     celixThreadMutex_destroy(&ps_discovery->announcedEndpointsMutex);
 
-    celixThreadMutex_destroy(&ps_discovery->waitMutex);
-    celixThreadCondition_destroy(&ps_discovery->waitCond);
+    pthread_mutex_destroy(&ps_discovery->waitMutex);
+    pthread_cond_destroy(&ps_discovery->waitCond);
 
     celixThreadMutex_destroy(&ps_discovery->runningMutex);
 
@@ -246,7 +249,7 @@ void* psd_refresh(void *data) {
 
     while (running) {
         struct timespec start;
-        clock_gettime(CLOCK_REALTIME, &start);
+        clock_gettime(CLOCK_MONOTONIC, &start);
 
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
         hash_map_iterator_t iter = hashMapIterator_construct(disc->announcedEndpoints);
@@ -273,9 +276,9 @@ void* psd_refresh(void *data) {
 
         struct timespec waitTill = start;
         waitTill.tv_sec += disc->sleepInsecBetweenTTLRefresh;
-        celixThreadMutex_lock(&disc->waitMutex);
-        pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread
-        celixThreadMutex_unlock(&disc->waitMutex);
+        pthread_mutex_lock(&disc->waitMutex);
+        pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread (including MONOTONIC ..)
+        pthread_mutex_unlock(&disc->waitMutex);
 
         celixThreadMutex_lock(&disc->runningMutex);
         running = disc->running;
@@ -438,9 +441,9 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
     assert(fwUUID != NULL);
 
     if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) == 0) {
-        if (disc->verbose) {
-            printf("[PSD] Ignoring endpoint %s from own framework\n", uuid);
-        }
+//        if (disc->verbose) {
+//            printf("[PSD] Ignoring endpoint %s from own framework\n", uuid);
+//        }
         return;
     }
 
@@ -553,3 +556,10 @@ static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props
     json_decref(jsEndpoint);
     return str;
 }
+
+celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+    //pubsub_discovery_t *psd = handle; (handle is not used yet)
+    //TODO real implementation; for now only a placeholder line is written to os and success is returned
+    fprintf(os, "TODO\n");
+    return CELIX_SUCCESS;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index f0a5b22..a1af837 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -56,8 +56,9 @@ typedef struct pubsub_discovery {
 	celix_thread_mutex_t discoveredEndpointsListenersMutex;
 	hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t
 
-	celix_thread_mutex_t waitMutex;
-	celix_thread_cond_t  waitCond;
+	//NOTE using pthread instead of celix mutex/cond so that condwait with abs time using a MONOTONIC clock can be used
+	pthread_mutex_t waitMutex;
+	pthread_cond_t  waitCond;
 
 	celix_thread_mutex_t runningMutex;
     bool running;
@@ -91,4 +92,7 @@ void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc
 celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
 celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
 
+celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
+
+
 #endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 8c15fcf..9157861 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -46,7 +46,7 @@ struct pubsub_admin_service {
 
 	celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
 	celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-	celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, double *score);
+	celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
 
 	//note endpoint is owned by caller
 	celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
index e551031..3231400 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
@@ -40,7 +40,7 @@ struct pubsub_msg_header{
 	unsigned char minor;
 };
 
-typedef struct pubsub_msg_header* pubsub_msg_header_pt;
+typedef struct pubsub_msg_header pubsub_msg_header_t;
 
 
 #endif /* PUBSUB_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 14f9bb0..66cc44a 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -60,13 +60,10 @@ double pubsub_utils_matchSubscriber(
         double defaultScore,
         long *outSerializerSvcId);
 
-double pubsub_utils_matchEndpoint(
+bool pubsub_utils_matchEndpoint(
         celix_bundle_context_t *ctx,
         const celix_properties_t *endpoint,
         const char *adminType,
-        double sampleScore,
-        double controlScore,
-        double defaultScore,
         long *outSerializerSvcId);
 
 

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 0b3c742..e10169f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -85,7 +85,14 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
 	}
 }
 
-celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props) {
+celix_properties_t* pubsubEndpoint_create(
+        const char* fwUUID,
+        const char* scope,
+        const char* topic,
+        const char* pubsubType,
+        const char* adminType,
+        const char *serType,
+        celix_properties_t *topic_props) {
 	celix_properties_t *ep = properties_create();
 	pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
 	if (!pubsubEndpoint_isValid(ep, true, true)) {

http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index 60f7883..c21d597 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -183,7 +183,7 @@ double pubsub_utils_matchSubscriber(
 		score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
 	}
 
-	printf("Score subscriber service match for psa type %s is %f", adminType, score);
+	printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
 
 	if (outSerializerSvcId != NULL) {
 		*outSerializerSvcId = serializerSvcId;
@@ -196,35 +196,32 @@ double pubsub_utils_matchSubscriber(
 	return score;
 }
 
-double pubsub_utils_matchEndpoint(
+bool pubsub_utils_matchEndpoint( //returns true only when the endpoint's admin type matches adminType AND a serializer service id >= 0 is found
 		celix_bundle_context_t *ctx,
 		const celix_properties_t *ep,
 		const char *adminType,
-		double sampleScore,
-		double controlScore,
-		double defaultScore,
 		long *outSerializerSvcId) {
 
-	const char *requested_admin 		= NULL;
-	const char *requested_qos			= NULL;
-	if (ep != NULL) {
-		requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
-		requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
+	bool psaMatch = false;
+	const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL); //NOTE(review): ep is no longer NULL-checked here - confirm callers never pass NULL
+	if (configured_admin != NULL) {
+		psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0; //NOTE(review): compares only the first strlen(adminType) chars, so a longer configured type starting with adminType matches too - confirm intended
 	}
 
-	double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
-
-	const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
-	long serializerSvcId = getPSASerializer(ctx, requested_serializer);
-	if (serializerSvcId < 0) {
-		score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
+	bool serMatch = false;
+	long serializerSvcId = -1L; //stays -1 when the admin type does not match; the serializer is only looked up on an admin match
+	if (psaMatch) {
+		const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+		serializerSvcId = getPSASerializer(ctx, configured_serializer);
+		serMatch = serializerSvcId >= 0;
 	}
 
-	printf("Score endpoint match for psa type %s is %f", adminType, score);
+	bool match = psaMatch && serMatch;
+	printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
 
 	if (outSerializerSvcId != NULL) {
 		*outSerializerSvcId = serializerSvcId;
 	}
 
-	return score;
+	return match; //the serializer service id (or -1) has already been written to outSerializerSvcId when that pointer is non-NULL
 }
\ No newline at end of file