You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:20 UTC

[03/34] celix git commit: CELIX-454: Refactors pubsub discovery and topology manager trying to prevent race conditions

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_writer.c b/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
deleted file mode 100644
index 37220cc..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include <jansson.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include "etcd.h"
-#include "etcd_writer.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-#define MAX_ROOTNODE_LENGTH		128
-
-#define CFG_ETCD_ROOT_PATH		"PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH	"pubsub/discovery"
-
-#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
-
-struct etcd_writer {
-	pubsub_discovery_pt pubsub_discovery;
-	celix_thread_mutex_t localPubsLock;
-	array_list_pt localPubs;
-	volatile bool running;
-	celix_thread_t writerThread;
-};
-
-
-static const char* etcdWriter_getRootPath(bundle_context_pt context);
-static void* etcdWriter_run(void* data);
-
-
-etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
-	etcd_writer_pt writer = calloc(1, sizeof(*writer));
-	if(writer) {
-		celixThreadMutex_create(&writer->localPubsLock, NULL);
-		arrayList_create(&writer->localPubs);
-		writer->pubsub_discovery = disc;
-		writer->running = true;
-		celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
-	}
-	return writer;
-}
-
-void etcdWriter_destroy(etcd_writer_pt writer) {
-	char dir[MAX_ROOTNODE_LENGTH];
-	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-	writer->running = false;
-	celixThread_join(writer->writerThread, NULL);
-
-	celixThreadMutex_lock(&writer->localPubsLock);
-	for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
-		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
-		memset(dir,0,MAX_ROOTNODE_LENGTH);
-		snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
-				 rootPath,
-				 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-				 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-				 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID));
-
-		etcd_del(dir);
-		pubsubEndpoint_destroy(pubEP);
-	}
-	arrayList_destroy(writer->localPubs);
-
-	celixThreadMutex_unlock(&writer->localPubsLock);
-	celixThreadMutex_destroy(&(writer->localPubsLock));
-
-	free(writer);
-}
-
-celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
-	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-	if(storeEP){
-		const char *fwUUID = NULL;
-		bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-		if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
-			celixThreadMutex_lock(&writer->localPubsLock);
-			pubsub_endpoint_pt p = NULL;
-			pubsubEndpoint_clone(pubEP, &p);
-			arrayList_add(writer->localPubs,p);
-			celixThreadMutex_unlock(&writer->localPubsLock);
-		}
-	}
-
-	char *key;
-
-	const char* ttlStr = NULL;
-	int ttl = 0;
-
-	// determine ttl
-	if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
-		ttl = DEFAULT_ETCD_TTL;
-	} else {
-		char* endptr = NULL;
-		errno = 0;
-		ttl = strtol(ttlStr, &endptr, 10);
-		if (*endptr || errno != 0) {
-			ttl = DEFAULT_ETCD_TTL;
-		}
-	}
-
-	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-	asprintf(&key,"%s/%s/%s/%s/%s",
-			 rootPath,
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
-
-
-	json_t *jsEndpoint = json_object();
-	const char* propKey = NULL;
-	PROPERTIES_FOR_EACH(pubEP->endpoint_props, propKey) {
-		const char* val = properties_get(pubEP->endpoint_props, propKey);
-		json_t* jsVal = json_string(val);
-		json_object_set(jsEndpoint, propKey, jsVal);
-	}
-	char* jsonEndpointStr = json_dumps(jsEndpoint, JSON_COMPACT);
-
-	if (!etcd_set(key,jsonEndpointStr,ttl,false)) {
-		status = CELIX_ILLEGAL_ARGUMENT;
-	}
-	FREE_MEM(jsonEndpointStr);
-	json_decref(jsEndpoint);
-
-	return status;
-}
-
-celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
-	celix_status_t status = CELIX_SUCCESS;
-	char *key = NULL;
-
-	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-	asprintf(&key, "%s/%s/%s/%s/%s",
-			 rootPath,
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
-
-	celixThreadMutex_lock(&writer->localPubsLock);
-	for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
-		pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
-		if (pubsubEndpoint_equals(ep, pubEP)) {
-			arrayList_remove(writer->localPubs, i);
-			pubsubEndpoint_destroy(ep);
-			break;
-		}
-	}
-	celixThreadMutex_unlock(&writer->localPubsLock);
-
-	if (etcd_del(key)) {
-		printf("Failed to remove key %s from ETCD\n",key);
-		status = CELIX_ILLEGAL_ARGUMENT;
-	}
-	FREE_MEM(key);
-	return status;
-}
-
-static void* etcdWriter_run(void* data) {
-	etcd_writer_pt writer = (etcd_writer_pt)data;
-	while(writer->running) {
-		celixThreadMutex_lock(&writer->localPubsLock);
-		for(int i=0; i < arrayList_size(writer->localPubs); i++) {
-			etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
-		}
-		celixThreadMutex_unlock(&writer->localPubsLock);
-		sleep(DEFAULT_ETCD_TTL / 2);
-	}
-
-	return NULL;
-}
-
-static const char* etcdWriter_getRootPath(bundle_context_pt context) {
-	const char* rootPath = NULL;
-	bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
-	if(rootPath == NULL) {
-		rootPath = DEFAULT_ETCD_ROOTPATH;
-	}
-	return rootPath;
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_writer.h b/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
deleted file mode 100644
index 3ff98b9..0000000
--- a/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_WRITER_H_
-#define ETCD_WRITER_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_endpoint.h"
-
-typedef struct etcd_writer *etcd_writer_pt;
-
-
-etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
-void etcdWriter_destroy(etcd_writer_pt writer);
-
-celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP);
-celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP);
-
-
-#endif /* ETCD_WRITER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index ad1cc4a..8b2a2d4 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -21,150 +21,61 @@
 #include <stdlib.h>
 #include <string.h>
 
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "service_registration.h"
+#include "celix_bundle_context.h"
+#include "celix_bundle_activator.h"
 #include "constants.h"
 #include "celix_log.h"
 
 #include "pubsub_common.h"
-#include "publisher_endpoint_announce.h"
+#include "pubsub_listeners.h"
 #include "pubsub_discovery_impl.h"
 
-struct activator {
-	bundle_context_pt context;
-	pubsub_discovery_pt pubsub_discovery;
+typedef struct psd_activator {
+	pubsub_discovery_t *pubsub_discovery;
 
-	service_tracker_pt pstmPublishersTracker;
+	long publishAnnounceSvcTrackerId;
+	//service_tracker_pt pstmPublishersTracker;
 
-	publisher_endpoint_announce_pt publisherEPAnnounce;
-	service_registration_pt publisherEPAnnounceService;
-};
+	pubsub_announce_endpoint_listener_t listenerSvc;
+	long listenerSvcId;
+} psd_activator_t;
 
-static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) {
-	celix_status_t status = CELIX_SUCCESS;
+static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
+	celix_status_t status;
 
-	service_tracker_customizer_pt customizer = NULL;
+	pubsub_discovery_create(ctx, &act->pubsub_discovery);
+	// pubsub_discovery_start needs to be first to initialize
+	status = pubsub_discovery_start(act->pubsub_discovery);
 
-	status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
-			NULL,
-			pubsub_discovery_tmPublisherAnnounceAdded,
-			pubsub_discovery_tmPublisherAnnounceModified,
-			pubsub_discovery_tmPublisherAnnounceRemoved,
-			&customizer);
+	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+	opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
+	opts.callbackHandle = act->pubsub_discovery;
+	opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded;
+	opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved;
+	act->publishAnnounceSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
 
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker);
-	}
-
-	return status;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	celix_status_t status = CELIX_SUCCESS;
+	act->listenerSvc.handle = act->pubsub_discovery;
+	act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
+	act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
 
-	struct activator* activator = calloc(1, sizeof(*activator));
-
-	if (activator) {
-		activator->context = context;
-		activator->pstmPublishersTracker = NULL;
-		activator->publisherEPAnnounce = NULL;
-		activator->publisherEPAnnounceService = NULL;
-
-		status = pubsub_discovery_create(context, &activator->pubsub_discovery);
-
-		if (status == CELIX_SUCCESS) {
-			status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker));
-		}
-
-		if (status == CELIX_SUCCESS) {
-			*userData = activator;
-		} else {
-			free(activator);
-		}
+	if (status == CELIX_SUCCESS) {
+		act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL);
 	} else {
-		status = CELIX_ENOMEM;
+		act->listenerSvcId = -1L;
 	}
 
 	return status;
-
 }
 
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	struct activator *activator = userData;
-
-	publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer));
-
-	if (pubEPAnnouncer) {
-
-		pubEPAnnouncer->handle = activator->pubsub_discovery;
-		pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher;
-		pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher;
-		pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic;
-		pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic;
-		activator->publisherEPAnnounce = pubEPAnnouncer;
+static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx) {
+	celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId);
+	celix_bundleContext_unregisterService(ctx, act->listenerSvcId);
 
-		properties_pt props = properties_create();
-		properties_set(props, "PUBSUB_DISCOVERY", "true");
-
-		// pubsub_discovery_start needs to be first to initalize the propert etcd_watcher values
-		status = pubsub_discovery_start(activator->pubsub_discovery);
-
-		if (status == CELIX_SUCCESS) {
-			status = serviceTracker_open(activator->pstmPublishersTracker);
-		}
-
-		if (status == CELIX_SUCCESS) {
-			status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService);
-		}
-
-
-	}
-	else{
-		status = CELIX_ENOMEM;
-	}
-
-	if(status!=CELIX_SUCCESS && pubEPAnnouncer!=NULL){
-		free(pubEPAnnouncer);
-	}
-
-
-	return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-
-	status += pubsub_discovery_stop(activator->pubsub_discovery);
-
-	status += serviceTracker_close(activator->pstmPublishersTracker);
-
-	status += serviceRegistration_unregister(activator->publisherEPAnnounceService);
-
-	if (status == CELIX_SUCCESS) {
-		free(activator->publisherEPAnnounce);
-	}
+	celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
+	pubsub_discovery_destroy(act->pubsub_discovery);
 
 	return status;
 }
 
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
 
-	status += serviceTracker_destroy(activator->pstmPublishersTracker);
-	status += pubsub_discovery_destroy(activator->pubsub_discovery);
-
-	activator->publisherEPAnnounce = NULL;
-	activator->publisherEPAnnounceService = NULL;
-	activator->pstmPublishersTracker = NULL;
-	activator->pubsub_discovery = NULL;
-	activator->context = NULL;
-
-	free(activator);
-
-	return status;
-}
+CELIX_GEN_BUNDLE_ACTIVATOR(psd_activator_t, psd_start, psd_stop);

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
deleted file mode 100644
index f77905a..0000000
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBSUB_DISCOVERY_H_
-#define PUBSUB_DISCOVERY_H_
-
-typedef struct pubsub_discovery *pubsub_discovery_pt;
-
-
-#endif /* PUBSUB_DISCOVERY_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index f0b94c5..fcf0823 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -24,28 +24,30 @@
 #include <stdbool.h>
 #include <netdb.h>
 #include <netinet/in.h>
+#include <jansson.h>
+#include <assert.h>
+#include <sys/time.h>
 
+#include "celix_bundle_context.h"
+#include "celix_properties.h"
 #include "constants.h"
 #include "celix_threads.h"
-#include "bundle_context.h"
 #include "array_list.h"
 #include "utils.h"
 #include "celix_errno.h"
 #include "filter.h"
-#include "service_reference.h"
-#include "service_registration.h"
 
-#include "publisher_endpoint_announce.h"
-#include "etcd_common.h"
-#include "etcd_watcher.h"
-#include "etcd_writer.h"
+#include "pubsub_listeners.h"
 #include "pubsub_endpoint.h"
 #include "pubsub_discovery_impl.h"
 
-static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp);
+static celix_properties_t* pubsub_discovery_parseEndpoint(const char *value);
+static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props);
+static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint);
+static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, const char *uuid);
 
 /* Discovery activator functions */
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **ps_discovery) {
     celix_status_t status = CELIX_SUCCESS;
 
     *ps_discovery = calloc(1, sizeof(**ps_discovery));
@@ -55,476 +57,401 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove
     }
 
     (*ps_discovery)->context = context;
-    (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
-    (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
-    (*ps_discovery)->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE;
-    celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
-
-    const char *verboseStr = NULL;
-    bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseStr);
-    if (verboseStr != NULL) {
-        (*ps_discovery)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
-    }
-
-    return status;
-}
+    (*ps_discovery)->discoveredEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*ps_discovery)->announcedEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*ps_discovery)->discoveredEndpointsListeners = hashMap_create(NULL, NULL, NULL, NULL);
+    celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL);
+    celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL);
+    celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL);
+    celixThreadMutex_create(&(*ps_discovery)->waitMutex, NULL);
+    celixThreadCondition_init(&(*ps_discovery)->waitCond, NULL);
+    celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL);
+    (*ps_discovery)->running = true;
 
-celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 
-    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+    (*ps_discovery)->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE);
 
-    while (hashMapIterator_hasNext(iter)) {
-        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+    const char* etcdIp = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_IP_KEY, PUBSUB_DISCOVERY_SERVER_IP_DEFAULT);
+    long etcdPort = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_SERVER_PORT_KEY, PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT);
+    long ttl = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_ETCD_TTL_KEY, PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT);
 
-        for(int i=0; i < arrayList_size(pubEP_list); i++) {
-            pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
-        }
-        arrayList_destroy(pubEP_list);
-    }
+    etcd_init(etcdIp, (int)etcdPort, ETCDLIB_NO_CURL_INITIALIZATION);
+    (*ps_discovery)->ttlForEntries = (int)ttl;
+    (*ps_discovery)->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
+    (*ps_discovery)->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
 
-    hashMapIterator_destroy(iter);
-
-    hashMap_destroy(ps_discovery->discoveredPubs, true, false);
-    ps_discovery->discoveredPubs = NULL;
-
-    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+    return status;
+}
 
-    celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
 
+    celixThreadMutex_lock(&ps_discovery->discoveredEndpointsMutex);
+    hashMap_destroy(ps_discovery->discoveredEndpoints, false, false);
+    celixThreadMutex_unlock(&ps_discovery->discoveredEndpointsMutex);
+    celixThreadMutex_destroy(&ps_discovery->discoveredEndpointsMutex);
 
-    celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
+    celixThreadMutex_lock(&ps_discovery->discoveredEndpointsListenersMutex);
+    hashMap_destroy(ps_discovery->discoveredEndpointsListeners, false, false);
+    celixThreadMutex_unlock(&ps_discovery->discoveredEndpointsListenersMutex);
+    celixThreadMutex_destroy(&ps_discovery->discoveredEndpointsListenersMutex);
 
-    hashMap_destroy(ps_discovery->listenerReferences, false, false);
-    ps_discovery->listenerReferences = NULL;
+    celixThreadMutex_lock(&ps_discovery->announcedEndpointsMutex);
+    hashMap_destroy(ps_discovery->announcedEndpoints, false, false);
+    celixThreadMutex_unlock(&ps_discovery->announcedEndpointsMutex);
+    celixThreadMutex_destroy(&ps_discovery->announcedEndpointsMutex);
 
-    celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
+    celixThreadMutex_destroy(&ps_discovery->waitMutex);
+    celixThreadCondition_destroy(&ps_discovery->waitCond);
 
-    celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
+    celixThreadMutex_destroy(&ps_discovery->runningMutex);
 
     free(ps_discovery);
 
     return status;
 }
 
-celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-    status = etcdCommon_init(ps_discovery->context);
-    ps_discovery->writer = etcdWriter_create(ps_discovery);
-
-    return status;
-}
-
-celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    const char* fwUUID = NULL;
+void* psd_watch(void *data) {
+    pubsub_discovery_t *disc = data;
+
+    celixThreadMutex_lock(&disc->runningMutex);
+    bool running = disc->running;
+    celixThreadMutex_unlock(&disc->runningMutex);
+
+    long long prevIndex = 0L;
+
+    while (running) {
+        char *action = NULL;
+        char *value = NULL;
+        char *readKey = NULL;
+        long long mIndex;
+        //TODO add interruptible etcd_wait -> which returns a handle to interrupt and can be used for a wait call
+        int rc = etcd_watch(disc->pubsubPath, prevIndex, &action, NULL, &value, &readKey, &mIndex);
+        if (rc == ETCDLIB_RC_TIMEOUT) {
+            //nop
+        } else if (rc == ETCDLIB_RC_ERROR) {
+            printf("WARNING PSD: Error communicating with etcd.\n");
+        } else {
+            if (strncmp(ETCDLIB_ACTION_CREATE, action, strlen(ETCDLIB_ACTION_CREATE)) == 0 ||
+                strncmp(ETCDLIB_ACTION_SET, action, strlen(ETCDLIB_ACTION_SET)) == 0 ||
+                strncmp(ETCDLIB_ACTION_UPDATE, action, strlen(ETCDLIB_ACTION_UPDATE)) == 0) {
+                celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+                if (props != NULL) {
+                    pubsub_discovery_addDiscoveredEndpoint(disc, props);
+                }
+            } else if (strncmp(ETCDLIB_ACTION_DELETE, action, strlen(ETCDLIB_ACTION_DELETE)) == 0 ||
+                        strncmp(ETCDLIB_ACTION_EXPIRE, action, strlen(ETCDLIB_ACTION_EXPIRE)) == 0) {
+                celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+                if (props != NULL) {
+                    const char *uuid = celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL);
+                    pubsub_discovery_removeDiscoveredEndpoint(disc, uuid);
+                }
+            } else {
+                //ETCDLIB_ACTION_GET -> nop
+            }
 
-    bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-    if (fwUUID == NULL) {
-        fprintf(stderr, "ERROR PSD: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
+            free(action);
+            free(value);
+            free(readKey);
+            prevIndex = mIndex;
+        }
 
-    celixThreadMutex_lock(&ps_discovery->watchersMutex);
 
-    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
-    while (hashMapIterator_hasNext(iter)) {
-        struct watcher_info * wi = hashMapIterator_nextValue(iter);
-        etcdWatcher_stop(wi->watcher);
+        celixThreadMutex_lock(&disc->runningMutex);
+        running = disc->running;
+        celixThreadMutex_unlock(&disc->runningMutex);
     }
-    hashMapIterator_destroy(iter);
-
-    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 
-    /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */
-
-    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
-    while (hashMapIterator_hasNext(iter)) {
-        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+    return NULL;
+}
 
-        int i;
-        for (i = 0; i < arrayList_size(pubEP_list); i++) {
-            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
-            if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
-                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP);
+void* psd_refresh(void *data) {
+    pubsub_discovery_t *disc = data;
+
+    celixThreadMutex_lock(&disc->runningMutex);
+    bool running = disc->running;
+    celixThreadMutex_unlock(&disc->runningMutex);
+
+    while (running) {
+        struct timeval start;
+        gettimeofday(&start, NULL);
+
+        celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(disc->announcedEndpoints);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (entry->isSet) {
+                //only refresh ttl -> no index update -> no watch trigger
+                int rc = etcd_refresh(entry->key, disc->ttlForEntries);
+                if (rc != ETCDLIB_RC_OK) {
+                    fprintf(stderr, "[PSD] Warning: error refreshing etcd key %s\n", entry->key);
+                    entry->isSet = false;
+                }
             } else {
-                pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
-                arrayList_remove(pubEP_list, i);
-                pubsubEndpoint_destroy(pubEP);
-                i--;
+                char *str = pubsub_discovery_createJsonEndpoint(entry->properties);
+                int rc = etcd_set(entry->key, str, disc->ttlForEntries, false);
+                if (rc == ETCDLIB_RC_OK) {
+                    entry->isSet = true;
+                } else {
+                    fprintf(stderr, "[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+                }
             }
         }
-    }
-
-    hashMapIterator_destroy(iter);
-
-    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
-    etcdWriter_destroy(ps_discovery->writer);
+        celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
+
+        struct timeval end;
+        gettimeofday(&end, NULL);
+
+        double s = start.tv_sec + (start.tv_usec / 1000.0 / 1000.0 );
+        double e = end.tv_sec + (end.tv_usec / 1000.0 / 1000.0 );
+        double elapsedInsec = e - s;
+        double sleepNeededInSec = disc->sleepInsecBetweenTTLRefresh - elapsedInsec;
+        if (sleepNeededInSec > 0) {
+            celixThreadMutex_lock(&disc->waitMutex);
+            double waitTill = sleepNeededInSec + end.tv_sec + (end.tv_usec / 1000.0 / 1000.0);
+            long sec = (long)waitTill;
+            long nsec = (long)((waitTill - sec) * 1000 * 1000 * 1000);
+            celixThreadCondition_timedwait(&disc->waitCond, &disc->waitMutex, sec, nsec);
+            celixThreadMutex_unlock(&disc->waitMutex);
+        }
 
-    iter = hashMapIterator_create(ps_discovery->watchers);
-    while (hashMapIterator_hasNext(iter)) {
-        struct watcher_info * wi = hashMapIterator_nextValue(iter);
-        etcdWatcher_destroy(wi->watcher);
+        celixThreadMutex_lock(&disc->runningMutex);
+        running = disc->running;
+        celixThreadMutex_unlock(&disc->runningMutex);
     }
-    hashMapIterator_destroy(iter);
-    hashMap_destroy(ps_discovery->watchers, true, true);
-    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
-    return status;
+    return NULL;
 }
 
-/* Functions called by the etcd_watcher */
-
-celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+celix_status_t pubsub_discovery_start(pubsub_discovery_t *ps_discovery) { //spawns the psd_watch and psd_refresh threads for this discovery instance; returns CELIX_SUCCESS unconditionally
     celix_status_t status = CELIX_SUCCESS;
 
-    bool valid = pubsub_discovery_isEndpointValid(pubEP);
-    if (!valid) {
-        status = CELIX_ILLEGAL_STATE;
-        return status;
-    }
-
-    bool inform = false;
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-
-    char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-    array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
-    if(pubEP_list==NULL){
-        arrayList_create(&pubEP_list);
-        arrayList_add(pubEP_list,pubEP);
-        hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list);
-        inform=true;
-    }
-    else{
-        int i;
-        bool found = false;
-        for(i=0;i<arrayList_size(pubEP_list) && !found;i++){
-            found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i));
-        }
-        if(found){
-            pubsubEndpoint_destroy(pubEP);
-        }
-        else{
-            arrayList_add(pubEP_list,pubEP);
-            inform=true;
-        }
-    }
-    free(pubs_key);
-
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-    if(inform){
-        status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true);
-    }
-
+    celixThread_create(&ps_discovery->watchThread, NULL, psd_watch, ps_discovery); //NOTE(review): result of celixThread_create is ignored, a failed thread start is not reported to the caller
+    celixThread_create(&ps_discovery->refreshTTLThread, NULL, psd_refresh, ps_discovery); //both threads get the discovery instance as their argument and are joined in pubsub_discovery_stop
     return status;
 }
 
-celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+/**
+ * Stops the discovery: clears the running flag, wakes and joins the watch and
+ * TTL-refresh threads, reports every discovered endpoint as removed to the registered
+ * listeners and deletes the own announcements from etcd. Both endpoint maps are empty
+ * afterwards and all entries they held are released.
+ *
+ * @param disc The discovery instance to stop.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_stop(pubsub_discovery_t *disc) {
     celix_status_t status = CELIX_SUCCESS;
-    pubsub_endpoint_pt p = NULL;
-    bool found = false;
-
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-    char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-    array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
-    free(pubs_key);
-    if (pubEP_list == NULL) {
-        printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-        status = CELIX_ILLEGAL_STATE;
-    } else {
-        int i;
-
-        for (i = 0; !found && i < arrayList_size(pubEP_list); i++) {
-            p = arrayList_get(pubEP_list, i);
-            found = pubsubEndpoint_equals(pubEP, p);
-            if (found) {
-                arrayList_remove(pubEP_list, i);
-                pubsubEndpoint_destroy(p);
-            }
-        }
-    }
-
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-    if (found) {
-        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false);
-    }
-    pubsubEndpoint_destroy(pubEP);
 
-    return status;
-}
+    celixThreadMutex_lock(&disc->runningMutex);
+    disc->running = false;
+    celixThreadMutex_unlock(&disc->runningMutex);
 
-/* Callback to the pubsub_topology_manager */
-celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
-    celix_status_t status = CELIX_SUCCESS;
+    celixThreadMutex_lock(&disc->waitMutex);
+    celixThreadCondition_broadcast(&disc->waitCond); //wake the refresh thread if it sleeps between two TTL refreshes
+    celixThreadMutex_unlock(&disc->waitMutex);
 
-    // Inform listeners of new publisher endpoint
-    celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+    celixThread_join(disc->watchThread, NULL);
+    celixThread_join(disc->refreshTTLThread, NULL);
 
-    if (pubsub_discovery->listenerReferences != NULL) {
-        hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
-        while (hashMapIterator_hasNext(iter)) {
-            service_reference_pt reference = hashMapIterator_nextKey(iter);
+    //TODO NOTE double lock , check if this is always done in the same order
+    celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *props = hashMapIterator_nextValue(&iter);
 
-            publisher_endpoint_announce_pt listener = NULL;
+        celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(disc->discoveredEndpointsListeners);
+        while (hashMapIterator_hasNext(&iter2)) {
+            pubsub_discovered_endpoint_listener_t *listener = hashMapIterator_nextValue(&iter2);
+            listener->removeDiscoveredEndpoint(listener->handle, props);
+        }
+        celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
 
-            bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener);
-            if (epAdded) {
-                listener->announcePublisher(listener->handle, pubEP);
-            } else {
-                listener->removePublisher(listener->handle, pubEP);
-            }
-            bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
+        celix_properties_destroy(props); //listeners have been told, the endpoint can go
+    }
+    hashMap_clear(disc->discoveredEndpoints, false, false); //keys point into the (destroyed) properties, values are already released
+    celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+
+    celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+    iter = hashMapIterator_construct(disc->announcedEndpoints);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->isSet) {
+            etcd_del(entry->key); //only entries that reached etcd need to be deleted there
         }
-        hashMapIterator_destroy(iter);
+        free(entry->key); //the key string is allocated per entry (asprintf in announceEndpoint) and was leaked here before
+        celix_properties_destroy(entry->properties);
+        free(entry);
     }
-
-    celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+    hashMap_clear(disc->announcedEndpoints, false, false);
+    celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
 
     return status;
 }
 
+void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { //registers a discovered-endpoint listener and replays all currently known discovered endpoints to it
+    pubsub_discovery_t *disc = handle;
+    pubsub_discovered_endpoint_listener_t *listener = svc;
 
-/* Service's functions implementation */
-celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-    bool valid = pubsub_discovery_isEndpointValid(pubEP);
-    if (!valid) {
-        status = CELIX_ILLEGAL_ARGUMENT;
-        return status;
-    }
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); //-1L is passed as the fallback value for a missing service id
+    celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+    hashMap_put(disc->discoveredEndpointsListeners, (void*)svcId, listener); //listeners are keyed on their service id
+    celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
 
-    if (pubsub_discovery->verbose) {
-        printf("pubsub_discovery_announcePublisher : %s / %s\n",
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
-                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    celixThreadMutex_lock(&disc->discoveredEndpointsMutex); //NOTE(review): the listener is already in the map here, so an endpoint added in between may be reported to it twice - confirm listeners tolerate that
+    hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *props = hashMapIterator_nextValue(&iter); //NOTE(review): shadows the 'props' parameter (service properties); this one is a discovered endpoint
+        listener->addDiscoveredEndpoint(listener->handle, props);
     }
+    celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+}
 
+/**
+ * Tracker callback for a discovered-endpoint listener service that is going away.
+ * Drops the listener from disc->discoveredEndpointsListeners so it is no longer called
+ * for added or removed endpoints.
+ *
+ * @param handle The pubsub_discovery_t instance.
+ * @param svc    The listener service (unused, the map is keyed on the service id).
+ * @param props  The service properties; OSGI_FRAMEWORK_SERVICE_ID is used as map key.
+ * @param bnd    The bundle owning the service (unused).
+ */
+void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_discovery_t *disc = handle;
 
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+    //must remove (the original put the listener back), else a gone service is still invoked
+    hashMap_remove(disc->discoveredEndpointsListeners, (void*)svcId);
+    celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
+}
 
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-
-    char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-    array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
+/**
+ * Registers an endpoint for announcement in etcd. A private copy of the endpoint
+ * properties is stored in disc->announcedEndpoints (keyed on the endpoint uuid) together
+ * with the etcd key "/pubsub/<admin type>/<scope>/<topic>/<uuid>"; the refresh thread is
+ * woken so the entry gets written to etcd.
+ *
+ * @param handle   The pubsub_discovery_t instance.
+ * @param endpoint The endpoint properties; not modified, a copy is kept.
+ * @return CELIX_SUCCESS, also when the endpoint is invalid (only an error is printed);
+ *         CELIX_ENOMEM when the announce entry cannot be allocated.
+ */
+celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_discovery_t *disc = handle;
+    celix_status_t status = CELIX_SUCCESS;
 
-    if(pubEP_list==NULL){
-        arrayList_create(&pubEP_list);
-        hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
+    bool valid = pubsubEndpoint_isValid(endpoint, true, true);
+    const char *config = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+    const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+
+    if (valid) {
+        pubsub_announce_entry_t *entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            fprintf(stderr, "[PSD] Error cannot announce endpoint. out of memory\n");
+            return CELIX_ENOMEM;
+        }
+        entry->isSet = false;
+        entry->properties = celix_properties_copy(endpoint);
+        if (entry->properties == NULL || asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid) < 0) {
+            //asprintf leaves entry->key undefined on failure, so the key must not be freed here
+            if (entry->properties != NULL) {
+                celix_properties_destroy(entry->properties);
+            }
+            free(entry);
+            fprintf(stderr, "[PSD] Error cannot announce endpoint. out of memory\n");
+            return CELIX_ENOMEM;
+        }
+
+        //use the uuid string owned by the copied properties as map key, so key and entry share a lifetime
+        //NOTE(review): announcing the same uuid twice replaces the stored entry without releasing the old one
+        const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
+        celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+        hashMap_put(disc->announcedEndpoints, (void*)hashKey, entry);
+        celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
+
+        //wake the refresh thread so the new entry is set in etcd without waiting for the next period
+        celixThreadMutex_lock(&disc->waitMutex);
+        celixThreadCondition_broadcast(&disc->waitCond);
+        celixThreadMutex_unlock(&disc->waitMutex);
+    } else {
+        printf("[PSD] Error cannot announce endpoint. missing some mandatory properties\n");
     }
-    free(pub_key);
-    pubsub_endpoint_pt p = NULL;
-    pubsubEndpoint_clone(pubEP, &p);
-
-    arrayList_add(pubEP_list,p);
-
-    status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
-
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 
     return status;
 }
-
-celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint) { //withdraws a previously announced endpoint (looked up by its uuid); returns CELIX_SUCCESS in every case
+    pubsub_discovery_t *disc = handle;
     celix_status_t status = CELIX_SUCCESS;
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
-    bool valid = pubsub_discovery_isEndpointValid(pubEP);
-    if (!valid) {
-        status = CELIX_ILLEGAL_ARGUMENT;
-        return status;
-    }
-
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    pubsub_announce_entry_t *entry = NULL;
 
-    char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-    array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
-    free(pub_key);
-    if(pubEP_list==NULL){
-        printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
-        status = CELIX_ILLEGAL_STATE;
+    if (uuid != NULL) {
+        celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+        entry = hashMap_remove(disc->announcedEndpoints, uuid); //take the entry out of the map under the lock; etcd and cleanup work happens after unlocking
+        celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
+    } else {
+        printf("[PSD] Error cannot remove announced endpoint. missing endpoint uuid property\n");
     }
-    else{
-
-        int i;
-        bool found = false;
-        pubsub_endpoint_pt p = NULL;
-
-        for(i=0;!found && i<arrayList_size(pubEP_list);i++){
-            p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-            found = pubsubEndpoint_equals(pubEP,p);
-        }
-
-        if(!found){
-            printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
-            status = CELIX_ILLEGAL_STATE;
-        }
-        else{
-
-            arrayList_removeElement(pubEP_list,p);
 
-            status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p);
-
-            pubsubEndpoint_destroy(p);
+    if (entry != NULL) { //NULL when the uuid is missing or no entry was stored for it; nothing is reported in the latter case
+        if (entry->isSet) {
+            etcd_del(entry->key); //only delete from etcd when the entry was actually set there
         }
+        free(entry->key);
+        celix_properties_destroy(entry->properties);
+        free(entry);
     }
 
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
     return status;
 }
 
-celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
-    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
-    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
-    if(wi) {
-        wi->nr_references++;
-        free(scope_topic_key);
-    } else {
-        wi = calloc(1, sizeof(*wi));
-        etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher);
-        wi->nr_references = 1;
-        hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
-    }
+/**
+ * Stores a newly discovered endpoint (keyed on its uuid) and reports it to all
+ * registered discovered-endpoint listeners.
+ *
+ * Takes ownership of endpoint: it is either kept in disc->discoveredEndpoints or, when an
+ * endpoint with the same uuid is already stored, destroyed here. In that case the stored
+ * endpoint is kept, only a warning is printed and no listener is called.
+ *
+ * @param disc     The discovery instance.
+ * @param endpoint The discovered endpoint; must contain PUBSUB_ENDPOINT_UUID.
+ */
+static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint) {
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    assert(uuid != NULL); //note endpoint should already be checked to be valid (pubsubEndpoint_isValid)
 
-    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+    celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+    bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
+    if (!exists) {
+        //only store new endpoints; overwriting would orphan (leak) the properties already in the map
+        hashMap_put(disc->discoveredEndpoints, (void*)uuid, endpoint);
+    }
+    celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
 
-    return CELIX_SUCCESS;
+    if (!exists) {
+        celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpointsListeners);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_discovered_endpoint_listener_t *listener = hashMapIterator_nextValue(&iter);
+            listener->addDiscoveredEndpoint(listener->handle, endpoint);
+        }
+        celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
+    } else {
+        fprintf(stderr, "[PSD] Warning unexpected update from an already existing endpoint (uuid is %s)\n", uuid);
+        celix_properties_destroy(endpoint); //uuid points into endpoint, so destroy only after logging
+    }
 }
 
-celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
-
-    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
-    if(entry) {
-        struct watcher_info * wi = hashMapEntry_getValue(entry);
-        wi->nr_references--;
-        if(wi->nr_references == 0) {
-            char *key = hashMapEntry_getKey(entry);
-            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
-            free(key);
-            free(scope_topic_key);
-            etcdWatcher_stop(wi->watcher);
-            etcdWatcher_destroy(wi->watcher);
-            free(wi);
+/**
+ * Removes the discovered endpoint with the given uuid, reports the removal to all
+ * registered discovered-endpoint listeners and then releases the endpoint properties.
+ * Prints a warning when no endpoint is stored for the uuid.
+ *
+ * @param disc The discovery instance.
+ * @param uuid The uuid of the endpoint that disappeared.
+ */
+static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, const char *uuid) {
+    celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+    bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
+    celix_properties_t *endpoint = hashMap_remove(disc->discoveredEndpoints, (void*)uuid);
+    celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+
+    if (exists && endpoint != NULL) {
+        celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpointsListeners);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_discovered_endpoint_listener_t *listener = hashMapIterator_nextValue(&iter);
+            listener->removeDiscoveredEndpoint(listener->handle, endpoint);
         }
+        celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
+        //the map no longer references the endpoint; release it like pubsub_discovery_stop does after notifying
+        celix_properties_destroy(endpoint);
     } else {
-        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
+        fprintf(stderr, "[PSD] Warning unexpected remove from non existing endpoint (uuid is %s)\n", uuid);
     }
-    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
-    return CELIX_SUCCESS;
 }
 
-/* pubsub_topology_manager tracker callbacks */
+/**
+ * Converts the JSON object stored as etcd value into endpoint properties: every member
+ * with a string value becomes one property, members of any other JSON type are skipped.
+ *
+ * @param etcdValue The JSON formatted value read from etcd.
+ * @return Newly created properties (to be destroyed by the caller), or NULL when the
+ *         result is not a valid endpoint according to pubsubEndpoint_isValid.
+ */
+celix_properties_t* pubsub_discovery_parseEndpoint(const char* etcdValue) {
+    properties_t *props = properties_create();
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
-    celix_status_t status = CELIX_SUCCESS;
+    // etcdValue contains the json formatted string
+    json_error_t error;
+    json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
+
+    if (json_is_object(jsonRoot)) {
 
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
-    publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service;
+        void *iter = json_object_iter(jsonRoot);
 
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-    celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+        const char *key;
+        json_t *value;
 
-    /* Notify the PSTM about discovered publisher endpoints */
-    hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs);
-    while(hashMapIterator_hasNext(iter)){
-        array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
-        int i;
-        for(i=0;i<arrayList_size(pubEP_list);i++){
-            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-            status += listener->announcePublisher(listener->handle, pubEP);
+        while (iter) {
+            key = json_object_iter_key(iter);
+            value = json_object_iter_value(iter);
+            if (json_is_string(value)) {
+                //json_string_value returns NULL for non-string members; never hand that to properties_set
+                properties_set(props, key, json_string_value(value));
+            }
+            iter = json_object_iter_next(jsonRoot, iter);
         }
     }
 
-    hashMapIterator_destroy(iter);
-
-    hashMap_put(pubsub_discovery->listenerReferences, reference, NULL);
-
-    celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-    if (pubsub_discovery->verbose) {
-        printf("PSD: pubsub_tm_announce_publisher added.\n");
+    if (jsonRoot != NULL) {
+        json_decref(jsonRoot);
     }
 
-    return status;
-}
-
-celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
-    if (status == CELIX_SUCCESS) {
-        status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
+    //a failed parse or a non-object value leaves props empty, which is rejected here as well
+    bool valid = pubsubEndpoint_isValid(props, true, true);
+    if (!valid) {
+        fprintf(stderr, "[PSD] Warning retrieved endpoint is not valid\n");
+        celix_properties_destroy(props);
+        props = NULL;
     }
 
-    return status;
+    return props;
 }
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) {
-    celix_status_t status = CELIX_SUCCESS;
-    pubsub_discovery_pt pubsub_discovery = handle;
+static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props) { //serialises the endpoint properties to a JSON object string with one string member per property
+    //note props is already checked for validity (pubsubEndpoint_isValid)
 
-    celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
-
-    if (pubsub_discovery->listenerReferences != NULL) {
-        if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) {
-            if (pubsub_discovery->verbose) {
-                printf("PSD: pubsub_tm_announce_publisher removed.\n");
-            }
-        }
-    }
-    celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
-
-    return status;
-}
-
-static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) {
-    //required properties
-    bool valid = true;
-    static const char* keys[] = {
-        PUBSUB_ENDPOINT_UUID,
-        PUBSUB_ENDPOINT_FRAMEWORK_UUID,
-        PUBSUB_ENDPOINT_TYPE,
-        PUBSUB_ENDPOINT_ADMIN_TYPE,
-        PUBSUB_ENDPOINT_SERIALIZER,
-        PUBSUB_ENDPOINT_TOPIC_NAME,
-        PUBSUB_ENDPOINT_TOPIC_SCOPE,
-        NULL };
-    int i;
-    for (i = 0; keys[i] != NULL; ++i) {
-        const char *val = properties_get(psEp->endpoint_props, keys[i]);
-        if (val == NULL) { //missing required key
-            fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]);
-            valid = false;
-        }
-    }
-    if (!valid) {
-        const char *key = NULL;
-        fprintf(stderr, "PubSubEndpoint entries:\n");
-        PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
-            fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
-        }
-        if (psEp->topic_props != NULL) {
-            fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
-            PROPERTIES_FOR_EACH(psEp->topic_props, key) {
-                fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
-            }
-        }
+    json_t *jsEndpoint = json_object();
+    const char* propKey = NULL;
+    PROPERTIES_FOR_EACH((celix_properties_t*)props, propKey) { //const is cast away only to satisfy the macro - presumably it does not modify props, TODO confirm
+        const char* val = celix_properties_get(props, propKey, NULL);
+        json_object_set_new(jsEndpoint, propKey, json_string(val)); //the "_new" setter takes over the reference of the fresh json string (per the jansson documentation)
     }
-    return valid;
+    char* str = json_dumps(jsEndpoint, JSON_COMPACT); //string is allocated by jansson; per its documentation the caller has to free() it
+    json_decref(jsEndpoint);
+    return str;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index eaf8e85..11663cb 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -23,55 +23,75 @@
 #include "bundle_context.h"
 #include "service_reference.h"
 
-#include "etcd_watcher.h"
-#include "etcd_writer.h"
 #include "pubsub_endpoint.h"
+#include "etcd.h"
 
 #define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
 
 #define PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY "PUBSUB_ETCD_DISCOVERY_VERBOSE"
 #define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false
 
-struct watcher_info {
-    etcd_watcher_pt watcher;
-    int nr_references;
-};
 
-struct pubsub_discovery {
+#define PUBSUB_DISCOVERY_SERVER_IP_KEY 		    "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define PUBSUB_DISCOVERY_SERVER_PORT_KEY 		"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define PUBSUB_DISCOVERY_SERVER_PATH_KEY 		"PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define PUBSUB_DISCOVERY_ETCD_TTL_KEY           "PUBSUB_DISCOVERY_ETCD_TTL"
+
+
+#define PUBSUB_DISCOVERY_SERVER_IP_DEFAULT 		"127.0.0.1"
+#define PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT	2379
+#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT 	"pubsub/discovery"
+#define PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT       30
+
+typedef struct pubsub_discovery {
 	bundle_context_pt context;
 
-	celix_thread_mutex_t discoveredPubsMutex;
-	hash_map_pt discoveredPubs; //<topic,List<pubsub_endpoint_pt>>
+	celix_thread_mutex_t discoveredEndpointsMutex;
+	hash_map_pt discoveredEndpoints; //<key = char* (endpoint uuid, points into the stored properties), value = celix_properties_t* /*endpoint*/>
 
-	celix_thread_mutex_t listenerReferencesMutex;
-	hash_map_pt listenerReferences; //key=serviceReference, value=nop
+	celix_thread_mutex_t announcedEndpointsMutex;
+	hash_map_pt announcedEndpoints; //<key = char* (endpoint uuid, points into the entry properties), value = pubsub_announce_entry_t*>
 
-	celix_thread_mutex_t watchersMutex;
-	hash_map_pt watchers; //key = topicname, value = struct watcher_info
+	celix_thread_mutex_t discoveredEndpointsListenersMutex;
+	hash_map_pt discoveredEndpointsListeners; //key=svcId (long), value=pubsub_discovered_endpoint_listener_t*
 
-	etcd_writer_pt writer;
+	celix_thread_mutex_t waitMutex; //protects waitCond
+	celix_thread_cond_t  waitCond; //the refresh thread waits on this between TTL refreshes; broadcast on stop and on a new announcement
 
+	celix_thread_mutex_t runningMutex; //protects running
+    bool running; //cleared by pubsub_discovery_stop to end the threads
+    celix_thread_t watchThread;
+    celix_thread_t refreshTTLThread;
+
+
+    //configurable by config/env.
+	const char *pubsubPath;
 	bool verbose;
-};
+	int ttlForEntries; //ttl handed to etcd_set for announced entries
+	int sleepInsecBetweenTTLRefresh; //target period (seconds) of one refresh loop iteration
+} pubsub_discovery_t;
+
+typedef struct pubsub_announce_entry {
+	char *key; //etcd key
+	bool isSet; //whether the value is already set (in case of unavailable etcd server this can linger)
+	celix_properties_t *properties; //the endpoint properties
+} pubsub_announce_entry_t;
 
 
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
-celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery);
-celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery);
-celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **out);
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *node_discovery);
+celix_status_t pubsub_discovery_start(pubsub_discovery_t *node_discovery);
+celix_status_t pubsub_discovery_stop(pubsub_discovery_t *node_discovery);
 
-celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_t *node_discovery, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_t *node_discovery, pubsub_endpoint_pt pubEP);
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);
+void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 
-celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic);
-celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic);
+celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
 
-celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_t *discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
 
 #endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
deleted file mode 100644
index 607e83a..0000000
--- a/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
-#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
-
-#include "pubsub_endpoint.h"
-
-
-//TODO refactor to pubsub_endpoint_announce
-//can be used to announce and remove publisher and subscriber endpoints
-
-struct publisher_endpoint_announce {
-	void *handle;
-	celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP);
-	celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP);
-	celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic);
-	celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic);
-};
-
-typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
-
-
-#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
index 5dfd8fd..cfed5d9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
@@ -27,17 +27,15 @@
 #ifndef PUBSUB_COMMON_H_
 #define PUBSUB_COMMON_H_
 
-#define PUBSUB_SERIALIZER_SERVICE 		"pubsub_serializer"
-#define PUBSUB_ADMIN_SERVICE 			"pubsub_admin"
-#define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
-#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
+#define PUBSUB_SERIALIZER_SERVICE 					"pubsub_serializer"
+#define PUBSUB_ADMIN_SERVICE 						"pubsub_admin"
+#define PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE	"pubsub_announce_endpoint_listener"
+#define PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE	"pubsub_discovered_endpoint_listener"
 
-#define PUBSUB_ANY_SUB_TOPIC		        "any"
+#define PUBSUB_ANY_SUB_TOPIC		        		"any"
 
-#define	PUBSUB_BUNDLE_ID			"bundle.id"
-
-#define MAX_SCOPE_LEN                           1024
-#define MAX_TOPIC_LEN				1024
+#define MAX_SCOPE_LEN                           	1024
+#define MAX_TOPIC_LEN								1024
 
 struct pubsub_msg_header{
 	char topic[MAX_TOPIC_LEN];

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index c0492f5..0d8c513 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -16,20 +16,12 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_endpoint.h
- *
- *  \date       Sep 21, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_ENDPOINT_H_
 #define PUBSUB_ENDPOINT_H_
 
-#include "service_reference.h"
-#include "listener_hook_service.h"
-#include "properties.h"
+#include "celix_bundle_context.h"
+#include "celix_properties.h"
 
 #include "pubsub/publisher.h"
 #include "pubsub/subscriber.h"
@@ -37,18 +29,14 @@
 #include "pubsub_constants.h"
 
 //required for valid endpoint
+#define PUBSUB_ENDPOINT_TOPIC_NAME      "pubsub.topic.name"
+#define PUBSUB_ENDPOINT_TOPIC_SCOPE     "pubsub.topic.scope"
+
 #define PUBSUB_ENDPOINT_UUID            "pubsub.endpoint.uuid" //required
 #define PUBSUB_ENDPOINT_FRAMEWORK_UUID  "pubsub.framework.uuid" //required
 #define PUBSUB_ENDPOINT_TYPE            "pubsub.endpoint.type" //PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE
 #define PUBSUB_ENDPOINT_ADMIN_TYPE       PUBSUB_ADMIN_TYPE_KEY
 #define PUBSUB_ENDPOINT_SERIALIZER       PUBSUB_SERIALIZER_TYPE_KEY
-#define PUBSUB_ENDPOINT_TOPIC_NAME      "pubsub.topic.name"
-#define PUBSUB_ENDPOINT_TOPIC_SCOPE     "pubsub.topic.scope"
-
-//optional
-#define PUBSUB_ENDPOINT_SERVICE_ID      "pubsub.service.id"
-#define PUBSUB_ENDPOINT_BUNDLE_ID       "pubsub.bundle.id"
-#define PUBSUB_ENDPOINT_URL             "pubsub.url"
 
 
 #define PUBSUB_PUBLISHER_ENDPOINT_TYPE  "pubsub.publisher"
@@ -56,25 +44,36 @@
 
 
 struct pubsub_endpoint {
-    properties_pt endpoint_props;
-    properties_pt topic_props;
+    const char *topicName;
+    const char *topicScope;
+
+    const char *uuid;
+    const char *frameworkUUid;
+    const char *type;
+    const char *adminType;
+    const char *serializerType;
+
+    celix_properties_t *properties;
 };
 
 typedef struct pubsub_endpoint *pubsub_endpoint_pt;
+typedef struct pubsub_endpoint pubsub_endpoint_t;
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props, pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t* ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out);
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t* ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props, pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromProperties(const celix_properties_t *props, pubsub_endpoint_t **out);
+celix_status_t pubsubEndpoint_createFromSvc(bundle_context_t* ctx, const celix_bundle_t *bnd, const celix_properties_t *svcProps, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, const celix_service_tracker_info_t *info, bool isPublisher, pubsub_endpoint_pt* out);
 celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out);
+
 void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
-celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value);
+bool pubsubEndpoint_equalsWithProperties(pubsub_endpoint_pt psEp1, const celix_properties_t *props);
+
+void pubsubEndpoint_setField(pubsub_endpoint_t *ep, const char *key, const char *val);
+
+//check if the required properties are available for the endpoint
+bool pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
 
-/**
- * Creates a pubsub_endpoint based on discovered properties.
- * Will take ownership over the discovredProperties
- */
-celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out);
 
 char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
new file mode 100644
index 0000000..7e4bfec
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -0,0 +1,50 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_LISTENERS_H_
+#define PUBSUB_LISTENERS_H_
+
+#include "celix_properties.h"
+
+//Listener used to inform the topology manager that pub/sub endpoints are discovered in the network
+struct pubsub_discovered_endpoint_listener {
+	void *handle; //opaque context pointer; presumably handed back as the first argument of the callbacks below (TODO confirm)
+
+	celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *properties); //properties are const (read-only) input; result is reported as celix_status_t
+	celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *properties); //same signature as addDiscoveredEndpoint
+};
+typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_listener_t;
+
+
+
+//Listener used to inform the discovery admins to publish endpoint info into the network
+struct pubsub_announce_endpoint_listener {
+	void *handle; //opaque context pointer; presumably handed back as the first argument of the callbacks below (TODO confirm)
+
+	celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties); //properties are const (read-only) input; result is reported as celix_status_t
+	celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *properties); //same signature as announceEndpoint
+
+	//getCurrentSubscriberEndPoints (NOTE(review): not declared in this struct; looks like a placeholder for a future callback - confirm)
+	//getCurrentPublisherEndPoints (NOTE(review): not declared in this struct; looks like a placeholder for a future callback - confirm)
+};
+
+typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;
+
+
+#endif /* PUBSUB_LISTENERS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
index 5d0fcc9..ff62af8 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -54,13 +54,13 @@ celix_status_t pubsub_admin_match(
 	const char *requested_admin_type 		= NULL;
 	const char *requested_qos_type			= NULL;
 
-	if (endpoint->endpoint_props != NULL) {
-		endpointFrameworkUuid = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
-		endpointAdminType = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_ADMIN_TYPE);
+	if (endpoint->properties != NULL) {
+		endpointFrameworkUuid = properties_get(endpoint->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+		endpointAdminType = properties_get(endpoint->properties, PUBSUB_ENDPOINT_ADMIN_TYPE);
 	}
-	if (endpoint->topic_props != NULL) {
-		requested_admin_type = properties_get(endpoint->topic_props, PUBSUB_ADMIN_TYPE_KEY);
-		requested_qos_type = properties_get(endpoint->topic_props, QOS_ATTRIBUTE_KEY);
+	if (endpoint->properties != NULL) {
+		requested_admin_type = properties_get(endpoint->properties, PUBSUB_ADMIN_TYPE_KEY);
+		requested_qos_type = properties_get(endpoint->properties, QOS_ATTRIBUTE_KEY);
 	}
 
 	if (endpointFrameworkUuid != NULL && frameworkUuid != NULL && strncmp(frameworkUuid, endpointFrameworkUuid, 128) == 0) {
@@ -87,7 +87,7 @@ celix_status_t pubsub_admin_match(
 		//NOTE serializer influence the score if a specific serializer is configured and not available.
 		//get best serializer. This is based on service raking or requested serializer. In the case of a request NULL is return if not request match is found.
 		service_reference_pt serSvcRef = NULL;
-		pubsub_admin_get_best_serializer(endpoint->topic_props, serializerList, &serSvcRef);
+		pubsub_admin_get_best_serializer(endpoint->properties, serializerList, &serSvcRef);
 		const char *serType = NULL; //for printing info
 		if (serSvcRef == NULL) {
 			score = 0;