You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:22 UTC
[10/19] celix git commit: CELIX-389: Refactors pubsub.
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
new file mode 100644
index 0000000..fc1cfbb
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -0,0 +1,61 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin.h
+ *
+ * \date Sep 30, 2011
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_H_
+#define PUBSUB_ADMIN_H_
+
+#include "service_reference.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+
+#define PSA_IP "PSA_IP"
+#define PSA_ITF "PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PSA_DEFAULT "zmq"
+
+/* Opaque admin handle: struct pubsub_admin is only forward-declared in this header. */
+typedef struct pubsub_admin *pubsub_admin_pt;
+
+/**
+ * Service interface (table of callbacks) exposed by a pubsub admin.
+ *
+ * Every callback receives the admin handle stored in the 'admin' member
+ * and reports its outcome as a celix_status_t.
+ * NOTE(review): the per-callback descriptions below are inferred from the
+ * callback names only; confirm them against the admin implementations.
+ */
+struct pubsub_admin_service {
+    /* Handle handed back to the implementation on every call. */
+    pubsub_admin_pt admin;
+
+    /* Add / remove a subscriber endpoint. */
+    celix_status_t (*addSubscription)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP);
+    celix_status_t (*removeSubscription)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP);
+
+    /* Add / remove a publisher endpoint. */
+    celix_status_t (*addPublication)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP);
+    celix_status_t (*removePublication)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP);
+
+    /* Close everything registered for the given scope/topic pair. */
+    celix_status_t (*closeAllPublications)(pubsub_admin_pt admin, char* scope, char* topic);
+    celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin, char* scope, char* topic);
+
+    /* Write a match score for the given endpoint into *score. */
+    celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
+    celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
+};
+
+typedef struct pubsub_admin_service *pubsub_admin_service_pt;
+
+#endif /* PUBSUB_ADMIN_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h b/pubsub/pubsub_common/public/include/pubsub_common.h
new file mode 100644
index 0000000..d9c6f1d
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_common.h
@@ -0,0 +1,51 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_common.h
+ *
+ * \date Sep 17, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_COMMON_H_
+#define PUBSUB_COMMON_H_
+
+#define PUBSUB_ADMIN_SERVICE "pubsub_admin"
+#define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery"
+#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher"
+
+#define PUBSUB_ANY_SUB_TOPIC "any"
+
+#define PUBSUB_BUNDLE_ID "bundle.id"
+
+#define MAX_SCOPE_LEN 1024
+#define MAX_TOPIC_LEN 1024
+
+struct pubsub_msg_header{ /* Fixed-size header describing a pubsub message. */
+ char topic[MAX_TOPIC_LEN]; /* topic name, stored inline in a MAX_TOPIC_LEN byte buffer */
+ unsigned int type; /* NOTE(review): presumably the message type id (hash of the message name) - confirm */
+ unsigned char major; /* NOTE(review): presumably the major part of the message version - confirm */
+ unsigned char minor; /* NOTE(review): presumably the minor part of the message version - confirm */
+};
+
+typedef struct pubsub_msg_header* pubsub_msg_header_pt; /* pointer alias for the header struct */
+
+
+#endif /* PUBSUB_COMMON_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
new file mode 100644
index 0000000..193b3fd
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
@@ -0,0 +1,50 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_endpoint.h
+ *
+ * \date Sep 21, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ENDPOINT_H_
+#define PUBSUB_ENDPOINT_H_
+
+#include "service_reference.h"
+
+struct pubsub_endpoint { /* Describes one pubsub endpoint; the string members are heap copies released by pubsubEndpoint_destroy(). */
+ char *frameworkUUID; /* UUID of the framework the endpoint belongs to */
+ char *scope; /* scope name */
+ char *topic; /* topic name */
+ long serviceID; /* id of the service the endpoint was created for */
+ char* endpoint; /* optional endpoint string; may be NULL */
+ bool is_secure; /* NOTE(review): not assigned by the constructors in pubsub_endpoint.c, so it starts out false (calloc) - confirm who sets it */
+};
+
+typedef struct pubsub_endpoint *pubsub_endpoint_pt; /* pointer alias used by the pubsub API */
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,pubsub_endpoint_pt* psEp); /* allocates an endpoint holding copies of the non-NULL strings; the result is stored in *psEp */
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp); /* builds an endpoint from the properties of a service reference; fails when the description is incomplete */
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); /* frees the endpoint and its strings; NULL is accepted */
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); /* compares framework UUID, scope, topic and service id (the endpoint string is not compared) */
+
+char *createScopeTopicKey(const char* scope, const char* topic); /* returns a newly allocated "scope:topic" string; the caller frees it */
+
+#endif /* PUBSUB_ENDPOINT_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
new file mode 100644
index 0000000..565bac4
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -0,0 +1,48 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.h
+ *
+ * \date Dec 7, 2016
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_H
+#define PUBSUB_SERIALIZER_H
+
+#include "bundle.h"
+#include "hash_map.h"
+#include "celix_errno.h"
+
+typedef struct _pubsub_message_type pubsub_message_type; /* opaque message-type descriptor; the struct is defined in pubsub_serializer.c */
+
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen); /* serializes input to a JSON string in *output; *outputLen is the string length plus the terminator */
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output); /* parses the JSON text in input into a message instance stored in *output */
+
+unsigned int pubsubSerializer_hashCode(const char *string); /* returns utils_stringHash() of the string */
+version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType); /* version of the message type, or NULL when none is reported */
+char* pubsubSerializer_getName(pubsub_message_type *msgType); /* name of the message type, or NULL when none is reported */
+
+void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle); /* fills the map with the message descriptors found for the bundle */
+void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap); /* destroys all message types held by the map and clears it */
+
+void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg); /* frees a message instance of the given type */
+
+#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor b/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
new file mode 100644
index 0000000..c01a2fd
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
@@ -0,0 +1,10 @@
+:header
+type=interface
+name=pubsub_topic_info
+version=1.0.0
+:annotations
+:types
+:methods
+getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
+getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
+getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_utils.h b/pubsub/pubsub_common/public/include/pubsub_utils.h
new file mode 100644
index 0000000..aff5c72
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_utils.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.h
+ *
+ * \date Sep 24, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_UTILS_H_
+#define PUBSUB_UTILS_H_
+
+#include "bundle_context.h"
+#include "array_list.h"
+
+char* pubsub_getScopeFromFilter(char* bundle_filter); /* returns a newly allocated scope taken from a publisher service filter, the default scope when the filter names none, or NULL when the filter is not for the publisher service */
+char* pubsub_getTopicFromFilter(char* bundle_filter);
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+array_list_pt pubsub_getTopicsFromString(char* string);
+
+
+#endif /* PUBSUB_UTILS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
new file mode 100644
index 0000000..11e4507
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
@@ -0,0 +1,162 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * dyn_msg_utils.c
+ *
+ * \date Nov 11, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <dirent.h>
+
+#include "utils.h"
+#include "dyn_message.h"
+
+#include "dyn_msg_utils.h"
+
+#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char * getMsgDescriptionDir(bundle_pt bundle);
+static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+
+
+/**
+ * Hash callback for keys that point to an unsigned int.
+ *
+ * @param uintNum Pointer to the unsigned int key.
+ * @return The pointed-to value, used directly as the hash.
+ */
+unsigned int uintHash(const void * uintNum) {
+    const unsigned int *key = (const unsigned int *) uintNum;
+
+    return *key;
+}
+
+/**
+ * Equality callback for keys that point to an unsigned int.
+ *
+ * @param uintNum   Pointer to the first unsigned int.
+ * @param toCompare Pointer to the second unsigned int.
+ * @return 1 when both pointed-to values are equal, 0 otherwise.
+ */
+int uintEquals(const void * uintNum, const void * toCompare) {
+    const unsigned int lhs = *(const unsigned int *) uintNum;
+    const unsigned int rhs = *(const unsigned int *) toCompare;
+
+    return lhs == rhs;
+}
+
+/**
+ * Loads all message descriptors that belong to a bundle into msgTypesMap.
+ *
+ * Descriptors are looked up in the bundle's descriptor root directory and in
+ * its "META-INF/descriptors" sub directory, in that order.
+ *
+ * @param msgTypesMap Map that receives the parsed message types.
+ * @param bundle      Bundle whose descriptors are loaded.
+ */
+void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
+    char *root = getMsgDescriptionDir(bundle);
+
+    if (root == NULL) {
+        return;
+    }
+
+    addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
+
+    /* asprintf leaves the pointer undefined on failure, so only use (and free) it on success. */
+    char *metaInfPath = NULL;
+    if (asprintf(&metaInfPath, "%s/META-INF/descriptors", root) >= 0) {
+        addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+        free(metaInfPath);
+    }
+
+    free(root);
+}
+
+/**
+ * Destroys every message type stored in msgTypesMap and then clears the map.
+ *
+ * @param msgTypesMap Map whose values are dyn_message_type pointers.
+ */
+void emptyMsgTypesMap(hash_map_pt msgTypesMap)
+{
+    hash_map_iterator_pt it = hashMapIterator_create(msgTypesMap);
+
+    for (; hashMapIterator_hasNext(it); ) {
+        hash_map_entry_pt current = hashMapIterator_nextEntry(it);
+        dyn_message_type *descriptor = (dyn_message_type *) hashMapEntry_getValue(current);
+
+        dynMessage_destroy(descriptor);
+    }
+
+    /* NOTE(review): the flags presumably mean "free keys, keep values"; the values were destroyed above - confirm. */
+    hashMap_clear(msgTypesMap, true, false);
+    hashMapIterator_destroy(it);
+}
+
+/**
+ * Determines the directory in which the descriptors of a bundle are searched.
+ *
+ * For the system bundle this is the value of the
+ * SYSTEM_BUNDLE_ARCHIVE_PATH property or, when that is not set, the current
+ * working directory. For any other bundle it is the bundle entry ".".
+ *
+ * @param bundle Bundle to resolve the directory for.
+ * @return The directory string (freed by the caller with free()), or NULL
+ *         when it could not be determined.
+ */
+static char * getMsgDescriptionDir(bundle_pt bundle)
+{
+    char *root = NULL;
+    bool isSystemBundle = false;
+
+    bundle_isSystemBundle(bundle, &isSystemBundle);
+
+    if (isSystemBundle) {
+        bundle_context_pt context = NULL;
+        const char *prop = NULL;
+
+        bundle_getContext(bundle, &context);
+        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+        root = (prop != NULL) ? strdup(prop) : getcwd(NULL, 0);
+    } else {
+        /* Start from NULL so a failing bundle_getEntry cannot hand back an indeterminate pointer. */
+        char *dir = NULL;
+
+        bundle_getEntry(bundle, ".", &dir);
+        root = dir;
+    }
+
+    return root;
+}
+
+
+/**
+ * Parses every descriptor file found directly in 'root' and stores the
+ * resulting message types in msgTypesMap.
+ *
+ * A directory entry is considered when its name contains ".descriptor".
+ * Each successfully parsed message type is stored under a heap allocated
+ * unsigned int key holding utils_stringHash() of the message name.
+ *
+ * @param root        Directory to scan; nothing happens when it cannot be opened.
+ * @param bundle      Owning bundle (not used by this function).
+ * @param msgTypesMap Map that receives the parsed message types.
+ */
+static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
+{
+    DIR *dir = opendir(root);
+
+    if (dir == NULL) {
+        return;
+    }
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(dir)) != NULL) {
+
+        if (strstr(entry->d_name, ".descriptor") == NULL) {
+            continue;
+        }
+
+        printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+        /* Size the path exactly: a fixed 128 byte buffer silently truncated long paths. */
+        int pathLen = snprintf(NULL, 0, "%s/%s", root, entry->d_name);
+        char *path = (pathLen >= 0) ? malloc((size_t) pathLen + 1) : NULL;
+        if (path == NULL) {
+            printf("DMU: cannot build path for descriptor %s.\n", entry->d_name);
+            continue;
+        }
+        snprintf(path, (size_t) pathLen + 1, "%s/%s", root, entry->d_name);
+
+        FILE *stream = fopen(path, "r");
+
+        if (stream != NULL) {
+            dyn_message_type *msgType = NULL;
+            int rc = dynMessage_parse(stream, &msgType);
+
+            if (rc == 0 && msgType != NULL) {
+                char *msgName = NULL;
+                dynMessage_getName(msgType, &msgName);
+
+                unsigned int *msgId = (msgName != NULL) ? malloc(sizeof(*msgId)) : NULL;
+                if (msgId != NULL) {
+                    *msgId = utils_stringHash(msgName);
+                    hashMap_put(msgTypesMap, msgId, msgType);
+                } else {
+                    /* The type is not stored in the map, so release it here instead of leaking it. */
+                    dynMessage_destroy(msgType);
+                }
+            } else {
+                printf("DMU: cannot parse message from descriptor %s.\n", path);
+            }
+            fclose(stream);
+        } else {
+            printf("DMU: cannot open descriptor file %s.\n", path);
+        }
+
+        free(path);
+    }
+
+    closedir(dir);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c
new file mode 100644
index 0000000..b18ef36
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/log_helper.c
@@ -0,0 +1,203 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * log_helper.c
+ *
+ * \date Nov 10, 2014
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdarg.h>
+
+#include "bundle_context.h"
+#include "service_tracker.h"
+#include "celix_threads.h"
+#include "array_list.h"
+
+#include "celix_errno.h"
+#include "log_service.h"
+
+#include "log_helper.h"
+
+#define LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME "LOGHELPER_ENABLE_STDOUT_FALLBACK"
+
+
+struct log_helper { /* State of one log helper instance. */
+ bundle_context_pt bundleContext; /* context used to read properties and to create the service tracker */
+ service_tracker_pt logServiceTracker; /* tracker for log services; NULL until logHelper_start() creates it */
+ celix_thread_mutex_t logListLock; /* guards logServices */
+ array_list_pt logServices; /* log services currently reported by the tracker */
+ bool stdOutFallback; /* print to stdout when no log service handled the message */
+};
+
+celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service);
+celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service);
+
+
+/**
+ * Allocates and initialises a log helper.
+ *
+ * The stdout fallback is enabled whenever the
+ * LOGHELPER_ENABLE_STDOUT_FALLBACK property is set, whatever its value.
+ *
+ * @param context   Bundle context used for property lookup and service tracking.
+ * @param loghelper Receives the new helper, or NULL when allocation fails.
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when the helper cannot be allocated.
+ */
+celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* loghelper)
+{
+    log_helper_pt helper = calloc(1, sizeof(*helper));
+
+    *loghelper = helper;
+    if (helper == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    const char* fallbackProperty = NULL;
+
+    helper->bundleContext = context;
+    helper->logServiceTracker = NULL;
+    helper->stdOutFallback = false;
+
+    bundleContext_getProperty(context, LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME, &fallbackProperty);
+    if (fallbackProperty != NULL) {
+        helper->stdOutFallback = true;
+    }
+
+    pthread_mutex_init(&helper->logListLock, NULL);
+    arrayList_create(&helper->logServices);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and opens the tracker that follows the available log services.
+ *
+ * @param loghelper Helper created with logHelper_create().
+ * @return CELIX_SUCCESS, or the status of the first step that failed.
+ */
+celix_status_t logHelper_start(log_helper_pt loghelper)
+{
+    service_tracker_customizer_pt customizer = NULL;
+    celix_status_t status;
+
+    status = serviceTrackerCustomizer_create(loghelper, NULL, logHelper_logServiceAdded, NULL, logHelper_logServiceRemoved, &customizer);
+    if (status != CELIX_SUCCESS) {
+        return status;
+    }
+
+    status = serviceTracker_create(loghelper->bundleContext, (char*) OSGI_LOGSERVICE_NAME, customizer, &loghelper->logServiceTracker);
+    if (status != CELIX_SUCCESS) {
+        return status;
+    }
+
+    return serviceTracker_open(loghelper->logServiceTracker);
+}
+
+
+
+/**
+ * Tracker callback: remembers a newly available log service.
+ *
+ * @param handle    The log helper registered with the tracker customizer.
+ * @param reference Reference of the added service (unused).
+ * @param service   The log service instance to remember.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service)
+{
+    log_helper_pt helper = (log_helper_pt) handle;
+    celix_thread_mutex_t *lock = &helper->logListLock;
+
+    pthread_mutex_lock(lock);
+    arrayList_add(helper->logServices, service);
+    pthread_mutex_unlock(lock);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Tracker callback: forgets a log service that is going away.
+ *
+ * @param handle    The log helper registered with the tracker customizer.
+ * @param reference Reference of the removed service (unused).
+ * @param service   The log service instance to forget.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service)
+{
+    log_helper_pt helper = (log_helper_pt) handle;
+    celix_thread_mutex_t *lock = &helper->logListLock;
+
+    pthread_mutex_lock(lock);
+    arrayList_removeElement(helper->logServices, service);
+    pthread_mutex_unlock(lock);
+
+    return CELIX_SUCCESS;
+}
+
+
+/**
+ * Closes the log service tracker.
+ *
+ * @param loghelper Helper that was started with logHelper_start().
+ * @return The status reported by serviceTracker_close().
+ */
+celix_status_t logHelper_stop(log_helper_pt loghelper) {
+    return serviceTracker_close(loghelper->logServiceTracker);
+}
+
+/**
+ * Releases the tracker, the service list, the lock and the helper itself.
+ *
+ * @param loghelper Address of the helper pointer; it is set to NULL on return.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
+    log_helper_pt helper = *loghelper;
+    celix_thread_mutex_t *lock = &helper->logListLock;
+
+    serviceTracker_destroy(helper->logServiceTracker);
+
+    /* The list is torn down while holding its lock. */
+    pthread_mutex_lock(lock);
+    arrayList_destroy(helper->logServices);
+    pthread_mutex_unlock(lock);
+
+    pthread_mutex_destroy(lock);
+
+    free(helper);
+    *loghelper = NULL;
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Formats a message and passes it to every known log service.
+ *
+ * When no log service handled the message and the stdout fallback is
+ * enabled, the message is printed to stdout prefixed with its level.
+ * Nothing is logged when loghelper is NULL.
+ *
+ * @param loghelper Helper to log through; may be NULL.
+ * @param level     Log level of the message.
+ * @param message   printf-style format string, followed by its arguments.
+ *                  The formatted text is truncated to the 1024 byte buffer.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+{
+    celix_status_t status = CELIX_SUCCESS;
+    va_list listPointer;
+    char msg[1024];
+    msg[0] = '\0';
+    bool logged = false;
+
+    va_start(listPointer, message);
+    vsnprintf(msg, sizeof(msg), message, listPointer);
+    /* Every va_start must be paired with va_end before the function returns. */
+    va_end(listPointer);
+
+    if (loghelper != NULL) {
+        pthread_mutex_lock(&loghelper->logListLock);
+
+        int i = 0;
+
+        for (; i < arrayList_size(loghelper->logServices); i++) {
+            log_service_pt logService = arrayList_get(loghelper->logServices, i);
+
+            if (logService != NULL) {
+                (logService->log)(logService->logger, level, msg);
+                logged = true;
+            }
+        }
+
+        pthread_mutex_unlock(&loghelper->logListLock);
+
+        if (!logged && loghelper->stdOutFallback) {
+            const char *levelStr = NULL;
+
+            switch (level) {
+                case OSGI_LOGSERVICE_ERROR:
+                    levelStr = "ERROR";
+                    break;
+                case OSGI_LOGSERVICE_WARNING:
+                    levelStr = "WARNING";
+                    break;
+                case OSGI_LOGSERVICE_INFO:
+                    levelStr = "INFO";
+                    break;
+                case OSGI_LOGSERVICE_DEBUG:
+                default:
+                    levelStr = "DEBUG";
+                    break;
+            }
+
+            printf("%s: %s\n", levelStr, msg);
+        }
+    }
+
+    return status;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
new file mode 100644
index 0000000..8586203
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -0,0 +1,153 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_endpoint.c
+ *
+ * \date 25 Jul 2014
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "celix_errno.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "constants.h"
+#include "subscriber.h"
+
+/**
+ * Creates an endpoint description holding private copies of the given strings.
+ *
+ * NULL strings are allowed and leave the matching member NULL.
+ *
+ * @param fwUUID    Framework UUID, or NULL.
+ * @param scope     Scope name, or NULL.
+ * @param topic     Topic name, or NULL.
+ * @param serviceId Service id stored in the endpoint.
+ * @param endpoint  Endpoint string, or NULL.
+ * @param out       Receives the new endpoint on success; untouched on failure.
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when an allocation fails.
+ */
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+    if (psEp == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    if (fwUUID != NULL) {
+        psEp->frameworkUUID = strdup(fwUUID);
+        if (psEp->frameworkUUID == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    if (scope != NULL) {
+        psEp->scope = strdup(scope);
+        if (psEp->scope == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    if (topic != NULL) {
+        psEp->topic = strdup(topic);
+        if (psEp->topic == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    psEp->serviceID = serviceId;
+
+    if (endpoint != NULL) {
+        psEp->endpoint = strdup(endpoint);
+        if (psEp->endpoint == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    if (status != CELIX_SUCCESS) {
+        /* Releases whatever was copied before the failing allocation. */
+        pubsubEndpoint_destroy(psEp);
+    } else {
+        *out = psEp;
+    }
+
+    return status;
+
+}
+
+/**
+ * Creates an endpoint description from a service reference.
+ *
+ * The framework UUID is read from the context of the bundle that owns the
+ * reference; scope, topic and service id are read from the properties of the
+ * reference. A missing scope is replaced by PUBSUB_SUBSCRIBER_SCOPE_DEFAULT.
+ * The description is rejected as incomplete when the framework UUID, scope
+ * or topic is missing or when the service id is 0.
+ *
+ * @param reference Service reference to describe.
+ * @param out       Receives the new endpoint on success; untouched on failure.
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the endpoint cannot be allocated,
+ *         or CELIX_BUNDLE_EXCEPTION when the description is incomplete.
+ */
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){
+    celix_status_t status = CELIX_SUCCESS;
+
+    pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp));
+    if (psEp == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    bundle_pt bundle = NULL;
+    bundle_context_pt ctxt = NULL;
+    const char* fwUUID = NULL;
+    serviceReference_getBundle(reference,&bundle);
+    bundle_getContext(bundle,&ctxt);
+    bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+    const char* scope = NULL;
+    serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+
+    const char* topic = NULL;
+    serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
+
+    const char* serviceId = NULL;
+    serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+    if(fwUUID!=NULL){
+        psEp->frameworkUUID = strdup(fwUUID);
+    }
+
+    if(scope!=NULL){
+        psEp->scope = strdup(scope);
+    } else {
+        psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+    }
+
+    if(topic!=NULL){
+        psEp->topic = strdup(topic);
+    }
+
+    if(serviceId!=NULL){
+        psEp->serviceID = strtol(serviceId,NULL,10);
+    }
+
+    /* A failed strdup leaves its member NULL and is therefore caught by this check as well. */
+    if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    if (status != CELIX_SUCCESS) {
+        pubsubEndpoint_destroy(psEp);
+    } else {
+        *out = psEp;
+    }
+
+    return status;
+
+}
+
+/**
+ * Frees an endpoint description together with the strings it owns.
+ *
+ * @param psEp Endpoint to free; NULL is accepted.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+    if (psEp == NULL) {
+        return CELIX_SUCCESS;
+    }
+
+    free(psEp->frameworkUUID);
+    free(psEp->scope);
+    free(psEp->topic);
+    free(psEp->endpoint);
+    free(psEp);
+
+    return CELIX_SUCCESS;
+}
+
+/* Compares two strings that may be NULL: two NULLs are equal, NULL never equals a string. */
+static bool pubsubEndpoint_stringEquals(const char *left, const char *right) {
+    if (left == NULL || right == NULL) {
+        return left == right;
+    }
+    return strcmp(left, right) == 0;
+}
+
+/**
+ * Tells whether two endpoint descriptions describe the same endpoint.
+ *
+ * Framework UUID, scope, topic and service id are compared; the endpoint
+ * string is not part of the comparison. Members may be NULL because
+ * pubsubEndpoint_create() accepts NULL strings.
+ *
+ * @param psEp1 First endpoint, may be NULL.
+ * @param psEp2 Second endpoint, may be NULL.
+ * @return true when both descriptions match (or both pointers are NULL).
+ */
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+
+    if (psEp1 == NULL || psEp2 == NULL) {
+        return psEp1 == psEp2;
+    }
+
+    return pubsubEndpoint_stringEquals(psEp1->frameworkUUID, psEp2->frameworkUUID) &&
+           pubsubEndpoint_stringEquals(psEp1->scope, psEp2->scope) &&
+           pubsubEndpoint_stringEquals(psEp1->topic, psEp2->topic) &&
+           (psEp1->serviceID == psEp2->serviceID);
+
+}
+
+/**
+ * Builds the "scope:topic" key string.
+ *
+ * @param scope Scope name.
+ * @param topic Topic name.
+ * @return Newly allocated key that the caller releases with free(), or NULL
+ *         when formatting or allocation fails.
+ */
+char *createScopeTopicKey(const char* scope, const char* topic) {
+    /* First pass only measures, so the buffer can be sized exactly. */
+    int keyLen = snprintf(NULL, 0, "%s:%s", scope, topic);
+    if (keyLen < 0) {
+        return NULL;
+    }
+
+    char *result = malloc((size_t) keyLen + 1);
+    if (result != NULL) {
+        snprintf(result, (size_t) keyLen + 1, "%s:%s", scope, topic);
+    }
+
+    return result;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/src/pubsub_serializer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_serializer.c b/pubsub/pubsub_common/public/src/pubsub_serializer.c
new file mode 100644
index 0000000..bb6096a
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/pubsub_serializer.c
@@ -0,0 +1,109 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.c
+ *
+ * \date Dec 7, 2016
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include "pubsub_serializer.h"
+
+#include "utils.h"
+#include "json_serializer.h"
+#include "dyn_msg_utils.h"
+#include "dyn_type.h"
+#include "string.h"
+#include "dyn_message.h"
+#include "dyn_common.h"
+
+struct _pubsub_message_type { /* Definition behind the opaque pubsub_message_type; pointers to it are cast to dyn_message_type by the functions in this file (see the original note: _dyn_message_type). NOTE(review): the layout presumably has to stay identical to _dyn_message_type - confirm. */
+ struct namvals_head header; /* NOTE(review): presumably the name/value pairs of the descriptor's header section - confirm */
+ struct namvals_head annotations; /* NOTE(review): presumably the name/value pairs of the annotations section - confirm */
+ struct types_head types; /* NOTE(review): presumably the types declared by the descriptor - confirm */
+ dyn_type *msgType; /* dynamic type of the message */
+ version_pt msgVersion; /* version of the message */
+};
+
+/**
+ * Serializes a message instance to a JSON string.
+ *
+ * @param msgType   Message type describing 'input'.
+ * @param input     Message instance to serialize.
+ * @param output    Receives the JSON string, or NULL when none was produced.
+ * @param outputLen Receives the string length including the terminating
+ *                  NUL, or 0 when no string was produced.
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when serialization fails.
+ */
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
+    celix_status_t status = CELIX_SUCCESS;
+
+    dyn_type *type = NULL;
+    dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+
+    char *jsonOutput = NULL;
+    int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+    if (rc != 0 || jsonOutput == NULL){
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    *output = (void *) jsonOutput;
+    /* Measure the string only when there is one: strlen(NULL) is undefined behaviour. */
+    *outputLen = (jsonOutput != NULL) ? (int) strlen(jsonOutput) + 1 : 0;
+
+    return status;
+}
+
+/**
+ * Parses JSON text into a message instance of the given type.
+ *
+ * @param msgType Message type to deserialize into.
+ * @param input   JSON text, passed to the JSON deserializer as a C string.
+ * @param output  Receives whatever the JSON deserializer produced (the
+ *                initial NULL when it stored nothing).
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when deserialization fails.
+ */
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
+    dyn_type *dynType = NULL;
+    void *instance = NULL;
+
+    dynMessage_getMessageType((dyn_message_type *) msgType, &dynType);
+
+    const int rc = jsonSerializer_deserialize(dynType, (const char *) input, &instance);
+
+    *output = instance;
+
+    return (rc != 0) ? CELIX_BUNDLE_EXCEPTION : CELIX_SUCCESS;
+}
+
+/**
+ * Computes the hash of a string with utils_stringHash().
+ *
+ * @param string String to hash.
+ * @return The hash value.
+ */
+unsigned int pubsubSerializer_hashCode(const char *string){
+    unsigned int hash = utils_stringHash(string);
+
+    return hash;
+}
+
+/**
+ * Returns the version of a message type.
+ *
+ * @param msgType Message type to query.
+ * @return The version reported by dynMessage_getVersion(), or NULL when it
+ *         reports none.
+ */
+version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType){
+    dyn_message_type *dynMsg = (dyn_message_type *) msgType;
+    version_pt version = NULL;
+
+    dynMessage_getVersion(dynMsg, &version);
+
+    return version;
+}
+
+/**
+ * Returns the name of a message type.
+ *
+ * @param msgType Message type to query.
+ * @return The name reported by dynMessage_getName(), or NULL when it
+ *         reports none.
+ */
+char* pubsubSerializer_getName(pubsub_message_type *msgType){
+    dyn_message_type *dynMsg = (dyn_message_type *) msgType;
+    char *msgName = NULL;
+
+    dynMessage_getName(dynMsg, &msgName);
+
+    return msgName;
+}
+
+/**
+ * Fills msgTypesMap with the message types of a bundle.
+ * Forwards to fillMsgTypesMap().
+ *
+ * @param msgTypesMap Map that receives the message types.
+ * @param bundle      Bundle whose descriptors are loaded.
+ */
+void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle)
+{
+    fillMsgTypesMap(msgTypesMap, bundle);
+}
+
+/**
+ * Removes and destroys all message types held by msgTypesMap.
+ * Forwards to emptyMsgTypesMap().
+ *
+ * @param msgTypesMap Map to empty.
+ */
+void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap)
+{
+    emptyMsgTypesMap(msgTypesMap);
+}
+
+/**
+ * Frees a message instance using the dynamic type of its message type.
+ *
+ * @param msgType Message type the instance belongs to.
+ * @param msg     Message instance to free.
+ */
+void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg){
+    dyn_message_type *dynMsg = (dyn_message_type *) msgType;
+    dyn_type *dynType = NULL;
+
+    dynMessage_getMessageType(dynMsg, &dynType);
+    dynType_free(dynType, msg);
+}
+
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_common/public/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_utils.c b/pubsub/pubsub_common/public/src/pubsub_utils.c
new file mode 100644
index 0000000..5f1b7ba
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/pubsub_utils.c
@@ -0,0 +1,163 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.c
+ *
+ * \date Sep 24, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "constants.h"
+
+#include "pubsub_common.h"
+#include "publisher.h"
+#include "pubsub_utils.h"
+
+#include "array_list.h"
+#include "bundle.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define MAX_KEYBUNDLE_LENGTH 256
+
+/**
+ * Extracts the publisher scope from a service filter string.
+ *
+ * The filter is only considered when the text following the
+ * OSGI_FRAMEWORK_OBJECTCLASS key (and one separator character) starts with
+ * PUBSUB_PUBLISHER_SERVICE_NAME. In that case the text between the
+ * PUBSUB_PUBLISHER_SCOPE key (plus one separator character) and the next ')'
+ * is returned; when no ')' follows, the remainder of the filter is returned.
+ * If the scope key is absent, PUBSUB_PUBLISHER_SCOPE_DEFAULT is returned.
+ *
+ * @param bundle_filter filter string; not modified (a private copy is parsed)
+ * @return newly allocated scope string that the caller must free(), or NULL
+ *         when the filter is NULL, is not a publisher filter, is truncated
+ *         right after a key, or memory could not be allocated
+ */
+char* pubsub_getScopeFromFilter(char* bundle_filter){
+
+    if(bundle_filter==NULL){
+        return NULL;
+    }
+
+    /* Work on a copy because the closing ')' is overwritten below. */
+    char* filter = strdup(bundle_filter);
+    if(filter==NULL){
+        return NULL;
+    }
+
+    char* scope = NULL;
+
+    char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+    /* Only skip the separator if there is one; otherwise we would step past the terminating NUL. */
+    if(oc!=NULL && oc[strlen(OSGI_FRAMEWORK_OBJECTCLASS)]!='\0'){
+        oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+        if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+            char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
+            if(scopes==NULL){
+                scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+            } else if(scopes[strlen(PUBSUB_PUBLISHER_SCOPE)]!='\0'){
+                scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
+
+                /* A malformed filter may lack the closing parenthesis. */
+                char* bottom=strchr(scopes,')');
+                if(bottom!=NULL){
+                    *bottom='\0';
+                }
+
+                scope=strdup(scopes);
+            }
+        }
+    }
+
+    free(filter);
+
+    return scope;
+}
+
+/**
+ * Extracts the publisher topic from a service filter string.
+ *
+ * The filter is only considered when the text following the
+ * OSGI_FRAMEWORK_OBJECTCLASS key (and one separator character) starts with
+ * PUBSUB_PUBLISHER_SERVICE_NAME. In that case the text between the
+ * PUBSUB_PUBLISHER_TOPIC key (plus one separator character) and the next ')'
+ * is returned; when no ')' follows, the remainder of the filter is returned.
+ *
+ * @param bundle_filter filter string; not modified (a private copy is parsed)
+ * @return newly allocated topic string that the caller must free(), or NULL
+ *         when the filter is NULL, is not a publisher filter, carries no
+ *         topic, or memory could not be allocated
+ */
+char* pubsub_getTopicFromFilter(char* bundle_filter){
+
+    if(bundle_filter==NULL){
+        return NULL;
+    }
+
+    /* Work on a copy because the closing ')' is overwritten below. */
+    char* filter = strdup(bundle_filter);
+    if(filter==NULL){
+        return NULL;
+    }
+
+    char* topic = NULL;
+
+    char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+    /* Only skip the separator if there is one; otherwise we would step past the terminating NUL. */
+    if(oc!=NULL && oc[strlen(OSGI_FRAMEWORK_OBJECTCLASS)]!='\0'){
+        oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+        if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+            char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
+            if(topics!=NULL && topics[strlen(PUBSUB_PUBLISHER_TOPIC)]!='\0'){
+                topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
+
+                /* A malformed filter may lack the closing parenthesis. */
+                char* bottom=strchr(topics,')');
+                if(bottom!=NULL){
+                    *bottom='\0';
+                }
+
+                topic=strdup(topics);
+            }
+        }
+    }
+
+    free(filter);
+
+    return topic;
+
+}
+
+/**
+ * Splits a string of topic names into a list.
+ *
+ * Topics are separated by any of the characters ',', ';', '|', '#' or space.
+ * Every token is duplicated with strdup() and appended to a freshly created
+ * array list.
+ *
+ * @param string the string to split; not modified (a private copy is tokenized)
+ * @return the list created by arrayList_create(); it is left empty when the
+ *         input is NULL, contains no tokens, or could not be copied. The list
+ *         elements are heap-allocated strings.
+ */
+array_list_pt pubsub_getTopicsFromString(char* string){
+
+    static const char* const delimiters = ",;|# ";
+
+    array_list_pt topic_list = NULL;
+    arrayList_create(&topic_list);
+
+    if(string==NULL){
+        return topic_list;
+    }
+
+    /* strtok() writes into its argument, so tokenize a copy. */
+    char* topics = strdup(string);
+    if(topics==NULL){
+        return topic_list;
+    }
+
+    /* The first strtok() call may already return NULL for an empty or
+     * delimiter-only string, so it is checked like every later call. */
+    for(char* topic = strtok(topics,delimiters); topic!=NULL; topic = strtok(NULL,delimiters)){
+        arrayList_add(topic_list,strdup(topic));
+    }
+
+    free(topics);
+
+    return topic_list;
+
+}
+
+/**
+ * Loop through all bundles and look for the bundle with the keys inside,
+ * i.e. the first bundle whose entry directory contains a "META-INF/keys"
+ * directory.
+ *
+ * Bundles whose entry path cannot be resolved, or whose key path does not
+ * fit in MAX_KEYBUNDLE_LENGTH bytes, are skipped.
+ *
+ * @param ctx bundle context used to enumerate the bundles
+ * @return the entry path of the key bundle as returned by bundle_getEntry(),
+ *         or NULL if no key bundle was found. Caller is responsible for
+ *         freeing the object.
+ */
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
+{
+    array_list_pt bundles = NULL;
+    bundleContext_getBundles(ctx, &bundles);
+    if (bundles == NULL){
+        return NULL;
+    }
+
+    int nrOfBundles = arrayList_size(bundles);
+
+    char* result = NULL;
+
+    for (int i = 0; i < nrOfBundles && result == NULL; i++){
+        bundle_pt b = arrayList_get(bundles, i);
+        char* dir = NULL;
+        bundle_getEntry(b, ".", &dir);
+        if (dir == NULL){
+            /* Passing NULL to "%s" is undefined behaviour; nothing to inspect. */
+            continue;
+        }
+
+        char cert_dir[MAX_KEYBUNDLE_LENGTH];
+        int written = snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
+
+        /* A truncated path would make stat() inspect the wrong location. */
+        bool complete = (written > 0) && (written < MAX_KEYBUNDLE_LENGTH);
+
+        struct stat s;
+        if (complete && stat(cert_dir, &s) != -1 && S_ISDIR(s.st_mode)){
+            /* Ownership of dir moves to the caller. */
+            result = dir;
+        } else {
+            free(dir);
+        }
+    }
+
+    arrayList_destroy(bundles);
+
+    return result;
+}
+
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/CMakeLists.txt b/pubsub/pubsub_discovery/CMakeLists.txt
new file mode 100644
index 0000000..f568f0c
--- /dev/null
+++ b/pubsub/pubsub_discovery/CMakeLists.txt
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# External libraries this bundle cannot be configured without.
+find_package(CURL REQUIRED)
+find_package(Jansson REQUIRED)
+
+# Header search paths: CURL and Jansson, the shared pubsub headers, etcdlib,
+# and this bundle's own private/public include directories.
+include_directories("${CURL_INCLUDE_DIR}")
+include_directories("${JANSSON_INCLUDE_DIR}")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
+include_directories("private/include")
+include_directories("public/include")
+
+# The etcd based discovery bundle. pubsub_endpoint.c from pubsub_common is
+# listed as a source here, so it is compiled into this bundle.
+add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
+ VERSION "1.0.0"
+ SOURCES
+ private/src/psd_activator.c
+ private/src/pubsub_discovery_impl.c
+ private/src/etcd_common.c
+ private/src/etcd_watcher.c
+ private/src/etcd_writer.c
+ ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+)
+
+# Link against the framework, the utils library, the static etcdlib and the external libraries found above.
+target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/include/etcd_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_common.h b/pubsub/pubsub_discovery/private/include/etcd_common.h
new file mode 100644
index 0000000..7a3e7b6
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/include/etcd_common.h
@@ -0,0 +1,28 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_COMMON_H_
+#define ETCD_COMMON_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+/**
+ * Initializes etcd access for pubsub discovery.
+ *
+ * The server address and port are read from the bundle context properties
+ * PUBSUB_DISCOVERY_ETCD_SERVER_IP and PUBSUB_DISCOVERY_ETCD_SERVER_PORT,
+ * with built-in defaults when they are not set, and passed to etcd_init().
+ *
+ * @param context bundle context the properties are read from
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when etcd_init() fails
+ */
+celix_status_t etcdCommon_init(bundle_context_pt context);
+
+#endif /* ETCD_COMMON_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/include/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_watcher.h b/pubsub/pubsub_discovery/private/include/etcd_watcher.h
new file mode 100644
index 0000000..c425e60
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/include/etcd_watcher.h
@@ -0,0 +1,38 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WATCHER_H_
+#define ETCD_WATCHER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/** Opaque handle to a watcher; the struct is defined in etcd_watcher.c. */
+typedef struct etcd_watcher *etcd_watcher_pt;
+
+/**
+ * Allocates a watcher for the given scope/topic and starts its watcher thread.
+ * The scope and topic strings are copied.
+ */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery, bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher);
+/** Destroys the watcher's mutex and frees the watcher; it does not join the thread (see etcdWatcher_stop). */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
+/** Clears the watcher's running flag and joins the watcher thread. */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
+
+/**
+ * Parses an etcd key into scope, topic, framework UUID and service id and
+ * creates a publisher endpoint from those fields and the given value.
+ */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
+
+
+#endif /* ETCD_WATCHER_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/include/etcd_writer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_writer.h b/pubsub/pubsub_discovery/private/include/etcd_writer.h
new file mode 100644
index 0000000..3ff98b9
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/include/etcd_writer.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WRITER_H_
+#define ETCD_WRITER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/** Opaque handle to a writer; the struct is defined in etcd_writer.c. */
+typedef struct etcd_writer *etcd_writer_pt;
+
+
+/** Allocates a writer and starts its writer thread; returns NULL when allocation fails. */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
+/** Stops and joins the writer thread, deletes the etcd entries of the stored local publishers and frees the writer. */
+void etcdWriter_destroy(etcd_writer_pt writer);
+
+/**
+ * Writes the endpoint to etcd. When storeEP is true and the endpoint's
+ * framework UUID equals this framework's UUID, a copy of the endpoint is
+ * also kept in the writer's list of local publishers.
+ */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP);
+/** Removes a matching endpoint from the local publishers list and deletes its key from etcd. */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP);
+
+
+#endif /* ETCD_WRITER_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
new file mode 100644
index 0000000..d5be8d6
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
@@ -0,0 +1,73 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_IMPL_H_
+#define PUBSUB_DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
+
+/* Value type stored in pubsub_discovery.watchers: a watcher together with a reference counter. */
+struct watcher_info {
+ etcd_watcher_pt watcher;
+ int nr_references; /* presumably the number of interested parties sharing this watcher - TODO confirm against pubsub_discovery_impl.c */
+};
+
+/*
+ * State of the pubsub discovery component. Each hash map is declared next to
+ * a mutex of matching name; presumably that mutex guards the map
+ * (NOTE(review): confirm in pubsub_discovery_impl.c).
+ */
+struct pubsub_discovery {
+ bundle_context_pt context;
+
+ celix_thread_mutex_t discoveredPubsMutex;
+ hash_map_pt discoveredPubs; //<topic,List<pubsub_endpoint_pt>>
+
+ celix_thread_mutex_t listenerReferencesMutex;
+ hash_map_pt listenerReferences; //key=serviceReference, value=nop
+
+ celix_thread_mutex_t watchersMutex;
+ hash_map_pt watchers; //key = topicname, value = struct watcher_info
+
+ etcd_writer_pt writer;
+};
+
+
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
+
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic);
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic);
+
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
+
+#endif /* PUBSUB_DISCOVERY_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c b/pubsub/pubsub_discovery/private/src/etcd_common.c
new file mode 100644
index 0000000..a53a844
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -0,0 +1,81 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include <curl/curl.h>
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+#define MAX_ROOTNODE_LENGTH 128
+#define MAX_LOCALNODE_LENGTH 4096
+#define MAX_FIELD_LENGTH 128
+
+#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+
+/**
+ * Initializes etcd access for pubsub discovery.
+ *
+ * The server address is taken from the CFG_ETCD_SERVER_IP property and the
+ * port from CFG_ETCD_SERVER_PORT. A missing property, or a port that is not
+ * a plain decimal number in the range 1..65535, falls back to
+ * DEFAULT_ETCD_SERVER_IP / DEFAULT_ETCD_SERVER_PORT.
+ *
+ * @param context bundle context the properties are read from
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when etcd_init() fails
+ */
+celix_status_t etcdCommon_init(bundle_context_pt context) {
+    const char* etcd_server = NULL;
+    const char* etcd_port_string = NULL;
+    int etcd_port = DEFAULT_ETCD_SERVER_PORT;
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
+        etcd_server = DEFAULT_ETCD_SERVER_IP;
+    }
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) == CELIX_SUCCESS) && etcd_port_string) {
+        char* endptr = NULL;
+        errno = 0;
+        long parsed = strtol(etcd_port_string, &endptr, 10);
+        /* endptr == string means no digits were consumed (e.g. an empty
+         * value), which strtol() reports as 0 without setting errno. */
+        bool wellFormed = (endptr != etcd_port_string) && (*endptr == '\0') && (errno == 0);
+        if (wellFormed && parsed > 0 && parsed <= 65535) {
+            etcd_port = (int) parsed;
+        }
+    }
+
+    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
+
+    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return CELIX_SUCCESS;
+}
+
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
new file mode 100644
index 0000000..0d8468e
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -0,0 +1,290 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+
+#define MAX_ROOTNODE_LENGTH 128
+#define MAX_LOCALNODE_LENGTH 4096
+#define MAX_FIELD_LENGTH 128
+
+#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery"
+
+#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+
+/* State of one watcher thread that follows the etcd directory of a single scope/topic. */
+struct etcd_watcher {
+ pubsub_discovery_pt pubsub_discovery; /* discovery instance that receives add/remove node calls */
+
+ celix_thread_mutex_t watcherLock; /* held while 'running' is written in create/stop and tested in the watch loop */
+ celix_thread_t watcherThread; /* thread executing etcdWatcher_run */
+
+ char *scope; /* strdup'ed in etcdWatcher_create, freed in etcdWatcher_destroy */
+ char *topic; /* strdup'ed in etcdWatcher_create, freed in etcdWatcher_destroy */
+ volatile bool running; /* loop condition of etcdWatcher_run; cleared by etcdWatcher_stop */
+};
+
+/*
+ * NOTE(review): this definition has the same members as the struct etcd_writer
+ * in etcd_writer.c and is not referenced by the watcher code visible here -
+ * confirm whether the copy is needed.
+ */
+struct etcd_writer {
+ pubsub_discovery_pt pubsub_discovery;
+ celix_thread_mutex_t localPubsLock;
+ array_list_pt localPubs;
+ volatile bool running;
+ celix_thread_t writerThread;
+};
+
+
+/*
+ * Builds "<root>/<scope>/<topic>" into rootNode, where <root> is the value of
+ * the CFG_ETCD_ROOT_PATH property or DEFAULT_ETCD_ROOTPATH when that property
+ * is not available. At most rootNodeLen bytes are written. Always returns
+ * CELIX_SUCCESS.
+ */
+// note that the rootNode shouldn't have a leading slash
+static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
+    const char* configuredRoot = NULL;
+    bool useDefault = (bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configuredRoot) != CELIX_SUCCESS) || (configuredRoot == NULL);
+
+    const char* base = useDefault ? DEFAULT_ETCD_ROOTPATH : configuredRoot;
+    snprintf(rootNode, rootNodeLen, "%s/%s/%s", base, scope, topic);
+
+    return CELIX_SUCCESS;
+}
+
+/*
+ * Copies the etcd root path into rootNode: the value of the
+ * CFG_ETCD_ROOT_PATH property, or DEFAULT_ETCD_ROOTPATH when that property is
+ * not available.
+ *
+ * rootNode must provide room for MAX_ROOTNODE_LENGTH bytes. The result is
+ * always NUL-terminated; a longer path is truncated. Always returns
+ * CELIX_SUCCESS.
+ */
+static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
+    celix_status_t status = CELIX_SUCCESS;
+    const char* rootPath = NULL;
+
+    if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+        rootPath = DEFAULT_ETCD_ROOTPATH;
+    }
+
+    /* snprintf() always terminates the destination, unlike strncpy(), which
+     * leaves it unterminated when the source fills the whole buffer. */
+    snprintf(rootNode, MAX_ROOTNODE_LENGTH, "%s", rootPath);
+
+    return status;
+}
+
+
+/*
+ * Callback handed to etcd_get_directory(): turns one key/value pair into a
+ * publisher endpoint and registers it with the discovery instance passed in
+ * arg. Pairs that do not yield an endpoint are ignored.
+ */
+static void add_node(const char *key, const char *value, void* arg) {
+    pubsub_discovery_pt discovery = (pubsub_discovery_pt) arg;
+    pubsub_endpoint_pt endpoint = NULL;
+
+    celix_status_t rc = etcdWatcher_getPublisherEndpointFromKey(discovery, key, value, &endpoint);
+    if (rc || endpoint == NULL) {
+        return;
+    }
+
+    pubsub_discovery_addNode(discovery, endpoint);
+}
+
+/*
+ * Walks the etcd directory rootPath with etcd_get_directory(), which invokes
+ * add_node() with ps_discovery as its argument and receives highestModified
+ * as an output location. Returns CELIX_ILLEGAL_ARGUMENT when
+ * etcd_get_directory() returns non-zero, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
+    return etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)
+            ? CELIX_ILLEGAL_ARGUMENT
+            : CELIX_SUCCESS;
+}
+
+/**
+ * Creates a publisher endpoint from an etcd key/value pair.
+ *
+ * The key is expected to look like
+ * "/<rootPath>/<scope>/<topic>/<frameworkUUID>/<serviceId>", where rootPath
+ * is the configured etcd root path. Each of the four fields may be at most
+ * MAX_FIELD_LENGTH - 1 characters long; longer fields make the key not match.
+ *
+ * @param pubsub_discovery discovery instance whose bundle context supplies the root path
+ * @param etcdKey          the etcd key to parse
+ * @param etcdValue        the etcd value, passed to pubsubEndpoint_create() as endpoint
+ * @param pubEP            receives the endpoint created by pubsubEndpoint_create()
+ * @return CELIX_ENOMEM when the scan pattern cannot be allocated,
+ *         CELIX_ILLEGAL_STATE when the key does not contain all four fields,
+ *         otherwise the status of pubsubEndpoint_create()
+ */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
+
+    char rootPath[MAX_ROOTNODE_LENGTH];
+    char *expr = NULL;
+    char scope[MAX_FIELD_LENGTH];
+    char topic[MAX_FIELD_LENGTH];
+    char fwUUID[MAX_FIELD_LENGTH];
+    char serviceId[MAX_FIELD_LENGTH];
+
+    memset(rootPath,0,MAX_ROOTNODE_LENGTH);
+    memset(scope,0,MAX_FIELD_LENGTH);
+    memset(topic,0,MAX_FIELD_LENGTH);
+    memset(fwUUID,0,MAX_FIELD_LENGTH);
+    memset(serviceId,0,MAX_FIELD_LENGTH);
+
+    etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
+    /* Make sure the path is terminated even if it filled the whole buffer. */
+    rootPath[MAX_ROOTNODE_LENGTH - 1] = '\0';
+
+    /* Every %[^/] conversion gets an explicit maximum width so that an
+     * over-long key component cannot overflow the field buffers. */
+    const int maxField = MAX_FIELD_LENGTH - 1;
+    if (asprintf(&expr, "/%s/%%%d[^/]/%%%d[^/]/%%%d[^/]/%%%d[^/].*", rootPath, maxField, maxField, maxField, maxField) < 0) {
+        /* The content of expr is not defined after a failed asprintf(). */
+        return CELIX_ENOMEM;
+    }
+
+    int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
+    free(expr);
+
+    if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
+        return CELIX_ILLEGAL_STATE;
+    }
+
+    return pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,pubEP);
+}
+
+/*
+ * performs (blocking) etcd_watch calls to check for
+ * changing discovery endpoint information within etcd.
+ *
+ * Thread entry point; data is the etcd_watcher_pt created in
+ * etcdWatcher_create. Publishers already present under the watcher's
+ * scope/topic path are added first; afterwards every reported change is
+ * translated into pubsub_discovery_addNode/removeNode calls until the
+ * watcher's running flag is cleared. Always returns NULL.
+ */
+static void* etcdWatcher_run(void* data) {
+ etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+ time_t timeBeforeWatch = time(NULL);
+ char rootPath[MAX_ROOTNODE_LENGTH];
+ long long highestModified = 0;
+
+ pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+ bundle_context_pt context = ps_discovery->context;
+
+ memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+ //TODO: add topic to etcd key
+ etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+ etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
+
+ /* The lock is taken in the loop condition, so 'running' is tested while holding it. */
+ while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
+
+ char *rkey = NULL;
+ char *value = NULL;
+ char *preValue = NULL;
+ char *action = NULL;
+ long long modIndex;
+
+ /* Release the lock before the blocking watch call. */
+ celixThreadMutex_unlock(&watcher->watcherLock);
+
+ if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+ pubsub_endpoint_pt pubEP = NULL;
+ /* "set", "create" and "update" use the new value; "delete" and "expire" use the previous value. */
+ if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
+ if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+ pubsub_discovery_addNode(ps_discovery, pubEP);
+ }
+ } else if (strcmp(action, "delete") == 0) {
+ if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+ pubsub_discovery_removeNode(ps_discovery, pubEP);
+ }
+ } else if (strcmp(action, "expire") == 0) {
+ if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+ pubsub_discovery_removeNode(ps_discovery, pubEP);
+ }
+ } else if (strcmp(action, "update") == 0) {
+ if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+ pubsub_discovery_addNode(ps_discovery, pubEP);
+ }
+ } else {
+ /* NOTE(review): 'logger' is not declared in the code visible here - confirm where it comes from. */
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+ }
+ /* The next watch starts after the index of the change just handled. */
+ highestModified = modIndex;
+ } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+ /* The watch produced nothing within DEFAULT_ETCD_TTL / 4 seconds of the reference time: back off. */
+ sleep(DEFAULT_ETCD_TTL / 4);
+ }
+
+ FREE_MEM(action);
+ FREE_MEM(value);
+ FREE_MEM(preValue);
+ FREE_MEM(rkey);
+
+ /* prevent busy waiting, in case etcd_watch returns false */
+
+
+ /* Move the reference time forward once more than DEFAULT_ETCD_TTL / 4 seconds have passed. */
+ if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+ timeBeforeWatch = time(NULL);
+ }
+
+ }
+
+ /* When the loop ended because 'running' was cleared, the lock taken in the loop condition is still held. */
+ if (watcher->running == false) {
+ celixThreadMutex_unlock(&watcher->watcherLock);
+ }
+
+ return NULL;
+}
+
+/**
+ * Allocates a watcher for one scope/topic and starts its watcher thread.
+ *
+ * The scope and topic strings are duplicated. The watcher lock is held while
+ * the thread is created and the running flag is set, so the thread cannot
+ * test the flag before it has been written.
+ *
+ * @param pubsub_discovery discovery instance stored in the watcher; must not be NULL
+ * @param context          not used by this function
+ * @param scope            scope to watch
+ * @param topic            topic to watch
+ * @param watcher          receives the new watcher (NULL when allocation fails)
+ * @return CELIX_BUNDLE_EXCEPTION when pubsub_discovery is NULL, CELIX_ENOMEM
+ *         when the watcher cannot be allocated, otherwise the status of
+ *         celixThread_create()
+ */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
+    if (pubsub_discovery == NULL) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    etcd_watcher_pt created = calloc(1, sizeof(struct etcd_watcher));
+    *watcher = created;
+    if (created == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    created->pubsub_discovery = pubsub_discovery;
+    created->scope = strdup(scope);
+    created->topic = strdup(topic);
+
+    celixThreadMutex_create(&created->watcherLock, NULL);
+
+    celixThreadMutex_lock(&created->watcherLock);
+
+    celix_status_t threadStatus = celixThread_create(&created->watcherThread, NULL, etcdWatcher_run, created);
+    if (threadStatus == CELIX_SUCCESS) {
+        created->running = true;
+    }
+
+    celixThreadMutex_unlock(&created->watcherLock);
+
+    return threadStatus;
+}
+
+/**
+ * Releases all resources owned by a watcher: its mutex, the duplicated
+ * scope and topic strings and the watcher itself.
+ *
+ * The watcher thread is not joined here; see etcdWatcher_stop().
+ *
+ * @param watcher the watcher to destroy; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
+
+    celix_status_t status = CELIX_SUCCESS;
+
+    /* The topic root path computed here previously was never used, so the
+     * lookup (and its dereference of the discovery instance) is dropped. */
+    celixThreadMutex_destroy(&(watcher->watcherLock));
+
+    free(watcher->scope);
+    free(watcher->topic);
+    free(watcher);
+
+    return status;
+}
+
+/**
+ * Asks the watcher thread to terminate and waits until it has finished.
+ *
+ * The running flag is cleared while holding the watcher lock, the same lock
+ * under which the watcher thread tests the flag.
+ *
+ * @param watcher the watcher to stop
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&(watcher->watcherLock));
+    watcher->running = false;
+    celixThreadMutex_unlock(&(watcher->watcherLock));
+
+    celixThread_join(watcher->watcherThread, NULL);
+
+    return status;
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c b/pubsub/pubsub_discovery/private/src/etcd_writer.c
new file mode 100644
index 0000000..687d802
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/src/etcd_writer.c
@@ -0,0 +1,189 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_writer.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+#define MAX_ROOTNODE_LENGTH 128
+
+#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery"
+
+#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+/* State of the writer thread that periodically re-publishes the local publisher endpoints to etcd. */
+struct etcd_writer {
+ pubsub_discovery_pt pubsub_discovery; /* discovery instance; its bundle context supplies the configuration properties */
+ celix_thread_mutex_t localPubsLock; /* taken around accesses to localPubs */
+ array_list_pt localPubs; /* endpoint copies created in etcdWriter_addPublisherEndpoint */
+ volatile bool running; /* loop condition of etcdWriter_run; cleared in etcdWriter_destroy */
+ celix_thread_t writerThread; /* thread executing etcdWriter_run */
+};
+
+
+static const char* etcdWriter_getRootPath(bundle_context_pt context);
+static void* etcdWriter_run(void* data);
+
+
+/**
+ * Allocates a writer, initializes its lock and local publisher list and
+ * starts the writer thread.
+ *
+ * @param disc discovery instance stored in the writer
+ * @return the new writer, or NULL when it cannot be allocated
+ */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
+    etcd_writer_pt created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        return NULL;
+    }
+
+    celixThreadMutex_create(&created->localPubsLock, NULL);
+    arrayList_create(&created->localPubs);
+    created->pubsub_discovery = disc;
+    created->running = true;
+    celixThread_create(&created->writerThread, NULL, etcdWriter_run, created);
+
+    return created;
+}
+
+/**
+ * Stops the writer thread and releases the writer.
+ *
+ * After the thread has been joined, the etcd directory
+ * "<root>/<scope>/<topic>/<frameworkUUID>" of every stored local publisher is
+ * deleted and the stored endpoint is destroyed. Finally the list, the lock
+ * and the writer itself are released.
+ *
+ * @param writer the writer to destroy; must not be used afterwards
+ */
+void etcdWriter_destroy(etcd_writer_pt writer) {
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+    char etcdDir[MAX_ROOTNODE_LENGTH];
+
+    writer->running = false;
+    celixThread_join(writer->writerThread, NULL);
+
+    celixThreadMutex_lock(&writer->localPubsLock);
+
+    int idx = 0;
+    while (idx < arrayList_size(writer->localPubs)) {
+        pubsub_endpoint_pt localEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs, idx);
+
+        memset(etcdDir, 0, MAX_ROOTNODE_LENGTH);
+        snprintf(etcdDir, MAX_ROOTNODE_LENGTH, "%s/%s/%s/%s", rootPath, localEP->scope, localEP->topic, localEP->frameworkUUID);
+        etcd_del(etcdDir);
+
+        pubsubEndpoint_destroy(localEP);
+        idx++;
+    }
+    arrayList_destroy(writer->localPubs);
+
+    celixThreadMutex_unlock(&writer->localPubsLock);
+    celixThreadMutex_destroy(&(writer->localPubsLock));
+
+    free(writer);
+}
+
+/**
+ * Publishes an endpoint in etcd under
+ * "<root>/<scope>/<topic>/<frameworkUUID>/<serviceID>" with a time-to-live.
+ *
+ * The time-to-live comes from the CFG_ETCD_TTL property; a missing value, or
+ * one that is not a positive decimal number fitting in an int, falls back to
+ * DEFAULT_ETCD_TTL.
+ *
+ * @param writer  the writer
+ * @param pubEP   endpoint to publish
+ * @param storeEP when true and the endpoint belongs to this framework (same
+ *                framework UUID), a copy of the endpoint is added to the
+ *                writer's local publishers list. The list lock is only taken
+ *                on that path.
+ * @return CELIX_ILLEGAL_ARGUMENT when etcd_set() returns 0, otherwise
+ *         CELIX_BUNDLE_EXCEPTION (also used when the key cannot be allocated).
+ *         NOTE(review): no path returns CELIX_SUCCESS, and the other etcd_*
+ *         calls in this file treat a non-zero result as failure - confirm
+ *         the return convention of etcd_set() and the intended status.
+ */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    if(storeEP){
+        const char *fwUUID = NULL;
+        bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+        if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+            celixThreadMutex_lock(&writer->localPubsLock);
+            pubsub_endpoint_pt p = NULL;
+            /* Only remember the copy when it was actually created; a NULL
+             * entry would be dereferenced by the code walking this list. */
+            if(pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p) == CELIX_SUCCESS && p != NULL){
+                arrayList_add(writer->localPubs,p);
+            }
+            celixThreadMutex_unlock(&writer->localPubsLock);
+        }
+    }
+
+    const char* ttlStr = NULL;
+    int ttl = DEFAULT_ETCD_TTL;
+
+    // determine ttl
+    if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) == CELIX_SUCCESS) && ttlStr) {
+        char* endptr = NULL;
+        errno = 0;
+        long parsed = strtol(ttlStr, &endptr, 10);
+        /* endptr == ttlStr means no digits were consumed (e.g. empty value). */
+        bool wellFormed = (endptr != ttlStr) && (*endptr == '\0') && (errno == 0);
+        if (wellFormed && parsed > 0 && parsed == (long)(int)parsed) {
+            ttl = (int)parsed;
+        }
+    }
+
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    char *key = NULL;
+    if(asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID) < 0){
+        /* The content of key is not defined after a failed asprintf(). */
+        return status;
+    }
+
+    if(!etcd_set(key,pubEP->endpoint,ttl,false)){
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    free(key);
+    return status;
+}
+
+/**
+ * Withdraws a publisher endpoint.
+ *
+ * The first stored local publisher that equals pubEP (pubsubEndpoint_equals)
+ * is removed from the writer's list and destroyed. Then the key
+ * "<root>/<scope>/<topic>/<frameworkUUID>/<serviceID>" is deleted from etcd.
+ *
+ * @param writer the writer
+ * @param pubEP  endpoint to withdraw
+ * @return CELIX_SUCCESS, CELIX_BUNDLE_EXCEPTION when the key cannot be
+ *         allocated, or CELIX_ILLEGAL_ARGUMENT when etcd_del() reports a failure
+ */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    char *key = NULL;
+
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    celixThreadMutex_lock(&writer->localPubsLock);
+    for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+        pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+        if (pubsubEndpoint_equals(ep, pubEP)) {
+            arrayList_remove(writer->localPubs, i);
+            pubsubEndpoint_destroy(ep);
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&writer->localPubsLock);
+
+    if (asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID) < 0) {
+        /* The content of key is not defined after a failed asprintf(). */
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    if (etcd_del(key)) {
+        printf("Failed to remove key %s from ETCD\n",key);
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    free(key);
+    return status;
+}
+
+/*
+ * Writer thread entry point; data is the etcd_writer_pt created in
+ * etcdWriter_create. While the running flag is set, every stored local
+ * publisher is written to etcd again (without storing another copy), after
+ * which the thread sleeps for DEFAULT_ETCD_TTL / 2 seconds. Always returns NULL.
+ */
+static void* etcdWriter_run(void* data) {
+    etcd_writer_pt self = (etcd_writer_pt)data;
+
+    for (;;) {
+        if (!self->running) {
+            break;
+        }
+
+        celixThreadMutex_lock(&self->localPubsLock);
+        int idx = 0;
+        while (idx < arrayList_size(self->localPubs)) {
+            pubsub_endpoint_pt localEP = (pubsub_endpoint_pt)arrayList_get(self->localPubs, idx);
+            etcdWriter_addPublisherEndpoint(self, localEP, false);
+            idx++;
+        }
+        celixThreadMutex_unlock(&self->localPubsLock);
+
+        sleep(DEFAULT_ETCD_TTL / 2);
+    }
+
+    return NULL;
+}
+
+/*
+ * Returns the etcd root path: the value of the CFG_ETCD_ROOT_PATH property,
+ * or DEFAULT_ETCD_ROOTPATH when the property lookup leaves it unset.
+ */
+static const char* etcdWriter_getRootPath(bundle_context_pt context) {
+    const char* configured = NULL;
+
+    bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured);
+
+    return (configured != NULL) ? configured : DEFAULT_ETCD_ROOTPATH;
+}
+
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c b/pubsub/pubsub_discovery/private/src/psd_activator.c
new file mode 100644
index 0000000..afbe282
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/src/psd_activator.c
@@ -0,0 +1,171 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "publisher_endpoint_announce.h"
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+/* Per-bundle state of the pubsub discovery activator. */
+struct activator {
+ /* Bundle context handed to bundleActivator_create. */
+ bundle_context_pt context;
+ /* Discovery instance created in bundleActivator_create. */
+ pubsub_discovery_pt pubsub_discovery;
+
+ /* Tracker for PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE services. */
+ service_tracker_pt pstmPublishersTracker;
+
+ /* Service struct allocated in bundleActivator_start and registered as PUBSUB_DISCOVERY_SERVICE. */
+ publisher_endpoint_announce_pt publisherEPAnnounce;
+ /* Registration handle returned for the service above. */
+ service_registration_pt publisherEPAnnounceService;
+};
+
+/*
+ * Builds the service tracker for PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE.
+ * The tracker callbacks are the pubsub_discovery_tmPublisherAnnounce*
+ * functions, invoked with the activator's discovery instance as handle.
+ * Returns the status of the first step that fails, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) {
+    service_tracker_customizer_pt cust = NULL;
+
+    celix_status_t rc = serviceTrackerCustomizer_create(activator->pubsub_discovery,
+            pubsub_discovery_tmPublisherAnnounceAdding,
+            pubsub_discovery_tmPublisherAnnounceAdded,
+            pubsub_discovery_tmPublisherAnnounceModified,
+            pubsub_discovery_tmPublisherAnnounceRemoved,
+            &cust);
+    if (rc != CELIX_SUCCESS) {
+        return rc;
+    }
+
+    rc = serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, cust, tracker);
+
+    return rc;
+}
+
+/**
+ * Allocates the activator state, creates the discovery instance and the
+ * tracker for the topology manager's announce service.
+ *
+ * @param context  the bundle context of this bundle.
+ * @param userData receives the activator on success; untouched on failure.
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the activator cannot be allocated,
+ *         or the status of the step that failed.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    /* calloc leaves every pointer member NULL. */
+    struct activator* activator = calloc(1, sizeof(*activator));
+    if (activator == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    activator->context = context;
+
+    status = pubsub_discovery_create(context, &activator->pubsub_discovery);
+
+    if (status == CELIX_SUCCESS) {
+        status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker));
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *userData = activator;
+    } else {
+        /* Release the discovery instance too, otherwise it leaks when only the tracker setup failed. */
+        if (activator->pubsub_discovery != NULL) {
+            pubsub_discovery_destroy(activator->pubsub_discovery);
+        }
+        free(activator);
+    }
+
+    return status;
+}
+
+/**
+ * Starts discovery, opens the topology manager tracker and registers the
+ * publisher_endpoint_announce service under PUBSUB_DISCOVERY_SERVICE.
+ *
+ * The service struct is stored in the activator only when every step
+ * succeeded; on failure it is freed and the activator field is reset so no
+ * dangling pointer is left behind.
+ *
+ * @param userData the activator created by bundleActivator_create.
+ * @param context  the bundle context used to register the service.
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the service struct cannot be
+ *         allocated, or the status of the step that failed.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    struct activator *activator = userData;
+
+    publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer));
+    if (pubEPAnnouncer == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    pubEPAnnouncer->handle = activator->pubsub_discovery;
+    pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher;
+    pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher;
+    pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic;
+    pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic;
+
+    // pubsub_discovery_start needs to be first to initialize the proper etcd_watcher values
+    status = pubsub_discovery_start(activator->pubsub_discovery);
+
+    if (status == CELIX_SUCCESS) {
+        status = serviceTracker_open(activator->pstmPublishersTracker);
+    }
+
+    if (status == CELIX_SUCCESS) {
+        /* Properties are created only once registration is attempted, so an earlier failure cannot leak them. */
+        properties_pt props = properties_create();
+        properties_set(props, "PUBSUB_DISCOVERY", "true");
+
+        status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService);
+    }
+
+    if (status == CELIX_SUCCESS) {
+        activator->publisherEPAnnounce = pubEPAnnouncer;
+    } else {
+        activator->publisherEPAnnounce = NULL;
+        free(pubEPAnnouncer);
+    }
+
+    return status;
+}
+
+/**
+ * Stops discovery, closes the tracker and unregisters the announce service.
+ * The three status codes are summed; the service struct is released only when
+ * the sum equals CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+    struct activator *act = userData;
+    celix_status_t rc = CELIX_SUCCESS;
+
+    rc += pubsub_discovery_stop(act->pubsub_discovery);
+    rc += serviceTracker_close(act->pstmPublishersTracker);
+    rc += serviceRegistration_unregister(act->publisherEPAnnounceService);
+
+    if (rc != CELIX_SUCCESS) {
+        return rc;
+    }
+
+    free(act->publisherEPAnnounce);
+    return rc;
+}
+
+/**
+ * Destroys the tracker and the discovery instance, then releases the
+ * activator itself. Returns the sum of both destroy status codes.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+    struct activator *act = userData;
+    celix_status_t rc = CELIX_SUCCESS;
+
+    rc += serviceTracker_destroy(act->pstmPublishersTracker);
+    rc += pubsub_discovery_destroy(act->pubsub_discovery);
+
+    /* Clear every member before the struct is handed back to the allocator. */
+    act->context = NULL;
+    act->pubsub_discovery = NULL;
+    act->pstmPublishersTracker = NULL;
+    act->publisherEPAnnounce = NULL;
+    act->publisherEPAnnounceService = NULL;
+
+    free(act);
+
+    return rc;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
new file mode 100644
index 0000000..0c7d6c4
--- /dev/null
+++ b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -0,0 +1,468 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+
+#include "publisher_endpoint_announce.h"
+#include "etcd_common.h"
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_discovery_impl.h"
+
+/* Discovery activator functions */
+/**
+ * Allocates a discovery instance with empty maps for discovered publishers,
+ * listener references and watchers, plus one mutex per map.
+ *
+ * @param context      bundle context stored in the new instance.
+ * @param ps_discovery receives the instance, or NULL when allocation fails.
+ * @return CELIX_SUCCESS or CELIX_ENOMEM.
+ */
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
+    pubsub_discovery_pt disc = calloc(1, sizeof(*disc));
+
+    *ps_discovery = disc;
+    if (disc == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    disc->context = context;
+
+    disc->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    disc->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+    disc->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&disc->listenerReferencesMutex, NULL);
+    celixThreadMutex_create(&disc->discoveredPubsMutex, NULL);
+    celixThreadMutex_create(&disc->watchersMutex, NULL);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Releases a discovery instance.
+ *
+ * Every endpoint still held in the discovered-publishers map is destroyed
+ * together with its list; the map, the listener reference map and all three
+ * mutexes are destroyed, and finally the instance itself is freed. The
+ * watchers map is not touched here (pubsub_discovery_stop destroys it).
+ *
+ * @param ps_discovery the instance to destroy; invalid after the call.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+
+    while (hashMapIterator_hasNext(iter)) {
+        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+        for (int i = 0; i < arrayList_size(pubEP_list); i++) {
+            pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
+        }
+        arrayList_destroy(pubEP_list);
+    }
+
+    hashMapIterator_destroy(iter);
+
+    hashMap_destroy(ps_discovery->discoveredPubs, true, false);
+    ps_discovery->discoveredPubs = NULL;
+
+    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+
+    celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
+
+
+    celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
+
+    hashMap_destroy(ps_discovery->listenerReferences, false, false);
+    ps_discovery->listenerReferences = NULL;
+
+    celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
+
+    celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
+
+    /* Created in pubsub_discovery_create; without this the mutex is never released. */
+    celixThreadMutex_destroy(&ps_discovery->watchersMutex);
+
+    free(ps_discovery);
+
+    return status;
+}
+
+/**
+ * Initialises the etcd connection settings from the bundle context and
+ * creates the etcd writer. The writer is created regardless of the init
+ * result; the init status is what gets returned.
+ */
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
+    const celix_status_t initStatus = etcdCommon_init(ps_discovery->context);
+
+    ps_discovery->writer = etcdWriter_create(ps_discovery);
+
+    return initStatus;
+}
+
+/**
+ * Shuts discovery down: stops the watchers, withdraws or drops the discovered
+ * endpoints, destroys the writer and finally destroys the watchers and their map.
+ *
+ * watchersMutex is held for the whole teardown; discoveredPubsMutex is taken
+ * inside it while the endpoint lists are processed.
+ *
+ * @param ps_discovery the discovery instance to stop.
+ * @return CELIX_SUCCESS, or CELIX_INVALID_BUNDLE_CONTEXT when the framework
+ *         UUID cannot be read (in which case nothing is stopped).
+ */
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ const char* fwUUID = NULL;
+
+ bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+ if (fwUUID == NULL) {
+ printf("PSD: Cannot retrieve fwUUID.\n");
+ return CELIX_INVALID_BUNDLE_CONTEXT;
+ }
+
+ celixThreadMutex_lock(&ps_discovery->watchersMutex);
+
+ /* First pass over the watchers: only stop them; they are destroyed further down. */
+ hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
+ while (hashMapIterator_hasNext(iter)) {
+ struct watcher_info * wi = hashMapIterator_nextValue(iter);
+ etcdWatcher_stop(wi->watcher);
+ }
+ hashMapIterator_destroy(iter);
+
+ celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+ /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */
+
+ iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+ while (hashMapIterator_hasNext(iter)) {
+ array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+ int i;
+ for (i = 0; i < arrayList_size(pubEP_list); i++) {
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
+ if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+ /* Local endpoint: withdrawn from etcd but left in the list. */
+ etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP);
+ } else {
+ /* Remote endpoint: listeners are told it is gone, then it is removed and destroyed. */
+ pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
+ arrayList_remove(pubEP_list, i);
+ pubsubEndpoint_destroy(pubEP);
+ /* Step back so the element that moved into slot i is not skipped. */
+ i--;
+ }
+ }
+ }
+
+ hashMapIterator_destroy(iter);
+
+ celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+ etcdWriter_destroy(ps_discovery->writer);
+
+ /* Second pass over the watchers: destroy each one, then the map itself. */
+ iter = hashMapIterator_create(ps_discovery->watchers);
+ while (hashMapIterator_hasNext(iter)) {
+ struct watcher_info * wi = hashMapIterator_nextValue(iter);
+ etcdWatcher_destroy(wi->watcher);
+ }
+ hashMapIterator_destroy(iter);
+ /* NOTE(review): the two 'true' flags presumably make the map free its keys and watcher_info values - confirm against hash_map.h. */
+ hashMap_destroy(ps_discovery->watchers, true, true);
+ celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+
+ return status;
+}
+
+/* Functions called by the etcd_watcher */
+
+/**
+ * Records a publisher endpoint reported by an etcd watcher.
+ *
+ * An endpoint that equals one already stored for its scope/topic is destroyed;
+ * otherwise it is stored (ownership moves to the discovered-publishers map)
+ * and the registered listeners are informed about the new publisher.
+ *
+ * @return CELIX_SUCCESS, or the status of the listener notification.
+ */
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    bool isNew = true;
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    char *key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+    array_list_pt known = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs, key);
+
+    if (known != NULL) {
+        /* Look for an equal endpoint; stop at the first hit. */
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            if (pubsubEndpoint_equals(pubEP, (pubsub_endpoint_pt)arrayList_get(known, idx))) {
+                isNew = false;
+                break;
+            }
+        }
+
+        if (isNew) {
+            arrayList_add(known, pubEP);
+        } else {
+            pubsubEndpoint_destroy(pubEP);
+        }
+    } else {
+        /* First endpoint for this scope/topic: the map gets its own copy of the key. */
+        arrayList_create(&known);
+        arrayList_add(known, pubEP);
+        hashMap_put(pubsub_discovery->discoveredPubs, strdup(key), known);
+    }
+
+    free(key);
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    if (isNew) {
+        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, true);
+    }
+
+    return status;
+}
+
+/**
+ * Handles the disappearance of a publisher endpoint reported by an etcd watcher.
+ *
+ * The stored endpoint equal to pubEP is removed from its scope/topic list and
+ * destroyed, and the listeners are informed. pubEP itself is always destroyed
+ * before returning.
+ *
+ * @return CELIX_ILLEGAL_STATE when nothing is registered for the scope/topic
+ *         (or that status overwritten by the listener notification result
+ *         when an endpoint was removed), CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    bool removed = false;
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    char *key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+    array_list_pt known = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, key);
+    free(key);
+
+    if (known != NULL) {
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            pubsub_endpoint_pt candidate = arrayList_get(known, idx);
+            if (pubsubEndpoint_equals(pubEP, candidate)) {
+                arrayList_remove(known, idx);
+                pubsubEndpoint_destroy(candidate);
+                removed = true;
+                break;
+            }
+        }
+    } else {
+        printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", pubEP->topic);
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    if (removed) {
+        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false);
+    }
+    pubsubEndpoint_destroy(pubEP);
+
+    return status;
+}
+
+/* Callback to the pubsub_topology_manager */
+/**
+ * Notifies every registered listener service about a publisher endpoint.
+ *
+ * For each service reference in listenerReferences the service is fetched,
+ * its announcePublisher (epAdded == true) or removePublisher callback is
+ * invoked with pubEP, and the service is released again. References whose
+ * service cannot be obtained are skipped instead of being dereferenced.
+ *
+ * @param pubsub_discovery the discovery instance holding the listener references.
+ * @param pubEP            the endpoint to report.
+ * @param epAdded          true to announce the endpoint, false to report its removal.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    // Inform listeners of new publisher endpoint
+    celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+
+    if (pubsub_discovery->listenerReferences != NULL) {
+        hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
+        while (hashMapIterator_hasNext(iter)) {
+            service_reference_pt reference = hashMapIterator_nextKey(iter);
+
+            publisher_endpoint_announce_pt listener = NULL;
+
+            /* The service may already be gone; never call through a NULL listener. */
+            if (bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener) != CELIX_SUCCESS) {
+                continue;
+            }
+            if (listener != NULL) {
+                if (epAdded) {
+                    listener->announcePublisher(listener->handle, pubEP);
+                } else {
+                    listener->removePublisher(listener->handle, pubEP);
+                }
+            }
+            bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
+        }
+        hashMapIterator_destroy(iter);
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+
+    return status;
+}
+
+
+/* Service's functions implementation */
+/**
+ * Service function: announces a local publisher endpoint.
+ *
+ * A private copy of pubEP is stored in the discovered-publishers map under
+ * its scope/topic key and handed to the etcd writer for publication.
+ *
+ * @param handle the pubsub_discovery instance.
+ * @param pubEP  the endpoint to announce; it is only read, not retained.
+ * @return the status of etcdWriter_addPublisherEndpoint, or CELIX_ENOMEM when
+ *         the endpoint copy could not be created.
+ */
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint);
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    /* Create the copy first so that a failure leaves the map untouched and no NULL entry is ever stored. */
+    pubsub_endpoint_pt p = NULL;
+    pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+    if (p == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+    char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+    array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
+
+    if (pubEP_list == NULL) {
+        arrayList_create(&pubEP_list);
+        hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
+    }
+    free(pub_key);
+
+    arrayList_add(pubEP_list,p);
+
+    status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+    return status;
+}
+
+/**
+ * Service function: withdraws a previously announced publisher endpoint.
+ *
+ * The stored endpoint equal to pubEP is taken out of its scope/topic list,
+ * deleted from etcd through the writer and destroyed.
+ *
+ * @param handle the pubsub_discovery instance.
+ * @param pubEP  the endpoint to withdraw.
+ * @return CELIX_ILLEGAL_STATE when no list or no matching endpoint exists,
+ *         otherwise the status of etcdWriter_deletePublisherEndpoint.
+ */
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_discovery_pt disc = handle;
+
+    celixThreadMutex_lock(&disc->discoveredPubsMutex);
+
+    char *key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+    array_list_pt known = (array_list_pt)hashMap_get(disc->discoveredPubs, key);
+    free(key);
+
+    if (known != NULL) {
+        bool found = false;
+        pubsub_endpoint_pt stored = NULL;
+
+        for (int idx = 0; idx < arrayList_size(known); idx++) {
+            stored = (pubsub_endpoint_pt)arrayList_get(known, idx);
+            if (pubsubEndpoint_equals(pubEP, stored)) {
+                found = true;
+                break;
+            }
+        }
+
+        if (found) {
+            arrayList_removeElement(known, stored);
+
+            status = etcdWriter_deletePublisherEndpoint(disc->writer, stored);
+
+            pubsubEndpoint_destroy(stored);
+        } else {
+            printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
+            status = CELIX_ILLEGAL_STATE;
+        }
+    } else {
+        printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    celixThreadMutex_unlock(&disc->discoveredPubsMutex);
+
+    return status;
+}
+
+/**
+ * Service function: registers interest in a scope/topic.
+ *
+ * The first request for a scope/topic creates an etcd watcher for it; further
+ * requests only increase the watcher's reference count.
+ *
+ * @param handle the pubsub_discovery instance.
+ * @param scope  the scope of interest.
+ * @param topic  the topic of interest.
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when the watcher bookkeeping entry
+ *         cannot be allocated.
+ */
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
+    if (wi) {
+        wi->nr_references++;
+        free(scope_topic_key);
+    } else {
+        wi = calloc(1, sizeof(*wi));
+        if (wi == NULL) {
+            /* Nothing was stored, so the key is still ours to release. */
+            free(scope_topic_key);
+            status = CELIX_ENOMEM;
+        } else {
+            /* NOTE(review): the result of etcdWatcher_create is not checked here, as before - confirm its failure contract. */
+            etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher);
+            wi->nr_references = 1;
+            /* The map takes over scope_topic_key as its key. */
+            hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
+        }
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+
+    return status;
+}
+
+/**
+ * Service function: withdraws interest in a scope/topic.
+ *
+ * The reference count of the matching watcher is decreased; when it reaches
+ * zero the watcher is removed from the map, stopped and destroyed. An unknown
+ * scope/topic is reported on stderr.
+ *
+ * @param handle the pubsub_discovery instance.
+ * @param scope  the scope that is no longer of interest.
+ * @param topic  the topic that is no longer of interest.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+
+    hash_map_entry_pt entry = hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
+    if (entry) {
+        struct watcher_info * wi = hashMapEntry_getValue(entry);
+        wi->nr_references--;
+        if (wi->nr_references == 0) {
+            /* Fetch the map-owned key before the entry disappears, then release it. */
+            char *key = hashMapEntry_getKey(entry);
+            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
+            free(key);
+            etcdWatcher_stop(wi->watcher);
+            etcdWatcher_destroy(wi->watcher);
+            free(wi);
+        }
+    } else {
+        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
+    }
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+
+    /* The lookup key is only a temporary: release it on every path, not just when the watcher is dropped. */
+    free(scope_topic_key);
+
+    return CELIX_SUCCESS;
+}
+
+/* pubsub_topology_manager tracker callbacks */
+
+/**
+ * Tracker callback (adding): fetches the service behind the reference from
+ * the bundle context and returns the status of that lookup.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service) {
+    pubsub_discovery_pt disc = handle;
+
+    return bundleContext_getService(disc->context, reference, service);
+}
+
+/**
+ * Tracker callback (added): replays every endpoint currently held in the
+ * discovered-publishers map to the new listener and then records the
+ * listener's service reference.
+ *
+ * @return the sum of the status codes returned by the listener's
+ *         announcePublisher calls.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
+    pubsub_discovery_pt disc = handle;
+    publisher_endpoint_announce_pt announcer = service;
+    celix_status_t rc = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&disc->discoveredPubsMutex);
+    celixThreadMutex_lock(&disc->listenerReferencesMutex);
+
+    /* Notify the PSTM about discovered publisher endpoints */
+    hash_map_iterator_pt it = hashMapIterator_create(disc->discoveredPubs);
+    while (hashMapIterator_hasNext(it)) {
+        array_list_pt endpoints = (array_list_pt)hashMapIterator_nextValue(it);
+        int idx = 0;
+        while (idx < arrayList_size(endpoints)) {
+            pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(endpoints, idx);
+            rc += announcer->announcePublisher(announcer->handle, ep);
+            idx++;
+        }
+    }
+    hashMapIterator_destroy(it);
+
+    hashMap_put(disc->listenerReferences, reference, NULL);
+
+    celixThreadMutex_unlock(&disc->listenerReferencesMutex);
+    celixThreadMutex_unlock(&disc->discoveredPubsMutex);
+
+    printf("PSD: pubsub_tm_announce_publisher added.\n");
+
+    return rc;
+}
+
+/**
+ * Tracker callback (modified): handled as a removal followed by an addition
+ * of the same reference. The addition is skipped when the removal fails.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) {
+    celix_status_t rc = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
+
+    if (rc != CELIX_SUCCESS) {
+        return rc;
+    }
+
+    return pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
+}
+
+/**
+ * Tracker callback (removed): drops the listener's service reference from the
+ * listener reference map, if that map still exists. A message is printed when
+ * hashMap_remove returns a non-NULL value.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) {
+    pubsub_discovery_pt disc = handle;
+
+    celixThreadMutex_lock(&disc->listenerReferencesMutex);
+
+    if (disc->listenerReferences != NULL && hashMap_remove(disc->listenerReferences, reference)) {
+        printf("PSD: pubsub_tm_announce_publisher removed.\n");
+    }
+
+    celixThreadMutex_unlock(&disc->listenerReferencesMutex);
+
+    return CELIX_SUCCESS;
+}
+