You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:21 UTC

[04/34] celix git commit: CELIX-454: Refactors pubsub discovery and topology manager trying to prevent race conditions

CELIX-454: Refactors pubsub discovery and topology manager trying to prevent race conditions

Also:
 - Adds etcd_refresh to the etcdlib which can be used to refresh a ttl of a etcd key
 - Adds cond_timedwait for celix threads


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/e30a70f7
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/e30a70f7
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/e30a70f7

Branch: refs/heads/develop
Commit: e30a70f7233631a0b9ad8f2e1f1876cd8956ea68
Parents: 7158795
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Thu Sep 20 17:48:14 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Thu Sep 20 17:48:14 2018 +0200

----------------------------------------------------------------------
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 157 +++--
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.h |   3 +
 .../pubsub_admin_udp_mc/src/topic_publication.c |  24 +-
 .../src/topic_subscription.c                    |  20 +-
 .../pubsub_admin_zmq/src/pubsub_admin_impl.c    | 155 ++--
 .../pubsub_admin_zmq/src/pubsub_admin_impl.h    |   1 -
 .../src/pubsub_psa_zmq_constants.h              |   2 +
 .../pubsub_admin_zmq/src/topic_publication.c    |  13 +-
 bundles/pubsub/pubsub_discovery/CMakeLists.txt  |   3 -
 .../pubsub/pubsub_discovery/src/etcd_common.c   |  82 ---
 .../pubsub/pubsub_discovery/src/etcd_common.h   |  28 -
 .../pubsub/pubsub_discovery/src/etcd_watcher.c  | 322 ---------
 .../pubsub/pubsub_discovery/src/etcd_watcher.h  |  38 -
 .../pubsub/pubsub_discovery/src/etcd_writer.c   | 217 ------
 .../pubsub/pubsub_discovery/src/etcd_writer.h   |  39 --
 .../pubsub/pubsub_discovery/src/psd_activator.c | 155 +---
 .../pubsub_discovery/src/pubsub_discovery.h     |  26 -
 .../src/pubsub_discovery_impl.c                 | 699 +++++++++----------
 .../src/pubsub_discovery_impl.h                 |  78 ++-
 .../include/publisher_endpoint_announce.h       |  40 --
 .../pubsub/pubsub_spi/include/pubsub_common.h   |  16 +-
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h |  55 +-
 .../pubsub_spi/include/pubsub_listeners.h       |  50 ++
 .../pubsub/pubsub_spi/src/pubsub_admin_match.c  |  14 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 285 ++++----
 .../src/pstm_activator.c                        | 243 ++-----
 .../src/pubsub_topology_manager.c               | 418 +++++------
 .../src/pubsub_topology_manager.h               |  44 +-
 bundles/shell/shell/src/lb_command.c            |   5 +-
 libs/etcdlib/api/etcd.h                         |  10 +-
 libs/etcdlib/src/etcd.c                         |  46 +-
 libs/framework/include/celix_bundle_context.h   |  35 +-
 libs/framework/src/bundle_context.c             |  32 +
 libs/framework/src/service_tracker.c            |  33 +-
 libs/utils/include/celix_properties.h           |   5 +-
 libs/utils/include/celix_threads.h              |   2 +
 libs/utils/private/test/properties_test.cpp     |  61 ++
 libs/utils/src/celix_threads.c                  |   7 +
 libs/utils/src/properties.c                     |  24 +-
 39 files changed, 1306 insertions(+), 2181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index 1e3cef0..0638efb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -342,7 +342,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 		else{
 			if (admin->verbose) {
 				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+					   properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 			}
 
 			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -363,8 +363,8 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 				if(topic_publishers!=NULL){
 					for(i=0;i<arrayList_size(topic_publishers);i++){
 						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 					arrayList_destroy(topic_publishers);
@@ -381,8 +381,8 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 				if(ext_pub_list!=NULL){
 					for(i=0;i<arrayList_size(ext_pub_list);i++){
 						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 				}
@@ -412,7 +412,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
+	if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
 		return pubsubAdmin_addAnySubscription(admin,subEP);
 	}
 
@@ -422,7 +422,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -437,11 +437,11 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 			pubsub_serializer_service_t *best_serializer = NULL;
             const char *serType = NULL;
 			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
-				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
 			} else {
 				if (admin->verbose) {
 					printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-						   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+						   properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 				}
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -459,8 +459,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 					if(topic_publishers!=NULL){
 						for(i=0;i<arrayList_size(topic_publishers);i++){
 							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-							if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+							if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 							}
 						}
 						arrayList_destroy(topic_publishers);
@@ -472,8 +472,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 				if(ext_pub_list!=NULL){
 					for(i=0;i<arrayList_size(ext_pub_list);i++){
 						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 				}
@@ -504,16 +504,16 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 
     if (admin->verbose) {
-        printf("PSA_UDPMC: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+        printf("[PSA_UDPMC] Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
 	return status;
@@ -524,19 +524,19 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 	celix_status_t status = CELIX_SUCCESS;
 
     if (admin->verbose) {
-        printf("PSA_UDPMC: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+        printf("[PSA_UDPMC] Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -575,22 +575,22 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
 
-    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
     bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
 
     if (isOwn) {
         //should be null, willl be set in this call
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
     }
 
     if (isOwn) {
-        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+        properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
     }
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+	if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) {
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -603,11 +603,11 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
 				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
                 if (isOwn) {
-                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                    properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, serType);
                 }
 			} else {
 				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerPublications,pubEP);
@@ -621,10 +621,9 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 				}
 			} else {
-				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-					   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s.\n",
+					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 			}
 		} else {
 			//just add the new EP to the list
@@ -671,14 +670,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 
 	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
-	if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
-		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-	if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
-		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	free(scope_topic);
@@ -687,16 +686,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
     if (admin->verbose) {
         printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+               properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
         printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+               properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
                isOwn);
     }
 
@@ -710,15 +709,15 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 
     if (admin->verbose) {
         printf("PSA_UDPMC: Adding publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+               properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
 	const char* fwUUID = NULL;
@@ -728,9 +727,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
 		return CELIX_INVALID_BUNDLE_CONTEXT;
 	}
-	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+	if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -767,7 +766,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
 			for(i=0; i<arrayList_size(ext_pub_list);i++) {
 				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-				if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) {
+				if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
 					count++;
 				}
 			}
@@ -789,14 +788,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-	if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-	if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	free(scope_topic);
@@ -895,7 +894,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 	if(pendingListPerTopic==NULL){
 		arrayList_create(&pendingListPerTopic);
@@ -992,8 +991,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 				/* Remove the publication */
 				pubsubAdmin_removePublication(admin, pubEP);
 				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-				if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
-					properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+				if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+					properties_unset(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
 				}
 				/* Add the orphan endpoint to the noSerializer pending list */
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1043,8 +1042,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 				/* Remove the subscription */
 				pubsubAdmin_removeSubscription(admin, subEP);
 				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-				if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
-					properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+				if(properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+					properties_unset(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
 				}
 				/* Add the orphan endpoint to the noSerializer pending list */
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1109,7 +1108,7 @@ static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsu
 	service_reference_pt svcRef = NULL;
 
 	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
+	status = pubsub_admin_get_best_serializer(ep->properties, admin->serializerList, &svcRef);
 	celixThreadMutex_unlock(&admin->serializerListLock);
 
 	if (svcRef != NULL) {

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
index 3529a8f..b82e8a1 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -31,6 +31,9 @@
 #include "pubsub_admin.h"
 #include "log_helper.h"
 
+
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"pubsub.udpmc.socket_address"
+
 struct pubsub_admin {
 
 	bundle_context_pt bundle_context;

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index 7e9bdbb..046feb6 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -26,6 +26,7 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
+#include <celix_api.h>
 
 #include "array_list.h"
 #include "celixbool.h"
@@ -41,6 +42,7 @@
 
 #include "pubsub_serializer.h"
 #include "pubsub_psa_udpmc_constants.h"
+#include "pubsub_admin_impl.h"
 
 #define EP_ADDRESS_LEN		32
 
@@ -101,7 +103,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 	char* ep = malloc(EP_ADDRESS_LEN);
 	memset(ep,0,EP_ADDRESS_LEN);
 
-	long serviceId =strtol(properties_getWithDefault(pubEP->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, "0"), NULL, 10);
+	long serviceId = celix_properties_getAsLong(pubEP->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
 
 	unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, UDP_MAX_PORT);
 	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
@@ -176,8 +178,8 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 		factory->ungetService = pubsub_topicPublicationUngetService;
 
 		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
                 properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 
 
@@ -185,12 +187,10 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 
 		if(status != CELIX_SUCCESS){
 			properties_destroy(props);
-			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %s).\n",
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID));
-		}
-		else{
+			printf("[PSA UDPMC] Cannot register ServiceFactory for topic %s, topic %s.\n",
+				   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+				   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+		} else {
 			*svcFactory = factory;
 		}
 	}
@@ -209,7 +209,7 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
 
 	celixThreadMutex_lock(&(pub->tp_lock));
-	pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+	pubsubEndpoint_setField(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, pub->endpoint);
 	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
 	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
 	arrayList_add(pub->pub_ep_list,ep);
@@ -402,8 +402,8 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		}
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-		bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
-		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		bound->scope=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+		bound->topic=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 		bound->largeUdpHandle = largeUdp_create(1);
 
 		bound->service.handle = bound;

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
index 6d0768b..82adf24 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
@@ -254,15 +254,15 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) {
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* socketAddress) {
 
-	printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
+	printf("[PSA UDPMC] topicSubscriptionConnectPublisher : socket address = %s\n", socketAddress);
 
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->socketMap_lock);
 
-	if(!hashMap_containsKey(ts->socketMap, pubURL)){
+	if(!hashMap_containsKey(ts->socketMap, socketAddress)){
 
 		int *recvSocket = calloc(sizeof(int), 1);
 		*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
@@ -282,7 +282,7 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 		if(status == CELIX_SUCCESS){
 			// TODO Check if there is a better way to parse the URL to IP/Portnr
 			//replace ':' by spaces
-			char *url = strdup(pubURL);
+			char *url = strdup(socketAddress);
 			char *pt = url;
 			while((pt=strchr(pt, ':')) != NULL) {
 				*pt = ' ';
@@ -292,7 +292,7 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 			sscanf(url, "udp //%s %hu", mcIp, &mcPort);
 			free(url);
 
-			printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+			printf("[PSA UDPMC] topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
 
 			struct ip_mreq mc_addr;
 			mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
@@ -332,7 +332,7 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 		}
 
 		if (status == CELIX_SUCCESS){
-			hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket);
+			hashMap_put(ts->socketMap, strdup(socketAddress), (void*)recvSocket);
 		}
 		else{
 			free(recvSocket);
@@ -344,18 +344,18 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* socketAddress) {
 	celix_status_t status = CELIX_SUCCESS;
-	char *url = strdup(pubURL);
+	char *url = strdup(socketAddress);
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_add(ts->pendingConnections, url);
 	celixThreadMutex_unlock(&ts->pendingConnections_lock);
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* socketAddress) {
 	celix_status_t status = CELIX_SUCCESS;
-	char *url = strdup(pubURL);
+	char *url = strdup(socketAddress);
 	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
 	arrayList_add(ts->pendingDisconnections, url);
 	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index e9bb6c3..0af73e4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -342,7 +342,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
         }
         else{
             printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-                    properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                    properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
             celixThreadMutex_lock(&admin->noSerializerPendingsLock);
             arrayList_add(admin->noSerializerSubscriptions,subEP);
             celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -361,8 +361,8 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                 if(topic_publishers!=NULL){
                     for(i=0;i<arrayList_size(topic_publishers);i++){
                         pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
                         }
                     }
                     arrayList_destroy(topic_publishers);
@@ -379,8 +379,8 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                 if(ext_pub_list!=NULL){
                     for(i=0;i<arrayList_size(ext_pub_list);i++){
                         pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
                         }
                     }
                 }
@@ -415,7 +415,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
     celix_status_t status = CELIX_SUCCESS;
 
-    if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
+    if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
         return pubsubAdmin_addAnySubscription(admin,subEP);
     }
 
@@ -425,7 +425,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
     celixThreadMutex_lock(&admin->localPublicationsLock);
     celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
     service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
     array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -441,11 +441,11 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
             pubsub_serializer_service_t *best_serializer = NULL;
             const char *serType = NULL;
             if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
-                status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription);
+                status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription);
             }
             else{
                 printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-                        properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                        properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
                 celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                 arrayList_add(admin->noSerializerSubscriptions,subEP);
                 celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -461,8 +461,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
                     if(topic_publishers!=NULL){
                         for(i=0;i<arrayList_size(topic_publishers);i++){
                             pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                            if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-                                status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                            if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
+                                status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
                             }
                         }
                         arrayList_destroy(topic_publishers);
@@ -474,8 +474,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
                 if(ext_pub_list!=NULL){
                     for(i=0;i<arrayList_size(ext_pub_list);i++){
                         pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
-                            status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
                         }
                     }
                 }
@@ -508,15 +508,15 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 
     if (admin->verbose) {
         printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     return status;
@@ -528,18 +528,18 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 
     if (admin->verbose) {
         printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
-    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
     celixThreadMutex_lock(&admin->subscriptionsLock);
     topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -586,28 +586,28 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
 
-    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
     bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
 
     if (isOwn) {
         //should be null, willl be set in this call
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
     } else {
         //inverse ADMIN_TYPE_KEY and SERIALIZER_TYPE shoudl not be null
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL);
-        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) != NULL);
+        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
     }
 
     if (isOwn) {
-        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+        properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
     }
 
 
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-    if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) &&
-            (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+    if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) &&
+            (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) == NULL)) {
 
         celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -620,12 +620,12 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
             if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
                 status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
                 if (isOwn) {
-                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                    properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, serType);
                 }
             }
             else {
                 printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
                 celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                 arrayList_add(admin->noSerializerPublications,pubEP);
                 celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -638,10 +638,9 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
                     connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
                 }
             } else {
-                printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
-                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-                        properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+                printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s.\n",
+                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
             }
         } else {
             //just add the new EP to the list
@@ -689,14 +688,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
     celixThreadMutex_lock(&admin->subscriptionsLock);
 
     topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
-    if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     /* And check also for ANY subscription */
     topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-    if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     free(scope_topic);
@@ -706,16 +705,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 
     if (admin->verbose) {
         printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+                properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
         printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+                properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY),
                 isOwn);
     }
 
@@ -730,15 +729,15 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 
     if (admin->verbose) {
         printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     const char* fwUUID = NULL;
@@ -748,9 +747,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
         fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-    if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+    if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
 
         celixThreadMutex_lock(&admin->localPublicationsLock);
         service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -786,7 +785,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
             // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
             for(i=0; i<arrayList_size(ext_pub_list);i++) {
                 pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) {
+                if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY),properties_get(p->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)) == 0) {
                     count++;
                 }
             }
@@ -808,14 +807,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
     celixThreadMutex_lock(&admin->subscriptionsLock);
 
     topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-    if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
-        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     /* And check also for ANY subscription */
     topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-    if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
-        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
     }
 
     free(scope_topic);
@@ -917,7 +916,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip)
 
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
     celix_status_t status = CELIX_SUCCESS;
-    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
     array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
     if(pendingListPerTopic==NULL){
         arrayList_create(&pendingListPerTopic);
@@ -1013,8 +1012,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
                 /* Remove the publication */
                 pubsubAdmin_removePublication(admin, pubEP);
                 /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-                if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
-                    properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+                if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL){
+                    properties_unset(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY);
                 }
                 /* Add the orphan endpoint to the noSerializer pending list */
                 celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1064,8 +1063,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
                 /* Remove the subscription */
                 pubsubAdmin_removeSubscription(admin, subEP);
                 /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-                if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
-                    properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+                if(properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL){
+                    properties_unset(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY);
                 }
                 /* Add the orphan endpoint to the noSerializer pending list */
                 celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1130,7 +1129,7 @@ static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub
     service_reference_pt svcRef = NULL;
 
     celixThreadMutex_lock(&admin->serializerListLock);
-    status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
+    status = pubsub_admin_get_best_serializer(ep->properties, admin->serializerList, &svcRef);
     celixThreadMutex_unlock(&admin->serializerListLock);
 
     if (svcRef != NULL) {

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
index c788382..4569a97 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -43,7 +43,6 @@
 #include "log_helper.h"
 #include "command.h"
 
-
 struct pubsub_admin {
 
 	bundle_context_pt bundle_context;

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index 211439e..3cc699a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -44,5 +44,7 @@
 #define PSA_ZMQ_VERBOSE_KEY		 			    "PSA_ZMQ_VERBOSE"
 
 
+#define PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY			"pubsub.zmq.url"
+
 
 #endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
index c81107d..35da907 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -288,17 +288,16 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 		factory->ungetService = pubsub_topicPublicationUngetService;
 
 		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE));
 		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 
 		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
 		if(status != CELIX_SUCCESS){
 			properties_destroy(props);
-			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %s).\n",
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-				   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s.\n",
+				   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 		}
 		else{
 			*svcFactory = factory;
@@ -321,7 +320,7 @@ celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, p
 	celixThreadMutex_lock(&(pub->tp_lock));
 	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
 	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
-    pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+    pubsubEndpoint_setField(ep, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY, pub->endpoint);
 	arrayList_add(pub->pub_ep_list,ep);
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -590,7 +589,7 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		bound->topic=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 		bound->service.handle = bound;
 		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index 9085f14..96a8af1 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -25,9 +25,6 @@ add_celix_bundle(celix_pubsub_discovery_etcd
     SOURCES
         src/psd_activator.c
         src/pubsub_discovery_impl.c
-        src/etcd_common.c
-        src/etcd_watcher.c
-        src/etcd_writer.c
 )
 target_include_directories(celix_pubsub_discovery_etcd PRIVATE src)
 target_include_directories(celix_pubsub_discovery_etcd SYSTEM PRIVATE 

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_common.c b/bundles/pubsub/pubsub_discovery/src/etcd_common.c
deleted file mode 100644
index c757801..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_common.c
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include <curl/curl.h>
-#include "etcd.h"
-#include "etcd_watcher.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-
-#define MAX_ROOTNODE_LENGTH		128
-#define MAX_LOCALNODE_LENGTH 	4096
-#define MAX_FIELD_LENGTH		128
-
-#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
-
-
-celix_status_t etcdCommon_init(bundle_context_pt context) {
-    celix_status_t status = CELIX_SUCCESS;
-    const char* etcd_server = NULL;
-    const char* etcd_port_string = NULL;
-    int etcd_port = 0;
-
-    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
-        etcd_server = DEFAULT_ETCD_SERVER_IP;
-    }
-
-    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) != CELIX_SUCCESS) || !etcd_port_string) {
-        etcd_port = DEFAULT_ETCD_SERVER_PORT;
-    } else {
-        char* endptr = NULL;
-        errno = 0;
-        etcd_port = strtol(etcd_port_string, &endptr, 10);
-        if (*endptr || errno != 0) {
-            etcd_port = DEFAULT_ETCD_SERVER_PORT;
-        }
-    }
-
-    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
-
-    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
-        status = CELIX_BUNDLE_EXCEPTION;
-    } else {
-        status = CELIX_SUCCESS;
-    }
-
-    return status;
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_common.h b/bundles/pubsub/pubsub_discovery/src/etcd_common.h
deleted file mode 100644
index 7a3e7b6..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_common.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_COMMON_H_
-#define ETCD_COMMON_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-celix_status_t etcdCommon_init(bundle_context_pt context);
-
-#endif /* ETCD_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
deleted file mode 100644
index c698172..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
+++ /dev/null
@@ -1,322 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include <jansson.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include "etcd.h"
-#include "etcd_watcher.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-
-
-#define MAX_ROOTNODE_LENGTH             128
-#define MAX_LOCALNODE_LENGTH            4096
-#define MAX_FIELD_LENGTH                128
-
-#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
-
-#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT        2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL                30
-
-
-struct etcd_watcher {
-	pubsub_discovery_pt pubsub_discovery;
-
-	celix_thread_mutex_t watcherLock;
-	celix_thread_t watcherThread;
-
-	char *scope;
-	char *topic;
-	volatile bool running;
-};
-
-struct etcd_writer {
-	pubsub_discovery_pt pubsub_discovery;
-	celix_thread_mutex_t localPubsLock;
-	array_list_pt localPubs;
-	volatile bool running;
-	celix_thread_t writerThread;
-};
-
-
-// note that the rootNode shouldn't have a leading slash
-static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
-	celix_status_t status = CELIX_SUCCESS;
-	const char* rootPath = NULL;
-
-	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-		snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
-	} else {
-		snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
-	}
-
-	return status;
-}
-
-static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
-	celix_status_t status = CELIX_SUCCESS;
-	const char* rootPath = NULL;
-
-	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-		strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
-	} else {
-		strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
-	}
-
-	return status;
-}
-
-
-static void add_node(const char *key, const char *value, void* arg) {
-	pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
-	pubsub_endpoint_pt pubEP = NULL;
-	celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
-	if(status == CELIX_SUCCESS) {
-		pubsub_discovery_addNode(ps_discovery, pubEP);
-	}
-}
-
-static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
-	celix_status_t status = CELIX_SUCCESS;
-	if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) {
-		status = CELIX_ILLEGAL_ARGUMENT;
-	}
-	return status;
-}
-
-// gets everything from provided key
-celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
-
-	celix_status_t status = CELIX_SUCCESS;
-
-	char rootPath[MAX_ROOTNODE_LENGTH];
-	char *expr = NULL;
-	char scope[MAX_FIELD_LENGTH];
-	char topic[MAX_FIELD_LENGTH];
-	char fwUUID[MAX_FIELD_LENGTH];
-	char pubsubUUID[MAX_FIELD_LENGTH];
-
-	memset(rootPath,0,MAX_ROOTNODE_LENGTH);
-	memset(topic,0,MAX_FIELD_LENGTH);
-	memset(fwUUID,0,MAX_FIELD_LENGTH);
-	memset(pubsubUUID,0,MAX_FIELD_LENGTH);
-
-	etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
-
-	asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
-	if(expr) {
-		int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, pubsubUUID);
-		free(expr);
-		if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
-			status = CELIX_ILLEGAL_STATE;
-		}
-		else{
-
-			// etcdValue contains the json formatted string
-			json_error_t error;
-			json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
-
-			properties_t *discovered_props = properties_create();
-
-			if (json_is_object(jsonRoot)) {
-
-                void *iter = json_object_iter(jsonRoot);
-
-                const char *key;
-                json_t *value;
-
-                while (iter) {
-                    key = json_object_iter_key(iter);
-                    value = json_object_iter_value(iter);
-
-                    properties_set(discovered_props, key, json_string_value(value));
-                    iter = json_object_iter_next(jsonRoot, iter);
-                }
-            }
-
-
-            status = pubsubEndpoint_createFromDiscoveredProperties(discovered_props, pubEP);
-            if (status != CELIX_SUCCESS) {
-                properties_destroy(discovered_props);
-            }
-
-			if (jsonRoot != NULL) {
-				json_decref(jsonRoot);
-			}
-		}
-	}
-	return status;
-}
-
-/*
- * performs (blocking) etcd_watch calls to check for
- * changing discovery endpoint information within etcd.
- */
-static void* etcdWatcher_run(void* data) {
-	etcd_watcher_pt watcher = (etcd_watcher_pt) data;
-	time_t timeBeforeWatch = time(NULL);
-	char rootPath[MAX_ROOTNODE_LENGTH];
-	long long highestModified = 0;
-
-	pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
-	bundle_context_pt context = ps_discovery->context;
-
-	memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
-
-	//TODO: add topic to etcd key
-	etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
-	etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
-
-	while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
-
-		char *rkey = NULL;
-		char *value = NULL;
-		char *preValue = NULL;
-		char *action = NULL;
-		long long modIndex;
-
-		celixThreadMutex_unlock(&watcher->watcherLock);
-
-		if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
-			pubsub_endpoint_pt pubEP = NULL;
-			if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
-				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-					pubsub_discovery_addNode(ps_discovery, pubEP);
-				}
-			} else if (strcmp(action, "delete") == 0) {
-				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-					pubsub_discovery_removeNode(ps_discovery, pubEP);
-				}
-			} else if (strcmp(action, "expire") == 0) {
-				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-					pubsub_discovery_removeNode(ps_discovery, pubEP);
-				}
-			} else if (strcmp(action, "update") == 0) {
-				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-					pubsub_discovery_addNode(ps_discovery, pubEP);
-				}
-			} else {
-				fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
-			}
-			highestModified = modIndex;
-		} else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
-			sleep(DEFAULT_ETCD_TTL / 4);
-		}
-
-		FREE_MEM(action);
-		FREE_MEM(value);
-		FREE_MEM(preValue);
-		FREE_MEM(rkey);
-
-		/* prevent busy waiting, in case etcd_watch returns false */
-
-
-		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
-			timeBeforeWatch = time(NULL);
-		}
-
-	}
-
-	if (watcher->running == false) {
-		celixThreadMutex_unlock(&watcher->watcherLock);
-	}
-
-	return NULL;
-}
-
-celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
-	celix_status_t status = CELIX_SUCCESS;
-
-
-	if (pubsub_discovery == NULL) {
-		return CELIX_BUNDLE_EXCEPTION;
-	}
-
-	(*watcher) = calloc(1, sizeof(struct etcd_watcher));
-
-	if(*watcher == NULL){
-		return CELIX_ENOMEM;
-	}
-
-	(*watcher)->pubsub_discovery = pubsub_discovery;
-	(*watcher)->scope = strdup(scope);
-	(*watcher)->topic = strdup(topic);
-
-
-	celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
-
-	celixThreadMutex_lock(&(*watcher)->watcherLock);
-
-	status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
-	if (status == CELIX_SUCCESS) {
-		(*watcher)->running = true;
-	}
-
-	celixThreadMutex_unlock(&(*watcher)->watcherLock);
-
-
-	return status;
-}
-
-celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
-
-	celix_status_t status = CELIX_SUCCESS;
-
-	char rootPath[MAX_ROOTNODE_LENGTH];
-	etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
-	celixThreadMutex_destroy(&(watcher->watcherLock));
-
-	free(watcher->scope);
-	free(watcher->topic);
-	free(watcher);
-
-	return status;
-}
-
-celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&(watcher->watcherLock));
-	watcher->running = false;
-	celixThreadMutex_unlock(&(watcher->watcherLock));
-
-	celixThread_join(watcher->watcherThread, NULL);
-
-	return status;
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
deleted file mode 100644
index c425e60..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_WATCHER_H_
-#define ETCD_WATCHER_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_endpoint.h"
-
-typedef struct etcd_watcher *etcd_watcher_pt;
-
-celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery,  bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher);
-celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
-celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
-
-celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
-
-
-#endif /* ETCD_WATCHER_H_ */