You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/05/27 18:52:45 UTC

[37/60] [abbrv] [partial] celix git commit: CELIX-424: Cleans up the directory structure. Moves all libraries to the libs subdir and all bundles to the bundles subdir

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
new file mode 100644
index 0000000..c698172
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.c
@@ -0,0 +1,322 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <jansson.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+
+/* Size of the stack buffers that receive an etcd root path in this file. */
+#define MAX_ROOTNODE_LENGTH             128
+/* Not referenced by the code visible in this file. */
+#define MAX_LOCALNODE_LENGTH            4096
+/* Size of each buffer that receives one path component parsed from an etcd key. */
+#define MAX_FIELD_LENGTH                128
+
+/* Bundle-context property holding the etcd root path, and the fallback used when it is unset. */
+#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
+
+/* Server address settings; not referenced by the code visible in this file. */
+#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT        2379
+
+// be careful - this should be higher than the curl timeout
+// (in this file only DEFAULT_ETCD_TTL is used: a quarter of it paces the watch retry loop)
+#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL                30
+
+
+/* State of one watcher thread that follows a single scope/topic path in etcd. */
+struct etcd_watcher {
+	pubsub_discovery_pt pubsub_discovery; /* discovery instance that receives add/remove notifications */
+
+	celix_thread_mutex_t watcherLock;     /* taken around reads and writes of 'running' */
+	celix_thread_t watcherThread;         /* thread executing etcdWatcher_run */
+
+	char *scope;                          /* heap copy (strdup) made in etcdWatcher_create */
+	char *topic;                          /* heap copy (strdup) made in etcdWatcher_create */
+	volatile bool running;                /* cleared by etcdWatcher_stop to end the watch loop */
+};
+
+/*
+ * NOTE(review): nothing visible in this file uses this structure; it has the
+ * same field list as the definition in etcd_writer.c - confirm whether this
+ * copy is still needed.
+ */
+struct etcd_writer {
+	pubsub_discovery_pt pubsub_discovery;
+	celix_thread_mutex_t localPubsLock;
+	array_list_pt localPubs;
+	volatile bool running;
+	celix_thread_t writerThread;
+};
+
+
+/*
+ * Writes "<root>/<scope>/<topic>" into rootNode (at most rootNodeLen bytes,
+ * truncated by snprintf when longer). <root> is the value of the
+ * CFG_ETCD_ROOT_PATH property, or DEFAULT_ETCD_ROOTPATH when that property
+ * cannot be read or is unset. Always returns CELIX_SUCCESS.
+ *
+ * note that the rootNode shouldn't have a leading slash
+ */
+static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
+	const char *configured = NULL;
+	const char *base = DEFAULT_ETCD_ROOTPATH;
+
+	if ((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured) == CELIX_SUCCESS) && (configured != NULL)) {
+		base = configured;
+	}
+
+	snprintf(rootNode, rootNodeLen, "%s/%s/%s", base, scope, topic);
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Copies the etcd root path into rootNode, which must provide at least
+ * MAX_ROOTNODE_LENGTH bytes. The path is the value of the CFG_ETCD_ROOT_PATH
+ * property, or DEFAULT_ETCD_ROOTPATH when that property cannot be read or is
+ * unset. A longer path is truncated. Always returns CELIX_SUCCESS.
+ */
+static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
+	const char* rootPath = NULL;
+
+	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+		rootPath = DEFAULT_ETCD_ROOTPATH;
+	}
+
+	// strncpy does not terminate the destination when the source fills it,
+	// so copy one byte less and terminate explicitly.
+	strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH - 1);
+	rootNode[MAX_ROOTNODE_LENGTH - 1] = '\0';
+
+	return CELIX_SUCCESS;
+}
+
+
+/*
+ * Callback handed to etcd_get_directory: turns one key/value pair into a
+ * publisher endpoint and passes it to the discovery instance given in arg.
+ * Entries that cannot be converted are skipped.
+ */
+static void add_node(const char *key, const char *value, void* arg) {
+	pubsub_discovery_pt discovery = (pubsub_discovery_pt) arg;
+	pubsub_endpoint_pt endpoint = NULL;
+
+	if (etcdWatcher_getPublisherEndpointFromKey(discovery, key, value, &endpoint) != CELIX_SUCCESS) {
+		return;
+	}
+
+	pubsub_discovery_addNode(discovery, endpoint);
+}
+
+/*
+ * Asks etcd_get_directory to report the entries below rootPath to add_node,
+ * passing highestModified through to it. Returns CELIX_ILLEGAL_ARGUMENT when
+ * etcd_get_directory returns non-zero, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
+	int rc = etcd_get_directory(rootPath, add_node, ps_discovery, highestModified);
+
+	return rc ? CELIX_ILLEGAL_ARGUMENT : CELIX_SUCCESS;
+}
+
+/*
+ * Builds a publisher endpoint from one etcd entry.
+ *
+ * etcdKey must have the form "/<root>/<scope>/<topic>/<fwUUID>/<pubsubUUID>";
+ * the four components are only used to validate the key. etcdValue is parsed
+ * as JSON and, when it is an object, every string member becomes a property
+ * of the endpoint handed to pubsubEndpoint_createFromDiscoveredProperties.
+ *
+ * Returns CELIX_ENOMEM when the scan pattern cannot be allocated,
+ * CELIX_ILLEGAL_STATE when the key does not contain four components of at
+ * most MAX_FIELD_LENGTH - 1 characters (e.g. when a directory was removed),
+ * otherwise the status of pubsubEndpoint_createFromDiscoveredProperties.
+ * *pubEP is only written by that call.
+ */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
+	const int fieldWidth = MAX_FIELD_LENGTH - 1;
+
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	char scope[MAX_FIELD_LENGTH];
+	char topic[MAX_FIELD_LENGTH];
+	char fwUUID[MAX_FIELD_LENGTH];
+	char pubsubUUID[MAX_FIELD_LENGTH];
+	char *expr = NULL;
+
+	memset(rootPath, 0, sizeof(rootPath));
+	memset(scope, 0, sizeof(scope));
+	memset(topic, 0, sizeof(topic));
+	memset(fwUUID, 0, sizeof(fwUUID));
+	memset(pubsubUUID, 0, sizeof(pubsubUUID));
+
+	etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
+
+	// Each %[^/] conversion gets an explicit maximum width so that an
+	// over-long key component cannot write past its buffer.
+	if (asprintf(&expr, "/%s/%%%d[^/]/%%%d[^/]/%%%d[^/]/%%%d[^/].*", rootPath, fieldWidth, fieldWidth, fieldWidth, fieldWidth) < 0) {
+		// On failure the content of expr is not usable.
+		return CELIX_ENOMEM;
+	}
+
+	int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, pubsubUUID);
+	free(expr);
+	if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
+		return CELIX_ILLEGAL_STATE;
+	}
+
+	// etcdValue contains the json formatted string
+	json_error_t error;
+	json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
+
+	properties_t *discovered_props = properties_create();
+
+	if (json_is_object(jsonRoot)) {
+		void *iter = json_object_iter(jsonRoot);
+
+		while (iter) {
+			const char *key = json_object_iter_key(iter);
+			const char *str = json_string_value(json_object_iter_value(iter));
+
+			// json_string_value yields NULL for members that are not
+			// strings; those are skipped instead of storing a NULL value.
+			if (str != NULL) {
+				properties_set(discovered_props, key, str);
+			}
+			iter = json_object_iter_next(jsonRoot, iter);
+		}
+	}
+
+	celix_status_t status = pubsubEndpoint_createFromDiscoveredProperties(discovered_props, pubEP);
+	if (status != CELIX_SUCCESS) {
+		// The properties are released here only when endpoint creation failed.
+		properties_destroy(discovered_props);
+	}
+
+	if (jsonRoot != NULL) {
+		json_decref(jsonRoot);
+	}
+
+	return status;
+}
+
+/*
+ * Thread entry point of a watcher; data is the etcd_watcher_pt handed to
+ * celixThread_create.
+ *
+ * Builds the "<root>/<scope>/<topic>" path, passes the entries already
+ * present under it to the discovery instance, and then
+ * performs (blocking) etcd_watch calls to check for
+ * changing discovery endpoint information within etcd.
+ * The loop ends once 'running' is read as false (or the lock cannot be
+ * taken). Always returns NULL.
+ */
+static void* etcdWatcher_run(void* data) {
+	etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+	time_t timeBeforeWatch = time(NULL);
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	long long highestModified = 0;
+
+	pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+	bundle_context_pt context = ps_discovery->context;
+
+	memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+	//TODO: add topic to etcd key
+	etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+	etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
+
+	// The lock is only held while 'running' is read; it is released again
+	// before the watch call below.
+	while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
+
+		char *rkey = NULL;
+		char *value = NULL;
+		char *preValue = NULL;
+		char *action = NULL;
+		long long modIndex;
+
+		celixThreadMutex_unlock(&watcher->watcherLock);
+
+		// Ask for the first change after the last index that was handled.
+		if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+			pubsub_endpoint_pt pubEP = NULL;
+			// "set", "create" and "update" build the endpoint from the new
+			// value; "delete" and "expire" build it from the previous value.
+			if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_addNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "delete") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_removeNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "expire") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_removeNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "update") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_addNode(ps_discovery, pubEP);
+				}
+			} else {
+				fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+			}
+			// Remember the index of this change so the next watch starts after it.
+			highestModified = modIndex;
+		} else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+			// The watch gave no usable result within a quarter TTL of the
+			// last checkpoint: wait before trying again.
+			sleep(DEFAULT_ETCD_TTL / 4);
+		}
+
+		FREE_MEM(action);
+		FREE_MEM(value);
+		FREE_MEM(preValue);
+		FREE_MEM(rkey);
+
+		/* prevent busy waiting, in case etcd_watch returns false */
+
+
+		// Move the checkpoint forward once more than a quarter TTL has passed.
+		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+			timeBeforeWatch = time(NULL);
+		}
+
+	}
+
+	// When the loop ended because 'running' was false, the lock taken in the
+	// loop condition is still held and is released here.
+	// NOTE(review): if the lock call itself failed while 'running' is false,
+	// this unlocks a mutex that was not acquired - confirm this is acceptable.
+	if (watcher->running == false) {
+		celixThreadMutex_unlock(&watcher->watcherLock);
+	}
+
+	return NULL;
+}
+
+/*
+ * Allocates a watcher for the given scope/topic and starts its thread.
+ *
+ * Returns CELIX_BUNDLE_EXCEPTION when pubsub_discovery is NULL (without
+ * touching *watcher), CELIX_ENOMEM when the watcher cannot be allocated
+ * (*watcher is NULL then), otherwise the status of celixThread_create.
+ * The watcher lock is held while the thread is created so that 'running' is
+ * set before the new thread can read it. The context argument is not used.
+ * The results of strdup and celixThreadMutex_create are not checked.
+ */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
+	if (pubsub_discovery == NULL) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	etcd_watcher_pt created = calloc(1, sizeof(*created));
+	*watcher = created;
+	if (created == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	created->pubsub_discovery = pubsub_discovery;
+	created->scope = strdup(scope);
+	created->topic = strdup(topic);
+
+	celixThreadMutex_create(&created->watcherLock, NULL);
+
+	celixThreadMutex_lock(&created->watcherLock);
+	celix_status_t status = celixThread_create(&created->watcherThread, NULL, etcdWatcher_run, created);
+	if (status == CELIX_SUCCESS) {
+		created->running = true;
+	}
+	celixThreadMutex_unlock(&created->watcherLock);
+
+	return status;
+}
+
+/*
+ * Releases a watcher: destroys its lock and frees the scope and topic copies
+ * and the watcher itself. The thread is not joined here; etcdWatcher_stop
+ * does that. Always returns CELIX_SUCCESS.
+ */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
+	// The topic root path that used to be computed here was never used.
+	celixThreadMutex_destroy(&(watcher->watcherLock));
+
+	free(watcher->scope);
+	free(watcher->topic);
+	free(watcher);
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Clears the watcher's 'running' flag under its lock and then joins the
+ * watcher thread. Always returns CELIX_SUCCESS.
+ */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
+	celix_thread_mutex_t *lock = &(watcher->watcherLock);
+
+	celixThreadMutex_lock(lock);
+	watcher->running = false;
+	celixThreadMutex_unlock(lock);
+
+	celixThread_join(watcher->watcherThread, NULL);
+
+	return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
new file mode 100644
index 0000000..c425e60
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_watcher.h
@@ -0,0 +1,38 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WATCHER_H_
+#define ETCD_WATCHER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/* Opaque handle of a watcher; the structure is defined in etcd_watcher.c. */
+typedef struct etcd_watcher *etcd_watcher_pt;
+
+/* Allocates a watcher for scope/topic, starts its thread and stores the handle in *watcher. */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery,  bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher);
+/* Frees the watcher; does not join its thread. */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
+/* Clears the watcher's running flag and joins its thread. */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
+
+/* Builds a publisher endpoint in *pubEP from an etcd key and its JSON value. */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
+
+
+#endif /* ETCD_WATCHER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_writer.c b/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
new file mode 100644
index 0000000..37220cc
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_writer.c
@@ -0,0 +1,217 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <jansson.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_writer.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+/* Size of the stack buffer that receives the etcd directory path in etcdWriter_destroy. */
+#define MAX_ROOTNODE_LENGTH		128
+
+/* Bundle-context property holding the etcd root path, and the fallback used when it is unset. */
+#define CFG_ETCD_ROOT_PATH		"PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH	"pubsub/discovery"
+
+/* Server address settings; not referenced by the code visible in this file. */
+#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+// (the TTL is passed to etcd_set; half of the default paces the refresh loop)
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+/* State of the writer thread that stores the local publisher endpoints in etcd. */
+struct etcd_writer {
+	pubsub_discovery_pt pubsub_discovery; /* discovery instance; its context supplies configuration */
+	celix_thread_mutex_t localPubsLock;   /* guards localPubs */
+	array_list_pt localPubs;              /* clones of the endpoints announced by this framework */
+	volatile bool running;                /* cleared by etcdWriter_destroy to end the refresh loop */
+	celix_thread_t writerThread;          /* thread executing etcdWriter_run */
+};
+
+
+/* Forward declarations of the file-local helpers defined at the end of this file. */
+static const char* etcdWriter_getRootPath(bundle_context_pt context);
+static void* etcdWriter_run(void* data);
+
+
+/*
+ * Allocates a writer bound to disc, creates its lock and endpoint list and
+ * starts the refresh thread. Returns the writer, or NULL when the allocation
+ * fails. The results of the lock, list and thread creation are not checked.
+ */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
+	etcd_writer_pt created = calloc(1, sizeof(*created));
+	if (created == NULL) {
+		return NULL;
+	}
+
+	celixThreadMutex_create(&created->localPubsLock, NULL);
+	arrayList_create(&created->localPubs);
+	created->pubsub_discovery = disc;
+	created->running = true;
+	celixThread_create(&created->writerThread, NULL, etcdWriter_run, created);
+
+	return created;
+}
+
+/*
+ * Stops the refresh thread and releases the writer.
+ *
+ * After the thread has been joined, every locally stored endpoint has its
+ * "<root>/<scope>/<topic>/<fwUUID>" entry removed via etcd_del and is then
+ * destroyed. Finally the list, the lock and the writer itself are freed.
+ */
+void etcdWriter_destroy(etcd_writer_pt writer) {
+	const char *root = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+	char etcdDir[MAX_ROOTNODE_LENGTH];
+
+	writer->running = false;
+	celixThread_join(writer->writerThread, NULL);
+
+	celixThreadMutex_lock(&writer->localPubsLock);
+
+	for (int idx = 0; idx < arrayList_size(writer->localPubs); idx++) {
+		pubsub_endpoint_pt ep = (pubsub_endpoint_pt) arrayList_get(writer->localPubs, idx);
+
+		memset(etcdDir, 0, MAX_ROOTNODE_LENGTH);
+		snprintf(etcdDir, MAX_ROOTNODE_LENGTH, "%s/%s/%s/%s",
+				 root,
+				 properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+				 properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+				 properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID));
+
+		etcd_del(etcdDir);
+		pubsubEndpoint_destroy(ep);
+	}
+
+	arrayList_destroy(writer->localPubs);
+
+	celixThreadMutex_unlock(&writer->localPubsLock);
+	celixThreadMutex_destroy(&(writer->localPubsLock));
+
+	free(writer);
+}
+
+/*
+ * Writes a publisher endpoint to etcd.
+ *
+ * The endpoint properties are stored as a compact JSON object under
+ * "<root>/<scope>/<topic>/<fwUUID>/<endpointUUID>", with the TTL taken from
+ * the CFG_ETCD_TTL property (DEFAULT_ETCD_TTL when unset or not a number).
+ *
+ * When storeEP is true and the endpoint's framework UUID equals the UUID of
+ * this framework, a clone of the endpoint is also kept in the writer's list
+ * so that the refresh thread re-writes it periodically.
+ *
+ * Returns CELIX_SUCCESS when etcd_set reports success (0),
+ * CELIX_ILLEGAL_ARGUMENT when it does not, and CELIX_ENOMEM when the key or
+ * the JSON text cannot be allocated.
+ */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
+	celix_status_t status = CELIX_SUCCESS;
+
+	if(storeEP){
+		const char *fwUUID = NULL;
+		const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+		bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+		// Both sides are checked because strcmp must not be given NULL.
+		if(fwUUID && epFwUUID && strcmp(epFwUUID, fwUUID) == 0) {
+			celixThreadMutex_lock(&writer->localPubsLock);
+			pubsub_endpoint_pt p = NULL;
+			pubsubEndpoint_clone(pubEP, &p);
+			arrayList_add(writer->localPubs,p);
+			celixThreadMutex_unlock(&writer->localPubsLock);
+		}
+	}
+
+	// determine ttl
+	const char* ttlStr = NULL;
+	int ttl = DEFAULT_ETCD_TTL;
+
+	if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) == CELIX_SUCCESS) && ttlStr) {
+		char* endptr = NULL;
+		errno = 0;
+		long parsed = strtol(ttlStr, &endptr, 10);
+		// Accept the value only when the whole, non-empty string was a number.
+		if (endptr != ttlStr && *endptr == '\0' && errno == 0) {
+			ttl = (int) parsed;
+		}
+	}
+
+	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+	char *key = NULL;
+	if (asprintf(&key,"%s/%s/%s/%s/%s",
+			 rootPath,
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID)) < 0) {
+		// On failure the content of key is not usable.
+		return CELIX_ENOMEM;
+	}
+
+	json_t *jsEndpoint = json_object();
+	const char* propKey = NULL;
+	PROPERTIES_FOR_EACH(pubEP->endpoint_props, propKey) {
+		const char* val = properties_get(pubEP->endpoint_props, propKey);
+		// json_object_set_new takes over the reference of the new string, so
+		// it is released together with jsEndpoint.
+		json_object_set_new(jsEndpoint, propKey, json_string(val));
+	}
+	char* jsonEndpointStr = json_dumps(jsEndpoint, JSON_COMPACT);
+
+	if (jsonEndpointStr == NULL) {
+		status = CELIX_ENOMEM;
+	} else if (etcd_set(key,jsonEndpointStr,ttl,false) != 0) {
+		// Like etcd_del and etcd_watch in this bundle, a non-zero result is
+		// treated as failure.
+		status = CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	FREE_MEM(jsonEndpointStr);
+	FREE_MEM(key);
+	json_decref(jsEndpoint);
+
+	return status;
+}
+
+/*
+ * Removes a publisher endpoint from the writer and from etcd.
+ *
+ * The first stored endpoint that equals pubEP is removed from the writer's
+ * list and destroyed; then the key
+ * "<root>/<scope>/<topic>/<fwUUID>/<endpointUUID>" is deleted via etcd_del.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT when etcd_del reports a
+ * failure, or CELIX_ENOMEM when the key cannot be allocated (the local copy
+ * has been removed in that case as well).
+ */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
+	celix_status_t status = CELIX_SUCCESS;
+	char *key = NULL;
+
+	celixThreadMutex_lock(&writer->localPubsLock);
+	for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+		pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+		if (pubsubEndpoint_equals(ep, pubEP)) {
+			arrayList_remove(writer->localPubs, i);
+			pubsubEndpoint_destroy(ep);
+			break;
+		}
+	}
+	celixThreadMutex_unlock(&writer->localPubsLock);
+
+	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+	if (asprintf(&key, "%s/%s/%s/%s/%s",
+			 rootPath,
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID)) < 0) {
+		// On failure the content of key is not usable, so nothing can be
+		// deleted from etcd.
+		return CELIX_ENOMEM;
+	}
+
+	if (etcd_del(key)) {
+		printf("Failed to remove key %s from ETCD\n",key);
+		status = CELIX_ILLEGAL_ARGUMENT;
+	}
+	FREE_MEM(key);
+	return status;
+}
+
+/*
+ * Thread entry point of the writer; data is the etcd_writer_pt handed to
+ * celixThread_create. While 'running' is set, every locally stored endpoint
+ * is written to etcd again (without storing another copy), followed by a
+ * sleep of half the default TTL. Always returns NULL.
+ */
+static void* etcdWriter_run(void* data) {
+	etcd_writer_pt writer = (etcd_writer_pt) data;
+
+	while (writer->running) {
+		celixThreadMutex_lock(&writer->localPubsLock);
+
+		int idx = 0;
+		while (idx < arrayList_size(writer->localPubs)) {
+			pubsub_endpoint_pt ep = (pubsub_endpoint_pt) arrayList_get(writer->localPubs, idx);
+			etcdWriter_addPublisherEndpoint(writer, ep, false);
+			idx++;
+		}
+
+		celixThreadMutex_unlock(&writer->localPubsLock);
+		sleep(DEFAULT_ETCD_TTL / 2);
+	}
+
+	return NULL;
+}
+
+/*
+ * Returns the value of the CFG_ETCD_ROOT_PATH property of the given context,
+ * or DEFAULT_ETCD_ROOTPATH when the property lookup leaves it unset.
+ */
+static const char* etcdWriter_getRootPath(bundle_context_pt context) {
+	const char* configured = NULL;
+
+	bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured);
+
+	return (configured != NULL) ? configured : DEFAULT_ETCD_ROOTPATH;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_writer.h b/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
new file mode 100644
index 0000000..3ff98b9
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_writer.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WRITER_H_
+#define ETCD_WRITER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/* Opaque handle of a writer; the structure is defined in etcd_writer.c. */
+typedef struct etcd_writer *etcd_writer_pt;
+
+
+/* Allocates a writer and starts its refresh thread; returns NULL when the allocation fails. */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
+/* Stops the refresh thread, deletes the stored local endpoints from etcd and frees the writer. */
+void etcdWriter_destroy(etcd_writer_pt writer);
+
+/* Writes pubEP to etcd; with storeEP set, an endpoint of this framework is also kept for periodic refresh. */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP);
+/* Drops the stored copy of pubEP (if any) and deletes its key from etcd. */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP);
+
+
+#endif /* ETCD_WRITER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
new file mode 100644
index 0000000..ad1cc4a
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -0,0 +1,170 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "publisher_endpoint_announce.h"
+#include "pubsub_discovery_impl.h"
+
+/* Per-bundle state that the activator callbacks share through userData. */
+struct activator {
+	bundle_context_pt context;                           /* context given to bundleActivator_create */
+	pubsub_discovery_pt pubsub_discovery;                /* discovery instance made in bundleActivator_create */
+
+	service_tracker_pt pstmPublishersTracker;            /* tracker for PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE */
+
+	publisher_endpoint_announce_pt publisherEPAnnounce;  /* service struct allocated in bundleActivator_start */
+	service_registration_pt publisherEPAnnounceService;  /* registration of that service */
+};
+
+/*
+ * Creates the service tracker for PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE whose
+ * added/modified/removed callbacks are the pubsub_discovery_tmPublisherAnnounce*
+ * functions, with the activator's discovery instance as handle.
+ * Returns the status of the first step that fails, or of serviceTracker_create.
+ */
+static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) {
+	service_tracker_customizer_pt customizer = NULL;
+
+	celix_status_t status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
+			NULL,
+			pubsub_discovery_tmPublisherAnnounceAdded,
+			pubsub_discovery_tmPublisherAnnounceModified,
+			pubsub_discovery_tmPublisherAnnounceRemoved,
+			&customizer);
+	if (status != CELIX_SUCCESS) {
+		return status;
+	}
+
+	return serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker);
+}
+
+/*
+ * Allocates the activator state, creates the discovery instance and the
+ * publisher-announce tracker, and hands the state back through *userData.
+ *
+ * Returns CELIX_ENOMEM when the state cannot be allocated, otherwise the
+ * status of the first step that fails, or CELIX_SUCCESS. On failure the
+ * state is freed and *userData is left untouched.
+ *
+ * NOTE(review): when the tracker cannot be created, the discovery instance
+ * that was created just before is not released here - confirm whether
+ * pubsub_discovery_destroy should be called on that path.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	struct activator* created = calloc(1, sizeof(*created));
+	if (created == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	created->context = context;
+	created->pstmPublishersTracker = NULL;
+	created->publisherEPAnnounce = NULL;
+	created->publisherEPAnnounceService = NULL;
+
+	celix_status_t status = pubsub_discovery_create(context, &created->pubsub_discovery);
+	if (status == CELIX_SUCCESS) {
+		status = createTMPublisherAnnounceTracker(created, &(created->pstmPublishersTracker));
+	}
+
+	if (status != CELIX_SUCCESS) {
+		free(created);
+		return status;
+	}
+
+	*userData = created;
+	return status;
+}
+
+/*
+ * Starts the bundle: starts the discovery instance, opens the
+ * publisher-announce tracker and registers the PUBSUB_DISCOVERY_SERVICE.
+ *
+ * Returns CELIX_ENOMEM when the service struct cannot be allocated,
+ * otherwise the status of the first step that fails, or CELIX_SUCCESS.
+ * On failure the service struct is freed and the activator no longer refers
+ * to it. Steps that already succeeded are not rolled back.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct activator *activator = userData;
+
+	publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer));
+	if (pubEPAnnouncer == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	pubEPAnnouncer->handle = activator->pubsub_discovery;
+	pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher;
+	pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher;
+	pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic;
+	pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic;
+	activator->publisherEPAnnounce = pubEPAnnouncer;
+
+	// pubsub_discovery_start needs to be first to initialize the proper etcd_watcher values
+	celix_status_t status = pubsub_discovery_start(activator->pubsub_discovery);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_open(activator->pstmPublishersTracker);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		// The service properties are created only once registration is
+		// certain to be attempted, so an earlier failure cannot leak them.
+		properties_pt props = properties_create();
+		properties_set(props, "PUBSUB_DISCOVERY", "true");
+
+		status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService);
+	}
+
+	if (status != CELIX_SUCCESS) {
+		// Do not leave the activator pointing at the freed struct.
+		activator->publisherEPAnnounce = NULL;
+		free(pubEPAnnouncer);
+	}
+
+	return status;
+}
+
+/*
+ * Stops the bundle: stops the discovery instance, closes the tracker and
+ * unregisters the announce service, in that order. The returned status is
+ * the sum of the three step results; the service struct is freed only when
+ * that sum equals CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct activator *activator = userData;
+
+	celix_status_t stopResult = pubsub_discovery_stop(activator->pubsub_discovery);
+	celix_status_t closeResult = serviceTracker_close(activator->pstmPublishersTracker);
+	celix_status_t unregisterResult = serviceRegistration_unregister(activator->publisherEPAnnounceService);
+
+	celix_status_t status = CELIX_SUCCESS;
+	status += stopResult + closeResult + unregisterResult;
+
+	if (status == CELIX_SUCCESS) {
+		free(activator->publisherEPAnnounce);
+	}
+
+	return status;
+}
+
+/*
+ * Destroys the tracker and the discovery instance, clears the activator's
+ * fields and frees the activator. The returned status is the sum of the two
+ * destroy results.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	struct activator *activator = userData;
+
+	celix_status_t status = CELIX_SUCCESS;
+	status += serviceTracker_destroy(activator->pstmPublishersTracker);
+	status += pubsub_discovery_destroy(activator->pubsub_discovery);
+
+	activator->context = NULL;
+	activator->pubsub_discovery = NULL;
+	activator->pstmPublishersTracker = NULL;
+	activator->publisherEPAnnounceService = NULL;
+	activator->publisherEPAnnounce = NULL;
+
+	free(activator);
+
+	return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
new file mode 100644
index 0000000..f77905a
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery.h
@@ -0,0 +1,26 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_H_
+#define PUBSUB_DISCOVERY_H_
+
+/* Handle of the discovery instance; struct pubsub_discovery itself is not defined in this header. */
+typedef struct pubsub_discovery *pubsub_discovery_pt;
+
+
+#endif /* PUBSUB_DISCOVERY_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
new file mode 100644
index 0000000..f0b94c5
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -0,0 +1,530 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+
+#include "publisher_endpoint_announce.h"
+#include "etcd_common.h"
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_discovery_impl.h"
+
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp);
+
+/* Discovery activator functions */
+/**
+ * Allocates a pubsub_discovery instance and initialises its maps, mutexes and verbose flag.
+ *
+ * The verbose flag starts at PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE. When the bundle property
+ * PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY is set, the flag becomes true only if the property value
+ * starts with "true" (compared case-insensitively), and false otherwise.
+ *
+ * @param context       Bundle context; stored in the instance and used to read the property.
+ * @param ps_discovery  Output; receives the new instance, or NULL when the allocation fails.
+ * @return CELIX_ENOMEM when the instance cannot be allocated, CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
+    pubsub_discovery_pt disc = calloc(1, sizeof(*disc));
+
+    *ps_discovery = disc;
+    if (disc == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    disc->context = context;
+    disc->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    disc->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+    disc->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
+    disc->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE;
+    celixThreadMutex_create(&disc->listenerReferencesMutex, NULL);
+    celixThreadMutex_create(&disc->discoveredPubsMutex, NULL);
+    celixThreadMutex_create(&disc->watchersMutex, NULL);
+
+    /* Optional override of the default verbosity through a bundle property. */
+    const char *verboseProp = NULL;
+    bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseProp);
+    if (verboseProp != NULL) {
+        disc->verbose = strncasecmp("true", verboseProp, strlen("true")) == 0;
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Releases everything still owned by a pubsub_discovery instance and frees the instance.
+ *
+ * Every endpoint left in the discoveredPubs lists is destroyed, the lists and the map are
+ * destroyed, the listenerReferences map is destroyed, and all three mutexes created by
+ * pubsub_discovery_create() are destroyed.
+ *
+ * The watchers map is not touched here; pubsub_discovery_stop() destroys it.
+ *
+ * @param ps_discovery  Instance to destroy; must not be used afterwards.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+
+    while (hashMapIterator_hasNext(iter)) {
+        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+        for (int i = 0; i < arrayList_size(pubEP_list); i++) {
+            pubsubEndpoint_destroy(((pubsub_endpoint_pt) arrayList_get(pubEP_list, i)));
+        }
+        arrayList_destroy(pubEP_list);
+    }
+
+    hashMapIterator_destroy(iter);
+
+    /* The lists were destroyed above, so only the keys are handed to hashMap_destroy(). */
+    hashMap_destroy(ps_discovery->discoveredPubs, true, false);
+    ps_discovery->discoveredPubs = NULL;
+
+    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+
+    celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
+
+
+    celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
+
+    hashMap_destroy(ps_discovery->listenerReferences, false, false);
+    ps_discovery->listenerReferences = NULL;
+
+    celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
+
+    celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
+
+    /* Fix: watchersMutex is created in pubsub_discovery_create() but was never destroyed. */
+    celixThreadMutex_destroy(&ps_discovery->watchersMutex);
+
+    free(ps_discovery);
+
+    return status;
+}
+
+/**
+ * Starts discovery: initialises the shared etcd settings and creates the etcd writer.
+ *
+ * The writer is created regardless of the outcome of etcdCommon_init().
+ *
+ * @param ps_discovery  Instance to start; its writer field is overwritten.
+ * @return The status reported by etcdCommon_init().
+ */
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
+    const celix_status_t initStatus = etcdCommon_init(ps_discovery->context);
+
+    ps_discovery->writer = etcdWriter_create(ps_discovery);
+
+    return initStatus;
+}
+/**
+ * Stops discovery and tears down the etcd watchers and the etcd writer.
+ *
+ * Steps, in order:
+ *  1. Read the local framework UUID; when it is missing, return
+ *     CELIX_INVALID_BUNDLE_CONTEXT without doing anything else.
+ *  2. With watchersMutex held, call etcdWatcher_stop() on every watcher.
+ *  3. With discoveredPubsMutex also held, walk all discovered endpoints: endpoints whose
+ *     framework UUID equals the local one are deleted through the etcd writer (and stay in
+ *     their list); all other endpoints are reported to the listeners as removed, taken out
+ *     of their list and destroyed.
+ *  4. Destroy the writer, destroy every watcher and destroy the watchers map.
+ *
+ * NOTE(review): the early return in step 1 leaves the watchers and the writer untouched.
+ * NOTE(review): ps_discovery->writer and ps_discovery->watchers are not reset after they
+ * have been destroyed - confirm nothing uses them after stop.
+ *
+ * @param ps_discovery  Instance to stop.
+ * @return CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID is unavailable,
+ *         CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char* fwUUID = NULL;
+
+    bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+    if (fwUUID == NULL) {
+        fprintf(stderr, "ERROR PSD: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+    /* watchersMutex is taken here and only released right before the final return. */
+    celixThreadMutex_lock(&ps_discovery->watchersMutex);
+
+    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
+    while (hashMapIterator_hasNext(iter)) {
+        struct watcher_info * wi = hashMapIterator_nextValue(iter);
+        etcdWatcher_stop(wi->watcher);
+    }
+    hashMapIterator_destroy(iter);
+    /* Nested lock: discoveredPubsMutex is acquired while watchersMutex is still held. */
+    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+    /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */
+
+    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+    while (hashMapIterator_hasNext(iter)) {
+        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+        int i;
+        for (i = 0; i < arrayList_size(pubEP_list); i++) {
+            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
+            if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
+                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP); /* local endpoint: stays in the list */
+            } else {
+                pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
+                arrayList_remove(pubEP_list, i);
+                pubsubEndpoint_destroy(pubEP);
+                i--; /* the element at index i was removed, so revisit the same index */
+            }
+        }
+    }
+
+    hashMapIterator_destroy(iter);
+
+    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+    etcdWriter_destroy(ps_discovery->writer);
+
+    iter = hashMapIterator_create(ps_discovery->watchers);
+    while (hashMapIterator_hasNext(iter)) {
+        struct watcher_info * wi = hashMapIterator_nextValue(iter);
+        etcdWatcher_destroy(wi->watcher);
+    }
+    hashMapIterator_destroy(iter);
+    hashMap_destroy(ps_discovery->watchers, true, true); /* presumably also frees the keys and the watcher_info values - TODO confirm against hash_map.h */
+    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+    return status;
+}
+
+/* Functions called by the etcd_watcher */
+
+/**
+ * Records a publisher endpoint under its scope/topic key and notifies the listeners when
+ * the endpoint was not known yet.
+ *
+ * When an equal endpoint is already stored, pubEP is destroyed and nobody is notified.
+ * Otherwise pubEP is stored in the list for its scope/topic key (the list is created on
+ * first use) and the listeners are informed after the mutex has been released.
+ *
+ * NOTE(review): an invalid endpoint is returned to the caller untouched - confirm the
+ * caller releases it in that case.
+ *
+ * @param pubsub_discovery  Discovery instance.
+ * @param pubEP             Endpoint to add.
+ * @return CELIX_ILLEGAL_STATE for an invalid endpoint, CELIX_SUCCESS for a duplicate,
+ *         otherwise the status of pubsub_discovery_informPublishersListeners().
+ */
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+    if (!pubsub_discovery_isEndpointValid(pubEP)) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
+    bool isNew = true;
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+    const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME);
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    array_list_pt known = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, key);
+
+    if (known == NULL) {
+        /* First endpoint for this scope/topic: create its list; the map gets its own key copy. */
+        arrayList_create(&known);
+        arrayList_add(known, pubEP);
+        hashMap_put(pubsub_discovery->discoveredPubs, strdup(key), known);
+    } else {
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            if (pubsubEndpoint_equals(pubEP, (pubsub_endpoint_pt) arrayList_get(known, idx))) {
+                isNew = false;
+                break;
+            }
+        }
+
+        if (isNew) {
+            arrayList_add(known, pubEP);
+        } else {
+            pubsubEndpoint_destroy(pubEP);
+        }
+    }
+    free(key);
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    if (!isNew) {
+        return CELIX_SUCCESS;
+    }
+
+    return pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, true);
+}
+
+/**
+ * Removes the stored endpoint equal to pubEP and notifies the listeners about the removal.
+ *
+ * pubEP itself is always destroyed before returning. The stored endpoint that matched is
+ * removed from its list and destroyed as well; the listeners are informed (with pubEP)
+ * after the mutex has been released.
+ *
+ * @param pubsub_discovery  Discovery instance.
+ * @param pubEP             Endpoint describing the publisher to remove; destroyed here.
+ * @return CELIX_ILLEGAL_STATE when no list exists for the endpoint's scope/topic key,
+ *         the status of pubsub_discovery_informPublishersListeners() when a match was
+ *         removed, CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    bool removed = false;
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+    const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME);
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    array_list_pt known = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, key);
+    free(key);
+
+    if (known != NULL) {
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            pubsub_endpoint_pt candidate = arrayList_get(known, idx);
+            if (pubsubEndpoint_equals(pubEP, candidate)) {
+                arrayList_remove(known, idx);
+                pubsubEndpoint_destroy(candidate);
+                removed = true;
+                break;
+            }
+        }
+    } else {
+        printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    if (removed) {
+        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false);
+    }
+    pubsubEndpoint_destroy(pubEP);
+
+    return status;
+}
+
+/* Callback to the pubsub_topology_manager */
+/**
+ * Tells every registered publisher-announce listener that an endpoint appeared or vanished.
+ *
+ * For each service reference in listenerReferences the service is fetched, its
+ * announcePublisher() (epAdded == true) or removePublisher() (epAdded == false) callback is
+ * invoked with pubEP, and the service is released again. The whole walk runs with
+ * listenerReferencesMutex held.
+ *
+ * A reference whose service cannot be obtained is skipped with an error message instead of
+ * being dereferenced.
+ *
+ * @param pubsub_discovery  Discovery instance holding the listener references.
+ * @param pubEP             Endpoint passed to the listener callbacks.
+ * @param epAdded           true to announce the endpoint, false to report its removal.
+ * @return Always CELIX_SUCCESS; callback results are not propagated.
+ */
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+
+    if (pubsub_discovery->listenerReferences != NULL) {
+        hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
+        while (hashMapIterator_hasNext(iter)) {
+            service_reference_pt reference = hashMapIterator_nextKey(iter);
+
+            publisher_endpoint_announce_pt listener = NULL;
+
+            bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener);
+            if (listener == NULL) {
+                /* Fix: the service could not be obtained; calling through NULL would crash. */
+                fprintf(stderr, "[ERROR] PSD: Cannot get publisher announce listener service, skipping it.\n");
+                continue;
+            }
+
+            if (epAdded) {
+                listener->announcePublisher(listener->handle, pubEP);
+            } else {
+                listener->removePublisher(listener->handle, pubEP);
+            }
+            bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
+        }
+        hashMapIterator_destroy(iter);
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+
+    return status;
+}
+
+
+/* Service's functions implementation */
+/**
+ * Service function: stores a copy of a publisher endpoint and writes it to etcd.
+ *
+ * The endpoint is cloned; the clone is appended to the list for its scope/topic key (the
+ * list is created on first use) and handed to etcdWriter_addPublisherEndpoint(). The
+ * caller keeps ownership of pubEP.
+ *
+ * The clone and the list are checked before use, so a failed clone or list creation no
+ * longer stores or dereferences a NULL pointer.
+ *
+ * @param handle  The pubsub_discovery instance.
+ * @param pubEP   Endpoint to announce; must carry all required properties.
+ * @return CELIX_ILLEGAL_ARGUMENT for an invalid endpoint, CELIX_ENOMEM when the clone or
+ *         the list could not be created, otherwise the status of
+ *         etcdWriter_addPublisherEndpoint().
+ */
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    if (!pubsub_discovery_isEndpointValid(pubEP)) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    if (pubsub_discovery->verbose) {
+        printf("pubsub_discovery_announcePublisher : %s / %s\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    /* Clone before touching the map so a failed clone leaves the bookkeeping unchanged. */
+    pubsub_endpoint_pt p = NULL;
+    pubsubEndpoint_clone(pubEP, &p);
+    if (p == NULL) {
+        /* presumably an allocation failure inside pubsubEndpoint_clone() */
+        fprintf(stderr, "[ERROR] PSD: Cannot clone publisher endpoint.\n");
+        return CELIX_ENOMEM;
+    }
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+    array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
+
+    if (pubEP_list == NULL) {
+        arrayList_create(&pubEP_list);
+        if (pubEP_list != NULL) {
+            /* The map keeps its own copy of the key; pub_key is freed below. */
+            hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
+        }
+    }
+    free(pub_key);
+
+    if (pubEP_list == NULL) {
+        /* No list to store the clone in: release it instead of leaking it. */
+        pubsubEndpoint_destroy(p);
+        status = CELIX_ENOMEM;
+    } else {
+        arrayList_add(pubEP_list,p);
+        status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    return status;
+}
+
+/**
+ * Service function: removes the stored copy of a publisher endpoint and deletes it from etcd.
+ *
+ * The first stored endpoint equal to pubEP is taken out of the list for its scope/topic
+ * key, deleted through the etcd writer and destroyed. pubEP itself is left untouched.
+ *
+ * @param handle  The pubsub_discovery instance.
+ * @param pubEP   Endpoint describing the publisher to remove.
+ * @return CELIX_ILLEGAL_ARGUMENT for an invalid endpoint, CELIX_ILLEGAL_STATE when no list
+ *         or no equal endpoint is found, otherwise the status of
+ *         etcdWriter_deletePublisherEndpoint().
+ */
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+    pubsub_discovery_pt discovery = (pubsub_discovery_pt) handle;
+
+    if (!pubsub_discovery_isEndpointValid(pubEP)) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&discovery->discoveredPubsMutex);
+
+    const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+    const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME);
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    array_list_pt known = (array_list_pt) hashMap_get(discovery->discoveredPubs, key);
+    free(key);
+
+    if (known == NULL) {
+        printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        status = CELIX_ILLEGAL_STATE;
+    } else {
+        pubsub_endpoint_pt candidate = NULL;
+        bool matched = false;
+
+        /* Stop at the first stored endpoint that equals pubEP. */
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            candidate = (pubsub_endpoint_pt) arrayList_get(known, idx);
+            if (pubsubEndpoint_equals(pubEP, candidate)) {
+                matched = true;
+                break;
+            }
+        }
+
+        if (matched) {
+            arrayList_removeElement(known, candidate);
+            status = etcdWriter_deletePublisherEndpoint(discovery->writer, candidate);
+            pubsubEndpoint_destroy(candidate);
+        } else {
+            printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
+            status = CELIX_ILLEGAL_STATE;
+        }
+    }
+
+    celixThreadMutex_unlock(&discovery->discoveredPubsMutex);
+
+    return status;
+}
+
+/**
+ * Service function: registers interest in a scope/topic pair.
+ *
+ * The first call for a given scope/topic key creates an etcd watcher and stores it in the
+ * watchers map with a reference count of 1; the map then owns the key string. Later calls
+ * for the same key only increment the reference count.
+ *
+ * The watcher_info allocation is checked: on failure nothing is stored and CELIX_ENOMEM
+ * is returned instead of dereferencing a NULL pointer.
+ *
+ * @param handle  The pubsub_discovery instance.
+ * @param scope   Topic scope.
+ * @param topic   Topic name.
+ * @return CELIX_ENOMEM when the watcher bookkeeping cannot be allocated, CELIX_SUCCESS
+ *         otherwise.
+ */
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+    celix_status_t status = CELIX_SUCCESS;
+
+    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
+    if (wi != NULL) {
+        wi->nr_references++;
+        free(scope_topic_key);
+    } else {
+        wi = calloc(1, sizeof(*wi));
+        if (wi == NULL) {
+            /* Fix: the calloc result used to be dereferenced unchecked. */
+            free(scope_topic_key);
+            status = CELIX_ENOMEM;
+        } else {
+            etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher);
+            wi->nr_references = 1;
+            /* The key is stored in the map; it is freed in pubsub_discovery_uninterestedInTopic(). */
+            hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
+        }
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+
+    return status;
+}
+
+/**
+ * Service function: drops one reference to the watcher of a scope/topic pair.
+ *
+ * When the reference count reaches zero the entry is removed from the watchers map, the
+ * stored key is freed, and the watcher is stopped, destroyed and freed. An unknown
+ * scope/topic pair is only reported on stderr.
+ *
+ * The temporary lookup key is released on every path; it used to leak unless the last
+ * reference was dropped.
+ *
+ * @param handle  The pubsub_discovery instance.
+ * @param scope   Topic scope.
+ * @param topic   Topic name.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+
+    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
+    if (entry) {
+        struct watcher_info * wi = hashMapEntry_getValue(entry);
+        wi->nr_references--;
+        if (wi->nr_references == 0) {
+            /* Fetch the stored key before the entry disappears from the map. */
+            char *key = hashMapEntry_getKey(entry);
+            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
+            free(key);
+            etcdWatcher_stop(wi->watcher);
+            etcdWatcher_destroy(wi->watcher);
+            free(wi);
+        }
+    } else {
+        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
+    }
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+
+    /* Fix: the lookup key is a separate allocation from the stored key and is always freed. */
+    free(scope_topic_key);
+
+    return CELIX_SUCCESS;
+}
+
+/* pubsub_topology_manager tracker callbacks */
+
+/**
+ * Tracker callback: a publisher-announce listener service became available.
+ *
+ * Every endpoint currently stored in discoveredPubs is announced to the new listener, and
+ * the service reference is then recorded in listenerReferences. Both discoveredPubsMutex
+ * and listenerReferencesMutex are held while this happens.
+ *
+ * @param handle     The pubsub_discovery instance.
+ * @param reference  Service reference of the listener; stored as map key.
+ * @param service    The listener service (publisher_endpoint_announce_pt).
+ * @return The sum of the statuses returned by the listener's announcePublisher() calls
+ *         (CELIX_SUCCESS when there was nothing to announce).
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
+    pubsub_discovery_pt discovery = (pubsub_discovery_pt) handle;
+    publisher_endpoint_announce_pt announcer = (publisher_endpoint_announce_pt) service;
+    celix_status_t result = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&discovery->discoveredPubsMutex);
+    celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+
+    /* Notify the PSTM about discovered publisher endpoints */
+    hash_map_iterator_pt it = hashMapIterator_create(discovery->discoveredPubs);
+    while (hashMapIterator_hasNext(it)) {
+        array_list_pt endpoints = (array_list_pt) hashMapIterator_nextValue(it);
+
+        for (int idx = 0; idx < arrayList_size(endpoints); idx++) {
+            pubsub_endpoint_pt endpoint = (pubsub_endpoint_pt) arrayList_get(endpoints, idx);
+            result += announcer->announcePublisher(announcer->handle, endpoint);
+        }
+    }
+    hashMapIterator_destroy(it);
+
+    /* Only the key matters in this map; the value is unused. */
+    hashMap_put(discovery->listenerReferences, reference, NULL);
+
+    celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+    celixThreadMutex_unlock(&discovery->discoveredPubsMutex);
+
+    if (discovery->verbose) {
+        printf("PSD: pubsub_tm_announce_publisher added.\n");
+    }
+
+    return result;
+}
+
+/**
+ * Tracker callback: a listener service was modified; handled as a removal followed by an
+ * addition of the same reference.
+ *
+ * @param handle     The pubsub_discovery instance.
+ * @param reference  Service reference of the listener.
+ * @param service    The listener service.
+ * @return The status of the removal when it fails, otherwise the status of the addition.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) {
+    const celix_status_t removeStatus = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
+
+    if (removeStatus != CELIX_SUCCESS) {
+        return removeStatus;
+    }
+
+    return pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
+}
+
+/**
+ * Tracker callback: a listener service went away; its reference is dropped from
+ * listenerReferences (under listenerReferencesMutex).
+ *
+ * @param handle     The pubsub_discovery instance.
+ * @param reference  Service reference to forget.
+ * @param service    Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) {
+    pubsub_discovery_pt discovery = handle;
+
+    celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+
+    /* The removal runs whenever the map exists; the message depends on its result and on verbose. */
+    if (discovery->listenerReferences != NULL
+            && hashMap_remove(discovery->listenerReferences, reference)
+            && discovery->verbose) {
+        printf("PSD: pubsub_tm_announce_publisher removed.\n");
+    }
+
+    celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Checks that an endpoint carries every property discovery relies on.
+ *
+ * Required keys: endpoint UUID, framework UUID, type, admin type, serializer, topic name
+ * and topic scope. Every missing key is reported on stderr; when at least one is missing,
+ * all endpoint properties (and the topic properties, if present) are dumped to stderr.
+ *
+ * A NULL endpoint or an endpoint without a property set is rejected up front instead of
+ * being dereferenced.
+ *
+ * @param psEp  Endpoint to validate; may be NULL.
+ * @return true when all required keys are present, false otherwise.
+ */
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) {
+    //required properties
+    static const char* keys[] = {
+        PUBSUB_ENDPOINT_UUID,
+        PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+        PUBSUB_ENDPOINT_TYPE,
+        PUBSUB_ENDPOINT_ADMIN_TYPE,
+        PUBSUB_ENDPOINT_SERIALIZER,
+        PUBSUB_ENDPOINT_TOPIC_NAME,
+        PUBSUB_ENDPOINT_TOPIC_SCOPE,
+        NULL };
+    bool valid = true;
+
+    if (psEp == NULL || psEp->endpoint_props == NULL) {
+        fprintf(stderr, "[ERROR] PSD: Invalid endpoint: endpoint or its properties are NULL\n");
+        return false;
+    }
+
+    for (int i = 0; keys[i] != NULL; ++i) {
+        const char *val = properties_get(psEp->endpoint_props, keys[i]);
+        if (val == NULL) { //missing required key
+            fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]);
+            valid = false;
+        }
+    }
+
+    if (!valid) {
+        /* Dump everything that is known about the endpoint to help diagnose the missing keys. */
+        const char *key = NULL;
+        fprintf(stderr, "PubSubEndpoint entries:\n");
+        PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+            fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
+        }
+        if (psEp->topic_props != NULL) {
+            fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+            PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+                fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
+            }
+        }
+    }
+    return valid;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
new file mode 100644
index 0000000..eaf8e85
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -0,0 +1,77 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_IMPL_H_
+#define PUBSUB_DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+/* Frees ptr when it is non-NULL and resets it to NULL. NOTE(review): expands to a bare `if` statement and evaluates ptr several times, so avoid it as the body of an unbraced if/else and with arguments that have side effects. */
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
+/* Bundle property controlling verbose output, and the value used when the property is not set. */
+#define PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY "PUBSUB_ETCD_DISCOVERY_VERBOSE"
+#define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false
+/* Reference-counted etcd watcher; instances are the values of pubsub_discovery.watchers. */
+struct watcher_info {
+    etcd_watcher_pt watcher; /* filled in by etcdWatcher_create() */
+    int nr_references; /* set to 1 on creation, incremented per interestedInTopic(), decremented per uninterestedInTopic() */
+};
+/* State of one pubsub discovery instance; each map is paired with the mutex declared right above it. */
+struct pubsub_discovery {
+	bundle_context_pt context; /* bundle context handed to pubsub_discovery_create() */
+
+	celix_thread_mutex_t discoveredPubsMutex;
+	hash_map_pt discoveredPubs; //key = scope/topic key string, value = array_list of pubsub_endpoint_pt
+
+	celix_thread_mutex_t listenerReferencesMutex;
+	hash_map_pt listenerReferences; //key=serviceReference, value=nop
+
+	celix_thread_mutex_t watchersMutex;
+	hash_map_pt watchers; //key = scope/topic key string, value = struct watcher_info
+
+	etcd_writer_pt writer; /* created in pubsub_discovery_start(), destroyed in pubsub_discovery_stop() */
+
+	bool verbose; /* enables the informational printf output */
+};
+
+/* Life cycle of the discovery instance. */
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
+/* Functions called by the etcd watcher when a publisher endpoint appears or disappears. */
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+/* Tracker callbacks for the publisher-announce listener service of the topology manager. */
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);
+/* Service functions; handle is the pubsub_discovery instance. */
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic);
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic);
+/* Notifies every registered listener that an endpoint was added (true) or removed (false). */
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
+
+#endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
new file mode 100644
index 0000000..cacdf09
--- /dev/null
+++ b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# The JSON serializer bundle needs Jansson; configuration fails when it is not found.
+find_package(Jansson REQUIRED)
+# Bundle target built from the activator and the serializer implementation.
+
+add_celix_bundle(celix_pubsub_serializer_json
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json"
+    VERSION "1.0.0"
+    SOURCES
+		src/ps_activator.c
+		src/pubsub_serializer_impl.c
+)
+target_include_directories(celix_pubsub_serializer_json PRIVATE
+	src
+	${JANSSON_INCLUDE_DIR}
+)
+set_target_properties(celix_pubsub_serializer_json PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
+# Install the bundle as part of the "pubsub" component and add it to the "celix" export set.
+install_celix_bundle(celix_pubsub_serializer_json EXPORT celix COMPONENT pubsub)
+# Namespaced alias for the bundle target.
+add_library(Celix::pubsub_serializer_json ALIAS celix_pubsub_serializer_json)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
new file mode 100644
index 0000000..32dd1fc
--- /dev/null
+++ b/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
@@ -0,0 +1,108 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * ps_activator.c
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+#include "pubsub_constants.h"
+
+#include "pubsub_serializer_impl.h"
+/* Per-bundle state of the JSON serializer activator. */
+struct activator {
+	pubsub_serializer_t* serializer; /* created in bundleActivator_create(), destroyed in bundleActivator_destroy() */
+	pubsub_serializer_service_t* serializerService; /* allocated in bundleActivator_start(), freed in bundleActivator_stop() */
+	service_registration_pt registration; /* registration of serializerService; unregistered in bundleActivator_stop() */
+};
+
+/**
+ * Allocates the activator state and creates the serializer.
+ *
+ * @param context   Bundle context passed on to pubsubSerializer_create().
+ * @param userData  Output; receives the activator state (left untouched when the
+ *                  allocation fails).
+ * @return CELIX_ENOMEM when the activator cannot be allocated, otherwise the status of
+ *         pubsubSerializer_create().
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	struct activator *act = calloc(1, sizeof(*act));
+
+	if (act == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	/* The activator is published before the serializer is created, as the status may still fail. */
+	*userData = act;
+
+	return pubsubSerializer_create(context, &(act->serializer));
+}
+
+/**
+ * Builds the serializer service structure and registers it with the framework.
+ *
+ * The service is registered under PUBSUB_SERIALIZER_SERVICE with the property
+ * PUBSUB_SERIALIZER_TYPE_KEY set to PUBSUB_SERIALIZER_TYPE.
+ *
+ * @param userData  Activator state created by bundleActivator_create().
+ * @param context   Bundle context used for the registration.
+ * @return CELIX_ENOMEM when the service structure cannot be allocated, otherwise the
+ *         status of bundleContext_registerService().
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+	pubsub_serializer_service_t *svc = calloc(1, sizeof(*svc));
+
+	if (svc == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	svc->handle = act->serializer;
+	svc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
+	svc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
+	act->serializerService = svc;
+
+	/* Set serializer type */
+	properties_pt svcProps = properties_create();
+	properties_set(svcProps, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_SERIALIZER_TYPE);
+
+	return bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, svc, svcProps, &act->registration);
+}
+
+/**
+ * Unregisters the serializer service and frees the service structure.
+ *
+ * @param userData  Activator state.
+ * @param context   Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	/* Unregister first so the service structure is no longer published when it is freed. */
+	serviceRegistration_unregister(act->registration);
+	act->registration = NULL;
+
+	free(act->serializerService);
+	act->serializerService = NULL;
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Destroys the serializer and frees the activator state.
+ *
+ * @param userData  Activator state; invalid after this call.
+ * @param context   Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	pubsubSerializer_destroy(act->serializer);
+	act->serializer = NULL;
+
+	free(act);
+
+	return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
new file mode 100644
index 0000000..685d499
--- /dev/null
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -0,0 +1,295 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer_impl.c
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <inttypes.h>
+
+#include "utils.h"
+#include "hash_map.h"
+#include "bundle_context.h"
+
+#include "log_helper.h"
+
+#include "json_serializer.h"
+
+#include "pubsub_serializer_impl.h"
+
+#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
+#define MAX_PATH_LEN    1024
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer) {
+	/*
+	 * Allocates a zero-initialised serializer bound to `context` and stores it in
+	 * *serializer (NULL when the allocation fails, in which case CELIX_ENOMEM is returned).
+	 */
+	pubsub_serializer_t* created = calloc(1, sizeof(*created));
+
+	*serializer = created;
+	if (created == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	created->bundle_context = context;
+
+	/* The log helper is optional: a failing logHelper_create does not fail the creation. */
+	if (logHelper_create(context, &created->loghelper) == CELIX_SUCCESS) {
+		logHelper_start(created->loghelper);
+	}
+	return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
+	/* Stops and destroys the log helper, then frees the serializer; always returns CELIX_SUCCESS. */
+	log_helper_pt *helper = &serializer->loghelper;
+
+	logHelper_stop(*helper);
+	logHelper_destroy(helper);
+	free(serializer);
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap) {
+	/*
+	 * Creates a map of message serializers for the descriptors of `bundle` and
+	 * hands it out through *serializerMap. On failure *serializerMap is not written.
+	 */
+	hash_map_pt created = hashMap_create(NULL, NULL, NULL, NULL);
+
+	if (created == NULL) {
+		logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+		return CELIX_ENOMEM;
+	}
+
+	pubsubSerializer_fillMsgSerializerMap(created, bundle);
+	*serializerMap = created;
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, hash_map_pt serializerMap) {
+	/* Frees all message serializers held by `serializerMap` and then the map itself. */
+	if (serializerMap == NULL) {
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	hash_map_iterator_t it = hashMapIterator_construct(serializerMap);
+	while (hashMapIterator_hasNext(&it)) {
+		pubsub_msg_serializer_t* entry = hashMapIterator_nextValue(&it);
+		//note entry->msgName and entry->msgVersion are owned by the dyn message type
+		dynMessage_destroy((dyn_message_type*)entry->handle);
+		free(entry); //also contains the service struct.
+	}
+
+	/* Values were released above (the two `false` flags presumably disable freeing keys/values - confirm). */
+	hashMap_destroy(serializerMap, false, false);
+	return CELIX_SUCCESS;
+}
+
+
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen) {
+	/*
+	 * Serializes `msg` to a JSON string using the message type stored in the
+	 * serializer handle. On success *out receives the string and *outLen its
+	 * length including the terminating '\0'; on failure neither is written.
+	 */
+	dyn_type* msgType = NULL;
+	char *json = NULL;
+
+	dynMessage_getMessageType((dyn_message_type*)msgSerializer->handle, &msgType);
+	if (jsonSerializer_serialize(msgType, msg, &json) != 0) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*out = json;
+	*outLen = strlen(json) + 1;
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out) {
+	/*
+	 * Parses `input` (passed on as a const char*; `inputLen` is not used) into a message
+	 * of the serializer's message type. *out is only written on success.
+	 */
+	dyn_type* msgType = NULL;
+	void *decoded = NULL;
+
+	dynMessage_getMessageType((dyn_message_type*)msgSerializer->handle, &msgType);
+	if (jsonSerializer_deserialize(msgType, (const char*)input, &decoded) != 0) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*out = decoded;
+
+	return CELIX_SUCCESS;
+}
+
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg) {
+	dyn_type* msgType = NULL; /* remains NULL unless the lookup below sets it */
+	dynMessage_getMessageType((dyn_message_type*)msgSerializer->handle, &msgType);
+	if (msgType == NULL) {
+		return; /* no type available to free the message with */
+	}
+	dynType_free(msgType, msg);
+}
+
+
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
+	/* Adds serializers for the descriptors in the bundle's descriptor dir and its META-INF/descriptors subdir. */
+	char* metaInfPath = NULL;
+	char* root = pubsubSerializer_getMsgDescriptionDir(bundle);
+	if (root == NULL) {
+		return; /* no descriptor directory could be determined for this bundle */
+	}
+	pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+	/* On failure asprintf leaves metaInfPath undefined, so it is only used (and freed) on success. */
+	if (asprintf(&metaInfPath, "%s/META-INF/descriptors", root) >= 0) {
+		pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+		free(metaInfPath);
+	}
+
+	free(root);
+}
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
+{
+	/*
+	 * Returns the directory to scan for message descriptors, or NULL if it cannot
+	 * be determined. The caller frees the result. For the system bundle this is the
+	 * CELIX_FRAMEWORK_EXTENDER_PATH property or else the current working directory;
+	 * for any other bundle it is the bundle entry ".".
+	 */
+	char *root = NULL;
+	bool isSystemBundle = false;
+
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	if (!isSystemBundle) {
+		bundle_getEntry(bundle, ".", &root);
+		return root;
+	}
+
+	/* Start from NULL and only query the property when a context was really obtained. */
+	bundle_context_pt context = NULL;
+	const char *prop = NULL;
+	if (bundle_getContext(bundle, &context) == CELIX_SUCCESS && context != NULL) {
+		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+	}
+	root = (prop != NULL) ? strdup(prop) : getcwd(NULL, 0);
+	return root;
+}
+
+
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
+{
+	/*
+	 * Parses every directory entry of `root` whose name contains ".descriptor" and
+	 * stores a pubsub_msg_serializer_t for it in `msgSerializers`, keyed by the
+	 * string hash of the message name. `bundle` is not used here.
+	 * A message type that does not end up in the map is destroyed again.
+	 */
+	char path[MAX_PATH_LEN];
+	struct dirent *entry = NULL;
+	DIR *dir = (root != NULL) ? opendir(root) : NULL;
+
+	if (dir == NULL) {
+		return; /* no (readable) directory, e.g. the bundle has no META-INF/descriptors */
+	}
+
+	while ((entry = readdir(dir)) != NULL) {
+		if (strstr(entry->d_name, ".descriptor") == NULL) {
+			continue;
+		}
+		printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+		int pathLen = snprintf(path, MAX_PATH_LEN, "%s/%s", root, entry->d_name);
+		if (pathLen < 0 || pathLen >= MAX_PATH_LEN) {
+			printf("DMU: descriptor path too long, skipping entry '%s'\n", entry->d_name);
+			continue;
+		}
+
+		FILE *stream = fopen(path, "r");
+		if (stream == NULL) {
+			printf("DMU: cannot open descriptor file %s.\n", path);
+			continue;
+		}
+
+		dyn_message_type* msgType = NULL;
+		if (dynMessage_parse(stream, &msgType) != 0 || msgType == NULL) {
+			printf("DMU: cannot parse message from descriptor %s.\n", path);
+			fclose(stream);
+			continue;
+		}
+
+		/* Name and version stay owned by msgType; the serializer only borrows them. */
+		char* msgName = NULL;
+		version_pt msgVersion = NULL;
+		int rc = dynMessage_getName(msgType, &msgName);
+		rc += dynMessage_getVersion(msgType, &msgVersion);
+
+		pubsub_msg_serializer_t *msgSerializer = NULL;
+		if (rc != 0 || msgName == NULL || msgVersion == NULL) {
+			printf("Cannot retrieve name and/or version from msg\n");
+		} else {
+			unsigned int msgId = utils_stringHash(msgName);
+			bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+			if (clash) {
+				printf("Cannot add msg %s. clash in msg id %u!!\n", msgName, msgId);
+			} else if (msgId != 0) {
+				msgSerializer = calloc(1, sizeof(*msgSerializer));
+			}
+
+			if (msgSerializer != NULL) {
+				msgSerializer->handle = msgType;
+				msgSerializer->msgId = msgId;
+				msgSerializer->msgName = msgName;
+				msgSerializer->msgVersion = msgVersion;
+				msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
+				msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
+				msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+				printf("Adding %u : %s\n", msgId, msgName);
+				hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer);
+			} else if (!clash) {
+				printf("Error creating msg serializer\n"); /* msgId is 0 or out of memory */
+			}
+		}
+
+		if (msgSerializer == NULL) {
+			dynMessage_destroy(msgType); /* not handed over to the map, so release it here */
+		}
+		fclose(stream);
+	}
+
+	closedir(dir);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
new file mode 100644
index 0000000..c36f20e
--- /dev/null
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -0,0 +1,55 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer_impl.h
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_JSON_H_
+#define PUBSUB_SERIALIZER_JSON_H_
+
+#include "dyn_common.h"
+#include "dyn_type.h"
+#include "dyn_message.h"
+#include "log_helper.h"
+
+#include "pubsub_serializer.h"
+
+#define PUBSUB_SERIALIZER_TYPE	"json"
+
+typedef struct pubsub_serializer { /* instance state of the JSON pubsub serializer */
+	bundle_context_pt bundle_context; /* context passed to pubsubSerializer_create */
+	log_helper_pt loghelper; /* log helper created by pubsubSerializer_create, destroyed by pubsubSerializer_destroy */
+} pubsub_serializer_t;
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer); /* allocates *serializer; CELIX_ENOMEM if the allocation fails */
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer); /* stops/destroys the log helper and frees serializer */
+/* Serializer map: one pubsub_msg_serializer_t per message descriptor found for a bundle, keyed by message id. */
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, hash_map_pt serializerMap); /* CELIX_ILLEGAL_ARGUMENT if serializerMap is NULL */
+/* The functions below are installed as the serialize/deserialize/freeMsg callbacks of each pubsub_msg_serializer_t. */
+/* Start of serializer specific functions */
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen); /* *outLen includes the terminating '\0' */
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out); /* inputLen is currently not used */
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg);
+
+#endif /* PUBSUB_SERIALIZER_JSON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
new file mode 100644
index 0000000..c565660
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(pubsub_spi STATIC
+		src/pubsub_admin_match.c
+        src/pubsub_endpoint.c
+        src/pubsub_utils.c
+)
+set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
+target_include_directories(pubsub_spi PUBLIC
+		$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+		$<INSTALL_INTERFACE:include/celix/pubsub_spi>
+)
+target_link_libraries(pubsub_spi PUBLIC Celix::framework Celix::pubsub_api)
+# Record the location of the topic info descriptor as a custom target property (presumably read by other targets).
+set_target_properties(pubsub_spi PROPERTIES TOPIC_INFO_DESCRIPTOR ${CMAKE_CURRENT_LIST_DIR}/include/pubsub_topic_info.descriptor)
+#TODO how to make this descriptor available for imported targets? $<INSTALL_INTERFACE:include/celix/pubsub_spi/pubsub_topic_info.descriptor>
+# Namespaced alias so the in-tree target can be referenced as Celix::pubsub_spi.
+add_library(Celix::pubsub_spi ALIAS pubsub_spi)
+# Install the static library and the public headers as part of the pubsub component.
+install(TARGETS pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
+install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
new file mode 100644
index 0000000..607e83a
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
@@ -0,0 +1,40 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
+#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
+
+#include "pubsub_endpoint.h"
+
+
+//TODO refactor to pubsub_endpoint_announce
+//can be used to announce and remove publisher and subscriber endpoints
+
+struct publisher_endpoint_announce { /* service struct: callbacks to announce/remove publishers and register topic interest */
+	void *handle; /* opaque pointer; every callback below takes it as its first argument */
+	celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP);
+	celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP);
+	celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic); /* topic is identified by scope + topic name */
+	celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic); /* presumably reverts interestedInTopic - confirm */
+};
+
+typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
+
+
+#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
new file mode 100644
index 0000000..5379415
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -0,0 +1,70 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin.h
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_H_
+#define PUBSUB_ADMIN_H_
+
+#include "service_reference.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+
+#include "pubsub_constants.h"
+
+
+typedef struct pubsub_admin *pubsub_admin_pt;
+
+struct pubsub_admin_service { /* service struct exposing the operations of a pubsub admin */
+	pubsub_admin_pt admin; /* admin instance; every callback below takes it as its first argument */
+
+	celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+	celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+	celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); /* NOTE(review): parameter is named subEP but is presumably a publisher endpoint */
+	celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); /* NOTE(review): same naming remark as addPublication */
+
+	celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
+	celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
+
+	//TODO add match function for subscription service and publication listeners, e.g.:
+	//matchPublisherListener(admin, bundle, filter, outScore)
+	//matchSubscriberService(admin, svcRef, outScore)
+
+	/* Match principle:
+	 * - A full matching pubsub_admin gives 100 points
+	 */
+	//TODO this should only be called for remote endpoints (e.g. not endpoints from this framework)
+	celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+
+        //TODO redesign add function for handling endpoints separately, e.g.:
+        //addEndpoint(admin, endpoint);
+        //note that endpoints can be subscribers and publishers
+        //Also note that we then can have pending subscribers and pending (subscriber/publisher) endpoints.
+};
+
+typedef struct pubsub_admin_service *pubsub_admin_service_pt;
+
+#endif /* PUBSUB_ADMIN_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
new file mode 100644
index 0000000..08d6582
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
@@ -0,0 +1,47 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#ifndef PUBSUB_ADMIN_MATCH_H_
+#define PUBSUB_ADMIN_MATCH_H_
+
+#include "celix_errno.h"
+#include "properties.h"
+#include "array_list.h"
+
+#include "pubsub_serializer.h"
+
+#define QOS_ATTRIBUTE_KEY	"attribute.qos"
+#define QOS_TYPE_SAMPLE		"sample"	/* A.k.a. unreliable connection */
+#define QOS_TYPE_CONTROL	"control"	/* A.k.a. reliable connection */
+
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE	100.0F
+
+celix_status_t pubsub_admin_match(
+        pubsub_endpoint_pt endpoint,
+        const char *pubsub_admin_type,
+        const char *frameworkUuid,
+        double sampleScore,
+        double controlScore,
+        double defaultScore,
+        array_list_pt serializerList,
+        double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out);
+
+#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
new file mode 100644
index 0000000..5dfd8fd
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
@@ -0,0 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_common.h
+ *
+ *  \date       Sep 17, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_COMMON_H_
+#define PUBSUB_COMMON_H_
+
+#define PUBSUB_SERIALIZER_SERVICE 		"pubsub_serializer"
+#define PUBSUB_ADMIN_SERVICE 			"pubsub_admin"
+#define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
+#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
+
+#define PUBSUB_ANY_SUB_TOPIC		        "any"
+
+#define	PUBSUB_BUNDLE_ID			"bundle.id"
+
+#define MAX_SCOPE_LEN                           1024
+#define MAX_TOPIC_LEN				1024
+
+struct pubsub_msg_header{ /* fixed-size header describing one pubsub message */
+	char topic[MAX_TOPIC_LEN]; /* topic name, stored inline in a MAX_TOPIC_LEN byte buffer */
+	unsigned int type; /* presumably the message type id - TODO confirm against the users of this header */
+	unsigned char major; /* presumably the major part of the message version - TODO confirm */
+	unsigned char minor; /* presumably the minor part of the message version - TODO confirm */
+};
+
+typedef struct pubsub_msg_header* pubsub_msg_header_pt;
+
+
+#endif /* PUBSUB_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
new file mode 100644
index 0000000..47e31d3
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -0,0 +1,30 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_CONSTANTS_H_
+#define PUBSUB_CONSTANTS_H_
+
+#define PSA_IP 	"PSA_IP"
+#define PSA_ITF	"PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PUBSUB_ADMIN_TYPE_KEY	   "pubsub.config"
+#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type"
+
+#endif /* PUBSUB_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
new file mode 100644
index 0000000..c0492f5
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -0,0 +1,81 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_endpoint.h
+ *
+ *  \date       Sep 21, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ENDPOINT_H_
+#define PUBSUB_ENDPOINT_H_
+
+#include "service_reference.h"
+#include "listener_hook_service.h"
+#include "properties.h"
+
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+#include "pubsub_constants.h"
+
+//required for valid endpoint
+#define PUBSUB_ENDPOINT_UUID            "pubsub.endpoint.uuid" //required
+#define PUBSUB_ENDPOINT_FRAMEWORK_UUID  "pubsub.framework.uuid" //required
+#define PUBSUB_ENDPOINT_TYPE            "pubsub.endpoint.type" //PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE
+#define PUBSUB_ENDPOINT_ADMIN_TYPE       PUBSUB_ADMIN_TYPE_KEY
+#define PUBSUB_ENDPOINT_SERIALIZER       PUBSUB_SERIALIZER_TYPE_KEY
+#define PUBSUB_ENDPOINT_TOPIC_NAME      "pubsub.topic.name"
+#define PUBSUB_ENDPOINT_TOPIC_SCOPE     "pubsub.topic.scope"
+
+//optional
+#define PUBSUB_ENDPOINT_SERVICE_ID      "pubsub.service.id"
+#define PUBSUB_ENDPOINT_BUNDLE_ID       "pubsub.bundle.id"
+#define PUBSUB_ENDPOINT_URL             "pubsub.url"
+
+
+#define PUBSUB_PUBLISHER_ENDPOINT_TYPE  "pubsub.publisher"
+#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
+
+
+struct pubsub_endpoint { /* a publisher or subscriber endpoint, described purely by properties */
+    properties_pt endpoint_props; /* presumably keyed by the PUBSUB_ENDPOINT_* names defined in this header - TODO confirm */
+    properties_pt topic_props; /* topic properties as handed to pubsubEndpoint_create; ownership not visible here */
+};
+
+typedef struct pubsub_endpoint *pubsub_endpoint_pt;
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props, pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t* ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t* ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out);
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
+celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value);
+
+/**
+ * Creates a pubsub_endpoint based on discovered properties.
+ * Will take ownership of the discoveredProperties.
+ */
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out);
+
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic);
+
+#endif /* PUBSUB_ENDPOINT_H_ */