You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/12 11:15:31 UTC

celix git commit: CELIX-454: Adds pubsub_updmc_tests

Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-454-pubsub-disc ac47373d2 -> 1df73a01f


CELIX-454: Adds pubsub_updmc_tests


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1df73a01
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1df73a01
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1df73a01

Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: 1df73a01f8d5ad5ac06e5cd991d0147782fb1e97
Parents: ac47373
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Nov 12 12:15:07 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Nov 12 12:15:07 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_psa_udpmc_constants.h            |  29 ++-
 .../src/pubsub_udpmc_admin.c                    |  50 ++--
 .../src/pubsub_udpmc_admin.h                    |  12 +-
 .../src/pubsub_udpmc_topic_receiver.c           | 231 +++++++++++++++----
 .../src/pubsub_udpmc_topic_receiver.h           |   8 +-
 .../src/pubsub_udpmc_topic_sender.c             |  24 +-
 .../src/pubsub_udpmc_topic_sender.h             |   5 +-
 .../src/pubsub_psa_zmq_constants.h              |  42 +++-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.c     |  10 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.h     |  35 +--
 .../src/pubsub_discovery_impl.c                 |  12 +-
 bundles/pubsub/test/CMakeLists.txt              |  20 +-
 bundles/pubsub/test/meta_data/ping.properties   |   2 +
 13 files changed, 355 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
index 2a02da8..95779b7 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -31,9 +31,34 @@
 #define PSA_UDPMC_QOS_CONTROL_SCORE_KEY 		"PSA_UDPMC_QOS_CONTROL_SCORE"
 #define PSA_UDPMC_DEFAULT_SCORE_KEY 			"PSA_UDPMC_DEFAULT_SCORE"
 
-#define PSA_UDPMC_DEFAULT_VERBOSE 				false
 
-#define PSA_UDPMC_VERBOSE_KEY    			    "PSA_UDPMC_VERBOSE"
+#define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"udpmc.socket_address"
+#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socker_port"
+
+#define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
+#define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
+#define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
+
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
+#define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
+
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_UDPMC_STATIC_CONFIGURED                  "updmc.static.configured"
+
+/**
+ * Can be set in the topic properties to fix a static mc port for topic senders
+ */
+#define PUBSUB_UDPMC_STATIC_BIND_PORT                   "udpmc.static.bind.port"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_UDPMC_STATIC_CONNECT_SOCKET_ADDRESSES    "udpmc.static.connect.socket_addresses"
 
 
 #endif /* PUBSUB_PSA_UDPMC_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 73fab20..d7add8b 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -109,9 +109,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
     }
 
 
-    const char *mc_prefix = celix_bundleContext_getProperty(ctx,
-            PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY,
-            PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
+    const char *mc_prefix = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY, PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
     const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
     if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
         L_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
@@ -234,7 +232,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     hashMap_destroy(psa->topicReceivers.map, true, false);
 
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
-    hashMap_destroy(psa->discoveredEndpoints.map, true, false);
+    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
     celixThreadMutex_destroy(&psa->serializers.mutex);
     hashMap_destroy(psa->serializers.map, false, false);
@@ -296,13 +294,12 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
     if (sender == NULL) {
         psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress);
+            sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress, topicProps);
         }
         if (sender != NULL) {
             const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                    PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
             celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
             //if available also set container name
@@ -310,6 +307,8 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
             if (cn != NULL) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
+            const char *configuredPort = celix_properties_get(topicProps, PUBSUB_UDPMC_STATIC_BIND_PORT, NULL);
+            celix_properties_setBool(newEndpoint, PUBSUB_UDPMC_STATIC_CONFIGURED, configuredPort != NULL);
             hashMap_put(psa->topicSenders.map, key, sender);
         } else {
             free(key);
@@ -370,7 +369,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
     if (receiver == NULL) {
         psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId, serEntry->svc);
+            receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, psa->log, scope, topic, psa->ifIpAddress, topicProps, serializerSvcId, serEntry->svc);
         }
         if (receiver != NULL) {
             const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
@@ -443,18 +442,23 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-    const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+    const char *sockAddress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
     long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
 
-    if (type == NULL || sockAdress == NULL || sockPort < 0) {
-        fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type");
+    bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (publisher && (sockAddress == NULL || sockPort < 0)) {
+        L_WARN("[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type. Properties:");
+        const char *key = NULL;
+        CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+            L_WARN("[PSA UPDMC] |- %s=%s\n", key, celix_properties_get(endpoint, key, NULL));
+        }
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL && type != NULL &&
+        if (eScope != NULL && eTopic != NULL && publisher &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0 &&
-            strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-            pubsub_udpmcTopicReceiver_connectTo(receiver, sockAdress, sockPort);
+            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+            pubsub_udpmcTopicReceiver_connectTo(receiver, sockAddress, sockPort);
         }
     }
 
@@ -550,9 +554,11 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
         const char *scope = pubsub_udpmcTopicSender_scope(sender);
         const char *topic = pubsub_udpmcTopicSender_topic(sender);
         const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
+        long sockPort = pubsub_udpmcTopicSender_socketPort(sender);
+        const char *postAddr = pubsub_udpmcTopicSender_isStatic(sender) ? " (static port)" : "";
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
-        fprintf(out, "   |- socket address  = %s\n", sockAddr);
+        fprintf(out, "   |- socket address  = %s:%li%s\n", sockAddr, sockPort, postAddr);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
@@ -569,15 +575,23 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
         const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
         const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+        celix_array_list_t *connections = celix_arrayList_create();
+        pubsub_udpmcTopicReceiver_listConnections(receiver, connections);
+
         fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- connections (%i):\n", celix_arrayList_size(connections));
+        for (int i = 0 ; i < celix_arrayList_size(connections); ++i) {
+            char *conn = celix_arrayList_get(connections, i);
+            fprintf(out, "      |- address      = %s\n", conn);
+            free(conn);
+        }
+        celix_arrayList_destroy(connections);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
-    //TODO topic receivers/senders connection count
-
     return status;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 17b8957..469fa9c 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -22,18 +22,8 @@
 
 #include "celix_api.h"
 #include "log_helper.h"
+#include "pubsub_psa_udpmc_constants.h"
 
-#define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"udpmc.socket_address"
-#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socker_port"
-
-#define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
-#define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
-#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
-#define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
-
-#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
-#define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
 
 typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index e1e5a42..5e2ed57 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -26,6 +26,7 @@
 #include <assert.h>
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
+#include <log_helper.h>
 #include "pubsub_udpmc_topic_receiver.h"
 #include "pubsub_psa_udpmc_constants.h"
 #include "large_udp.h"
@@ -36,8 +37,18 @@
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
 struct pubsub_udpmc_topic_receiver {
     celix_bundle_context_t *ctx;
+    log_helper_t *logHelper;
     long serializerSvcId;
     pubsub_serializer_service_t *serializer;
     char *scope;
@@ -55,12 +66,14 @@ struct pubsub_udpmc_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = socketAddress, value = psa_udpmc_requested_connection_entry_t*
+        bool allConnected; //true if all requestedConnectection are connected
     } requestedConnections;
 
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_udpmc_subscriber_entry_t
+        bool allInitialized;
     } subscribers;
 };
 
@@ -68,14 +81,18 @@ typedef struct psa_udpmc_requested_connection_entry {
     char *key;
     char *socketAddress;
     long socketPort;
-    bool connected;
     int recvSocket;
+
+    bool connected;
+    bool statically; //true if the connection is statically configured through the topic properties.
 } psa_udpmc_requested_connection_entry_t;
 
 typedef struct psa_udpmc_subscriber_entry {
     int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     pubsub_subscriber_t *svc;
+
+    bool initialized; //true if the init function is called through the receive thread
 } psa_udpmc_subscriber_entry_t;
 
 
@@ -89,16 +106,20 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
 static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
 static void* psa_udpmc_recvThread(void * data);
-
+static void psa_udpmc_connectToAllRequestedConnections(pubsub_udpmc_topic_receiver_t *receiver);
+static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *receiver);
 
 pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                                log_helper_t *logHelper,
                                                                 const char *scope,
                                                                 const char *topic,
                                                                 const char *ifIP,
+                                                                const celix_properties_t *topicProperties,
                                                                 long serializerSvcId,
                                                                 pubsub_serializer_service_t *serializer) {
     pubsub_udpmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
     receiver->serializerSvcId = serializerSvcId;
     receiver->serializer = serializer;
     receiver->scope = strndup(scope, 1024 * 1024);
@@ -114,7 +135,9 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
 
     receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->subscribers.allInitialized = false;
     receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    receiver->requestedConnections.allConnected = false;
 
     //track subscribers
     {
@@ -132,6 +155,44 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
+    const char *staticConnects = celix_properties_get(topicProperties, PUBSUB_UDPMC_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
+    if (staticConnects != NULL) {
+        char *copy = strndup(staticConnects, 1024*1024);
+        char* addr;
+        char* save = copy;
+
+        while ((addr = strtok_r(save, " ", &save))) {
+            char *colon = strchr(addr, ':');
+            if (colon == NULL) {
+                continue;
+            }
+
+            char *sockAddr = NULL;
+            asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr);
+
+            long sockPort = atol((colon + 1));
+
+            char *key = NULL;
+            asprintf(&key, "%s:%li", sockAddr, sockPort);
+
+
+            if (sockPort > 0) {
+                psa_udpmc_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+                entry->key = key;
+                entry->socketAddress = sockAddr;
+                entry->socketPort = sockPort;
+                entry->connected = false;
+                entry->statically = true;
+                hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+            } else {
+                L_WARN("[PSA_UDPMC_TR] Invalid static socket address %s", addr);
+                free(key);
+                free(sockAddr);
+            }
+        }
+        free(copy);
+    }
+
     celixThread_create(&receiver->recvThread.thread, NULL, psa_udpmc_recvThread, receiver);
 
     return receiver;
@@ -205,9 +266,8 @@ void pubsub_udpmcTopicReceiver_connectTo(
         long socketPort) {
     printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
 
-    int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
-    char *key = calloc((size_t)len+1, sizeof(char));
-    snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+    char *key = NULL;
+    asprintf(&key, "%s:%li", socketAddress, socketPort);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
     psa_udpmc_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
@@ -217,46 +277,15 @@ void pubsub_udpmcTopicReceiver_connectTo(
         entry->socketAddress = strndup(socketAddress, 1024 * 1024);
         entry->socketPort = socketPort;
         entry->connected = false;
+        entry->statically = false;
         hashMap_put(receiver->requestedConnections.map, (void*)entry->key, entry);
+        receiver->requestedConnections.allConnected = false;
     } else {
         free(key);
     }
-    if (!entry->connected) {
-        int rc  = 0;
-        entry->recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
-        if (entry->recvSocket >= 0) {
-            int reuse = 1;
-            rc = setsockopt(entry->recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
-        }
-        if (entry->recvSocket >= 0 && rc >= 0) {
-            struct ip_mreq mc_addr;
-            mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
-            mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
-            printf("Adding MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
-            rc = setsockopt(entry->recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
-        }
-        if (entry->recvSocket >= 0 && rc >= 0) {
-            struct sockaddr_in mcListenAddr;
-            mcListenAddr.sin_family = AF_INET;
-            mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-            mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
-            rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
-        }
-        if (entry->recvSocket >= 0 && rc >= 0) {
-            struct epoll_event ev;
-            memset(&ev, 0, sizeof(ev));
-            ev.events = EPOLLIN;
-            ev.data.fd = entry->recvSocket;
-            rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
-        }
-
-        if (entry->recvSocket < 0 || rc < 0) {
-            fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno));
-        } else {
-            entry->connected = true;
-        }
-    }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    psa_udpmc_connectToAllRequestedConnections(receiver);
 }
 
 void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
@@ -304,6 +333,8 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
         entry = calloc(1, sizeof(*entry));
         entry->usageCount = 1;
         entry->svc = svc;
+        entry->initialized = false;
+        receiver->subscribers.allInitialized = false;
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -347,7 +378,22 @@ static void* psa_udpmc_recvThread(void * data) {
     bool running = receiver->recvThread.running;
     celixThreadMutex_unlock(&receiver->recvThread.mutex);
 
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    bool allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    bool allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
     while (running) {
+        if (!allConnected) {
+            psa_udpmc_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_udpmc_initializeAllSubscribers(receiver);
+        }
+
         int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
         int i;
         for(i = 0; i < nfds; i++ ) {
@@ -366,9 +412,18 @@ static void* psa_udpmc_recvThread(void * data) {
                 free(udpMsg);
             }
         }
+
         celixThreadMutex_lock(&receiver->recvThread.mutex);
         running = receiver->recvThread.running;
         celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
     }
 
     return NULL;
@@ -420,3 +475,99 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
+
+void pubsub_udpmcTopicReceiver_listConnections(pubsub_udpmc_topic_receiver_t *receiver, celix_array_list_t *connections) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        const char *post = entry->statically ? " (static)" : "";
+        asprintf(&url, "%s:%li%s", entry->socketAddress, entry->socketPort, post);
+        celix_arrayList_add(connections, url);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static bool psa_udpmc_connectToEntry(pubsub_udpmc_topic_receiver_t *receiver, psa_udpmc_requested_connection_entry_t *entry) {
+    bool connected = true;
+    int rc  = 0;
+    entry->recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+    if (entry->recvSocket >= 0) {
+        int reuse = 1;
+        rc = setsockopt(entry->recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
+    }
+    if (entry->recvSocket >= 0 && rc >= 0) {
+        struct ip_mreq mc_addr;
+        mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
+        mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
+        L_INFO("[PSA_UDPMC_TR] Add MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
+        rc = setsockopt(entry->recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
+    }
+    if (entry->recvSocket >= 0 && rc >= 0) {
+        struct sockaddr_in mcListenAddr;
+        mcListenAddr.sin_family = AF_INET;
+        mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+        mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
+        rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
+    }
+    if (entry->recvSocket >= 0 && rc >= 0) {
+        struct epoll_event ev;
+        memset(&ev, 0, sizeof(ev));
+        ev.events = EPOLLIN;
+        ev.data.fd = entry->recvSocket;
+        rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
+    }
+
+    if (entry->recvSocket < 0 || rc < 0) {
+        L_WARN("[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, entry->socketAddress, entry->socketPort, strerror(errno));
+        connected = false;
+    }
+    return connected;
+}
+
+
+static void psa_udpmc_connectToAllRequestedConnections(pubsub_udpmc_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool allConnected = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->connected){
+                if (psa_udpmc_connectToEntry(receiver, entry)) {
+                    entry->connected = true;
+                } else {
+                    allConnected = false;
+                }
+            }
+        }
+        receiver->requestedConnections.allConnected = allConnected;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool allInitialized = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->initialized) {
+                int rc = 0;
+                if (entry->svc != NULL && entry->svc->init != NULL) {
+                    rc = entry->svc->init(entry->svc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
+                }
+            }
+        }
+        receiver->subscribers.allInitialized = allInitialized;
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
index 2610489..7eab09a 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -21,13 +21,16 @@
 
 #include "celix_bundle_context.h"
 #include "pubsub_serializer.h"
+#include "log_helper.h"
 
 typedef struct pubsub_udpmc_topic_receiver pubsub_udpmc_topic_receiver_t;
 
-pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, 
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
         const char *scope, 
         const char *topic, 
-        const char *ifIP, 
+        const char *ifIP,
+        const celix_properties_t *topicProperties,
         long serializerSvcId,
         pubsub_serializer_service_t *serializer);
 void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
@@ -35,6 +38,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
 const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver);
 const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver);
 const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_udpmc_topic_receiver_t *receiver);
+void pubsub_udpmcTopicReceiver_listConnections(pubsub_udpmc_topic_receiver_t *receiver, celix_array_list_t *connections);
 
 long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 55bf96b..1b7c1db 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -46,6 +46,7 @@ struct pubsub_udpmc_topic_sender {
     char *topic;
     char *socketAddress;
     long socketPort;
+    bool staticallyConfigured;
 
     int sendSocket;
     struct sockaddr_in destAddr;
@@ -89,7 +90,8 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *serializer,
         int sendSocket,
-        const char *bindIP) {
+        const char *bindIP,
+        const celix_properties_t *topicProperties) {
     pubsub_udpmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->serializerSvcId = serializerSvcId;
@@ -100,9 +102,17 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
     celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
     sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
 
+    unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT);
+    long configuredPort = celix_properties_getAsLong(topicProperties, PUBSUB_UDPMC_STATIC_BIND_PORT, -1L);
+    if (configuredPort > 0) {
+        port = (unsigned int)configuredPort;
+        sender->staticallyConfigured = true;
+    } else {
+        sender->staticallyConfigured = false;
+    }
+
     //setting up socket for UDPMC TopicSender
     {
-        unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT);
         sender->sendSocket = sendSocket;
         sender->destAddr.sin_family = AF_INET;
         sender->destAddr.sin_addr.s_addr = inet_addr(bindIP);
@@ -265,15 +275,15 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
             if (msgSer->msgVersion != NULL){
                 version_getMajor(msgSer->msgVersion, &major);
                 version_getMinor(msgSer->msgVersion, &minor);
-                msg_hdr->major = major;
-                msg_hdr->minor = minor;
+                msg_hdr->major = (unsigned char)major;
+                msg_hdr->minor = (unsigned char)minor;
             }
 
 
             pubsub_msg_t *msg = calloc(1, sizeof(pubsub_msg_t));
             msg->header = msg_hdr;
             msg->payload = (char *) serializedOutput;
-            msg->payloadSize = serializedOutputLen;
+            msg->payloadSize = (unsigned int)serializedOutputLen;
 
 
             if (psa_udpmc_sendMsg(entry, msg) == false) {
@@ -335,3 +345,7 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender) {
     return sender->serializerSvcId;
 }
+
+bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender) {
+    return sender->staticallyConfigured;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 07c1301..e4ccbfb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -31,13 +31,16 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *serializer,
         int sendSocket,
-        const char *bindIP);
+        const char *bindIP,
+        const celix_properties_t *topicProperties);
 void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender);
 
 const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender);
 const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender);
 const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender);
 long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender);
+bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender);
+
 
 long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index 3cc699a..2bd4438 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -23,9 +23,6 @@
 #define PUBSUB_PSA_ZMQ_CONSTANTS_H_
 
 
-
-#define PSA_ZMQ_PUBSUB_ADMIN_TYPE	            "zmq"
-
 #define PSA_ZMQ_BASE_PORT                       "PSA_ZMQ_BASE_PORT"
 #define PSA_ZMQ_MAX_PORT                        "PSA_ZMQ_MAX_PORT"
 
@@ -40,11 +37,44 @@
 #define PSA_ZMQ_QOS_CONTROL_SCORE_KEY 		    "PSA_ZMQ_QOS_CONTROL_SCORE"
 #define PSA_ZMQ_DEFAULT_SCORE_KEY 			    "PSA_ZMQ_DEFAULT_SCORE"
 
-#define PSA_ZMQ_DEFAULT_VERBOSE 			    false
-#define PSA_ZMQ_VERBOSE_KEY		 			    "PSA_ZMQ_VERBOSE"
+
+#define PUBSUB_ZMQ_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
+#define PUBSUB_ZMQ_VERBOSE_DEFAULT  true
+
+#define PUBSUB_ZMQ_PSA_IP_KEY       "PSA_IP"
+#define PUBSUB_ZMQ_PSA_ITF_KEY		"PSA_INTERFACE"
+#define PUBSUB_ZMQ_NR_THREADS_KEY   "PSA_ZMQ_NR_THREADS"
+
+#define PUBSUB_ZMQ_DEFAULT_IP       "127.0.0.1"
+
+#define PUBSUB_ZMQ_ADMIN_TYPE       "zmq"
+
+/**
+ * The ZMQ url key for the topic sender endpoints
+ */
+#define PUBSUB_ZMQ_URL_KEY          "zmq.url"
+
 
 
-#define PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY			"pubsub.zmq.url"
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL       "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL       "zmq.static.bind.url"
 
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED       "zmq.static.configured"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS    "zmq.static.connect.urls"
 
 #endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index c5bfb99..8c9e163 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -377,10 +377,18 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
             //if configured use a static discover url
             const char *staticDiscUrl = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL);
             if (staticDiscUrl != NULL) {
-                celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+                celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
             }
             celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
 
+            //if url starts with ipc:// constrain discovery to host visibility, else use system visibility
+            const char *u = celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, "");
+            if (strncmp("ipc://", u, strlen("ipc://")) == 0) {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBLITY);
+            } else {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY);
+            }
+
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index a249b5a..802bc38 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -22,40 +22,7 @@
 
 #include "celix_api.h"
 #include "log_helper.h"
-
-#define PUBSUB_ZMQ_ADMIN_TYPE       "zmq"
-#define PUBSUB_ZMQ_URL_KEY          "zmq.url"
-
-#define PUBSUB_ZMQ_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
-#define PUBSUB_ZMQ_VERBOSE_DEFAULT  true
-
-#define PUBSUB_ZMQ_PSA_IP_KEY       "PSA_IP"
-#define PUBSUB_ZMQ_PSA_ITF_KEY		"PSA_INTERFACE"
-#define PUBSUB_ZMQ_NR_THREADS_KEY   "PSA_ZMQ_NR_THREADS"
-
-#define PUBSUB_ZMQ_DEFAULT_IP       "127.0.0.1"
-
-/**
- * Can be set in the topic properties to fix a static bind url
- */
-#define PUBSUB_ZMQ_STATIC_BIND_URL       "zmq.static.bind.url"
-
-/**
- * Can be set in the topic properties to fix a static url used for discovery
- */
-#define PUBSUB_ZMQ_STATIC_DISCOVER_URL       "zmq.static.bind.url"
-
-/**
- * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
- */
-#define PUBSUB_ZMQ_STATIC_CONFIGURED       "zmq.static.configured"
-
-/**
- * The static url which a subscriber should try to connect to.
- * The urls are space separated
- */
-#define PUBSUB_ZMQ_STATIC_CONNECT_URLS    "zmq.static.connect.urls"
-
+#include "pubsub_psa_zmq_constants.h"
 
 typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 673bdfd..a86178c 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -452,10 +452,16 @@ celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_propert
 
 static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint) {
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    assert(uuid != NULL);
 
     celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
     bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
-    hashMap_put(disc->discoveredEndpoints, (void*)uuid, endpoint);
+    if (exists) {
+        //if exists -> keep old and free properties
+        celix_properties_destroy(endpoint);
+    } else {
+        hashMap_put(disc->discoveredEndpoints, (void*)uuid, endpoint);
+    }
     celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
 
     if (!exists) {
@@ -512,7 +518,7 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
 }
 
 celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char* etcdValue) {
-    properties_t *props = properties_create();
+    properties_t *props = celix_properties_create();
 
     // etcdValue contains the json formatted string
     json_error_t error;
@@ -528,7 +534,7 @@ celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, con
         while (iter) {
             key = json_object_iter_key(iter);
             value = json_object_iter_value(iter);
-            properties_set(props, key, json_string_value(value));
+            celix_properties_set(props, key, json_string_value(value));
             iter = json_object_iter_next(jsonRoot, iter);
         }
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 941f245..2d7130e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -52,6 +52,23 @@ celix_bundle_files(pubsub_tst
 )
 
 
+add_celix_container(pubsub_udpmc_tests
+        GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+        BUNDLES
+            Celix::pubsub_serializer_json
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_udp_multicast
+            pubsub_sut
+            pubsub_tst
+)
+target_link_libraries(pubsub_udpmc_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_udpmc_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+add_test(NAME run_pubsub_udpmc_tests COMMAND pubsub_udpmc_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_udpmc_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_udpmc_tests pubsub_udpmc_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_udpmc)
 
 if (BUILD_PUBSUB_PSA_ZMQ)
     add_celix_container(pubsub_zmq_tests
@@ -71,5 +88,4 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
     add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
     SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
-endif ()
-
+endif ()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/test/meta_data/ping.properties
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index 09284fc..4cb9981 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -16,4 +16,6 @@
 # under the License.
 zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
 zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
+udpmc.static.bind.port=50678
+udpmc.static.connect.socket_addresses=224.100.0.1:50678