You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:30 UTC
[13/34] celix git commit: CELIX-454: Refactors the pubsub zmq admin.
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
new file mode 100644
index 0000000..9477055
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -0,0 +1,556 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <pubsub_common.h>
+#include <zconf.h>
+#include <arpa/inet.h>
+#include <czmq.h>
+#include <log_helper.h>
+#include "pubsub_zmq_topic_sender.h"
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_zmq_common.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS 2
+#define ZMQ_BIND_MAX_RETRY 10
+
+#define L_DEBUG(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_zmq_topic_sender {
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+
+ char *scope;
+ char *topic;
+ char *url;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ zsock_t *socket;
+ zcert_t *cert;
+ } zmq;
+
+ long serTrackerId;
+ struct {
+ celix_thread_mutex_t mutex;
+ pubsub_serializer_service_t *svc;
+ const celix_properties_t *props;
+ } serializer;
+
+ struct {
+ long svcId;
+ celix_service_factory_t factory;
+ } publisher;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = bndId, value = psa_zmq_bounded_service_entry_t
+ } boundedServices;
+};
+
+typedef struct psa_zmq_bounded_service_entry {
+ pubsub_zmq_topic_sender_t *parent;
+ pubsub_publisher_t service;
+ long bndId;
+ hash_map_t *msgTypes;
+ int getCount;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ bool inProgress;
+ celix_array_list_t *parts;
+ } multipart;
+} psa_zmq_bounded_service_entry_t;
+
+
+typedef struct pubsub_msg {
+ pubsub_msg_header_t *header;
+ char* payload;
+ int payloadSize;
+} pubsub_msg_t;
+
+static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static unsigned int rand_range(unsigned int min, unsigned int max);
+static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender);
+
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_zmq_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
+
+pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
+ celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ const char *bindIP,
+ unsigned int basePort,
+ unsigned int maxPort) {
+ pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
+ sender->ctx = ctx;
+ sender->logHelper = logHelper;
+
+ //setting up zmq socket for ZMQ TopicSender
+ {
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ char* secure_topics = NULL;
+ bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
+
+ if (secure_topics){
+ array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
+
+ int i;
+ int secure_topics_size = arrayList_size(secure_topics_list);
+ for (i = 0; i < secure_topics_size; i++){
+ char* top = arrayList_get(secure_topics_list, i);
+ if (strcmp(pubEP->topic, top) == 0){
+ printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
+ pubEP->is_secure = true;
+ }
+ free(top);
+ top = NULL;
+ }
+
+ arrayList_destroy(secure_topics_list);
+ }
+
+ zcert_t* pub_cert = NULL;
+ if (pubEP->is_secure){
+ char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+ if (keys_bundle_dir == NULL){
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+ const char* keys_file_path = NULL;
+ const char* keys_file_name = NULL;
+ bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+ bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+ char cert_path[MAX_CERT_PATH_LENGTH];
+
+ //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
+ snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
+ free(keys_bundle_dir);
+ printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
+
+ pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
+ if (pub_cert == NULL){
+ printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
+ printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
+ pubEP->is_secure = false;
+ }
+ }
+#endif
+
+ zsock_t* socket = zsock_new(ZMQ_PUB);
+ if (socket==NULL) {
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (pubEP->is_secure) {
+ zcert_destroy(&pub_cert);
+ }
+#endif
+ perror("Error for zmq_socket");
+ }
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (pubEP->is_secure) {
+ zcert_apply (pub_cert, socket); // apply certificate to socket
+ zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
+ }
+#endif
+
+ int rv = -1, retry=0;
+ while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
+ /* Randomized part due to same bundle publishing on different topics */
+ unsigned int port = rand_range(basePort,maxPort);
+
+ size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
+ char *url = calloc(len, sizeof(char*));
+ snprintf(url, len, "tcp://%s:%u", bindIP, port);
+
+ len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
+ char *bindUrl = calloc(len, sizeof(char));
+ snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+
+ rv = zsock_bind (socket, "%s", bindUrl);
+ if (rv == -1) {
+ perror("Error for zmq_bind");
+ free(url);
+ } else {
+ sender->url = url;
+ sender->zmq.socket = socket;
+ }
+ retry++;
+ free(bindUrl);
+ }
+ }
+
+ if (sender->url != NULL) {
+ sender->scope = strndup(scope, 1024 * 1024);
+ sender->topic = strndup(topic, 1024 * 1024);
+
+ celixThreadMutex_create(&sender->serializer.mutex, NULL);
+ celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+ celixThreadMutex_create(&sender->zmq.mutex, NULL);
+ sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+ }
+
+ //track serializer svc based on the provided serializerSvcId
+ if (sender->url != NULL ) {
+ char filter[64];
+ snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.filter = filter;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = sender;
+ opts.setWithProperties = pubsub_zmqTopicSender_setSerializer;
+ sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //register publisher services using a service factory
+ if (sender->url != NULL) {
+ sender->publisher.factory.handle = sender;
+ sender->publisher.factory.getService = psa_zmq_getPublisherService;
+ sender->publisher.factory.ungetService = psa_zmq_ungetPublisherService;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+
+ sender->publisher.svcId = celix_bundleContext_registerServiceFactory(ctx, &sender->publisher.factory, PUBSUB_PUBLISHER_SERVICE_NAME, props);
+ }
+
+ if (sender->url == NULL) {
+ free(sender);
+ sender = NULL;
+ }
+
+ return sender;
+}
+
+void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
+ if (sender != NULL) {
+ celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+ celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+
+ celixThreadMutex_destroy(&sender->serializer.mutex);
+ celixThreadMutex_destroy(&sender->boundedServices.mutex);
+ celixThreadMutex_destroy(&sender->zmq.mutex);
+
+ //TODO loop and cleanup
+ hashMap_destroy(sender->boundedServices.map, false, false);
+
+ free(sender->scope);
+ free(sender->topic);
+ free(sender->url);
+ free(sender);
+ }
+}
+
+const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender __attribute__((unused))) {
+ return PSA_ZMQ_PUBSUB_ADMIN_TYPE;
+}
+
+const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
+ const char *result = NULL;
+ celixThreadMutex_lock(&sender->serializer.mutex);
+ if (sender->serializer.props != NULL) {
+ result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+ }
+ celixThreadMutex_unlock(&sender->serializer.mutex);
+ return result;
+}
+
+const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
+ return sender->scope;
+}
+
+const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender) {
+ return sender->topic;
+}
+
+const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender) {
+ return sender->url;
+}
+
+
+void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint) {
+ //TODO subscriber count -> topic info
+}
+
+void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint) {
+ //TODO
+}
+
+static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_zmq_topic_sender_t *sender = handle;
+ pubsub_serializer_service_t *ser = svc;
+
+ if (ser == NULL) {
+ //TODO -> no serializer -> remove all publishers
+ }
+
+ celixThreadMutex_lock(&sender->serializer.mutex);
+ sender->serializer.svc = ser;
+ sender->serializer.props = props;
+ celixThreadMutex_unlock(&sender->serializer.mutex);
+}
+
+static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+ pubsub_zmq_topic_sender_t *sender = handle;
+ long bndId = celix_bundle_getId(requestingBundle);
+
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ psa_zmq_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->getCount += 1;
+ } else {
+ entry = calloc(1, sizeof(*entry));
+ entry->getCount = 1;
+ entry->parent = sender;
+ entry->bndId = bndId;
+
+ celixThreadMutex_lock(&sender->serializer.mutex);
+ celix_status_t rc = CELIX_SUCCESS;
+ if (sender->serializer.svc != NULL) {
+ rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ }
+ if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+ //TODO destroy and return NULL?
+ L_ERROR("Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+ }
+ celixThreadMutex_unlock(&sender->serializer.mutex);
+
+
+ entry->service.handle = entry;
+ entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
+ entry->service.send = psa_zmq_topicPublicationSend;
+ entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
+
+ hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ }
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+ return &entry->service;
+}
+
+static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+ pubsub_zmq_topic_sender_t *sender = handle;
+ long bndId = celix_bundle_getId(requestingBundle);
+
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ psa_zmq_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->getCount -= 1;
+ }
+ if (entry != NULL && entry->getCount == 0) {
+ //free entry
+ hashMap_remove(sender->boundedServices.map, (void*)bndId);
+
+
+ celixThreadMutex_lock(&sender->serializer.mutex);
+ celix_status_t rc = CELIX_SUCCESS;
+ if (sender->serializer.svc != NULL) {
+ rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
+ }
+ if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+ L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+ }
+ celixThreadMutex_unlock(&sender->serializer.mutex);
+
+ free(entry);
+ }
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
+ return psa_zmq_topicPublicationSendMultipart( handle, msgTypeId, msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
+}
+
+static bool psa_zmq_sendMsg(pubsub_zmq_topic_sender_t *sender, pubsub_msg_t *msg, bool last){
+
+ bool ret = true;
+
+ zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
+ if (headerMsg == NULL) ret=false;
+ zframe_t* payloadMsg = zframe_new(msg->payload, (size_t)msg->payloadSize);
+ if (payloadMsg == NULL) ret=false;
+
+ delay_first_send_for_late_joiners(sender);
+
+ celixThreadMutex_lock(&sender->zmq.mutex);
+ if (sender->zmq.socket == NULL) {
+ L_ERROR("[PSA_ZMQ] TopicSender zmq socket is NULL");
+ } else {
+ if (zframe_send(&headerMsg, sender->zmq.socket, ZFRAME_MORE) == -1) ret = false;
+
+ if (!last) {
+ if (zframe_send(&payloadMsg, sender->zmq.socket, ZFRAME_MORE) == -1) ret = false;
+ } else {
+ if (zframe_send(&payloadMsg, sender->zmq.socket, 0) == -1) ret = false;
+ }
+ }
+ celixThreadMutex_unlock(&sender->zmq.mutex);
+
+ if (!ret){
+ zframe_destroy(&headerMsg);
+ zframe_destroy(&payloadMsg);
+ }
+
+ free(msg->header);
+ free(msg->payload);
+ free(msg);
+
+ return ret;
+
+}
+
+static bool psa_zmq_sendMsgParts(pubsub_zmq_topic_sender_t *sender, celix_array_list_t *parts){
+ bool ret = true;
+ unsigned int i = 0;
+ unsigned int mp_num = (unsigned int)celix_arrayList_size(parts);
+ for (; i<mp_num; i++) {
+ ret = ret && psa_zmq_sendMsg(sender, (pubsub_msg_t*)celix_arrayList_get(parts,i), (i==mp_num-1));
+ }
+ celix_arrayList_clear(parts);
+ return ret;
+}
+
+static int psa_zmq_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags) {
+ int status = 0;
+ psa_zmq_bounded_service_entry_t *bound = handle;
+ pubsub_zmq_topic_sender_t *sender = bound->parent;
+
+ celixThreadMutex_lock(&bound->multipart.mutex);
+ if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->multipart.inProgress){ //means a real mp_msg
+ L_INFO("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
+ celixThreadMutex_unlock(&bound->multipart.mutex);
+ return -3;
+ }
+
+ pubsub_msg_serializer_t* msgSer = hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
+
+ if (msgSer!= NULL) {
+ int major=0, minor=0;
+
+ pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+ strncpy( msg_hdr->topic, bound->parent->topic, MAX_TOPIC_LEN-1);
+ msg_hdr->type = msgTypeId;
+
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ msg_hdr->major = (unsigned char)major;
+ msg_hdr->minor = (unsigned char)minor;
+ }
+
+ void *serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+
+ pubsub_msg_t *msg = calloc(1,sizeof(struct pubsub_msg));
+ msg->header = msg_hdr;
+ msg->payload = (char*)serializedOutput;
+ msg->payloadSize = (int)serializedOutputLen;
+ bool snd = true;
+
+ switch(flags) {
+ case PUBSUB_PUBLISHER_FIRST_MSG:
+ bound->multipart.inProgress = true;
+ celix_arrayList_add(bound->multipart.parts, msg);
+ break;
+ case PUBSUB_PUBLISHER_PART_MSG:
+ if(!bound->multipart.inProgress){
+ L_INFO("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
+ status = -4;
+ }
+ else{
+ celix_arrayList_add(bound->multipart.parts, msg);
+ }
+ break;
+ case PUBSUB_PUBLISHER_LAST_MSG:
+ if(!bound->multipart.inProgress){
+ L_INFO("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+ status = -4;
+ }
+ else{
+ celix_arrayList_add(bound->multipart.parts, msg);
+ snd = psa_zmq_sendMsgParts(bound->parent, bound->multipart.parts);
+ bound->multipart.inProgress = false;
+ assert(celix_arrayList_size(bound->multipart.parts) == 0); //should be cleanup by sendMsg
+ }
+ break;
+ case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
+ snd = psa_zmq_sendMsg(bound->parent, msg, true);
+ break;
+ default:
+ L_INFO("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
+ status = -4;
+ break;
+ }
+
+ if (status==-4) {
+ free(msg);
+ }
+
+ if (!snd) {
+ L_WARN("[PSA_ZMQ] Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+ }
+
+ } else {
+ L_ERROR("[PSA_ZMQ] No msg serializer available for msg type id %d\n", msgTypeId);
+ status=-1;
+ }
+
+ celixThreadMutex_unlock(&(bound->multipart.mutex));
+
+ return status;
+
+}
+
+static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender) {
+
+ static bool firstSend = true;
+
+ if(firstSend){
+ L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+ sleep(FIRST_SEND_DELAY_IN_SECONDS);
+ firstSend = false;
+ }
+}
+
+static unsigned int rand_range(unsigned int min, unsigned int max){
+ double scaled = ((double)random())/((double)RAND_MAX);
+ return (unsigned int)((max-min+1)*scaled + min);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
new file mode 100644
index 0000000..23a0c25
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -0,0 +1,46 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_zmq_topic_sender pubsub_zmq_topic_sender_t;
+
+pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
+ celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ const char *bindIP,
+ unsigned int basePort,
+ unsigned int maxPort);
+void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
+
+const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender);
+const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
+const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
+const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender);
+const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
+
+void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
+
+#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index 96a8af1..c372973 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -31,7 +31,10 @@ target_include_directories(celix_pubsub_discovery_etcd SYSTEM PRIVATE
${CURL_INCLUDE_DIR}
${JANSSON_INCLUDE_DIR}
)
-target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+target_link_libraries(celix_pubsub_discovery_etcd PRIVATE
+ Celix::pubsub_spi Celix::framework Celix::etcdlib_static
+ Celix::shell_api Celix::log_helper
+ ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 81e467d..56d8997 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -21,6 +21,9 @@
#include <stdlib.h>
#include <string.h>
+#include "log_helper.h"
+#include "command.h"
+
#include "celix_bundle_context.h"
#include "celix_bundle_activator.h"
#include "constants.h"
@@ -29,7 +32,6 @@
#include "pubsub_common.h"
#include "pubsub_listeners.h"
#include "pubsub_discovery_impl.h"
-#include "../../../shell/shell/include/command.h"
typedef struct psd_activator {
pubsub_discovery_t *pubsub_discovery;
@@ -42,12 +44,17 @@ typedef struct psd_activator {
command_service_t cmdSvc;
long cmdSvcId;
+
+ log_helper_t *loghelper;
} psd_activator_t;
static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
celix_status_t status;
- pubsub_discovery_create(ctx, &act->pubsub_discovery);
+ logHelper_create(ctx, &act->loghelper);
+ logHelper_start(act->loghelper);
+
+ act->pubsub_discovery = pubsub_discovery_create(ctx, act->loghelper);
// pubsub_discovery_start needs to be first to initialize
status = pubsub_discovery_start(act->pubsub_discovery);
@@ -91,6 +98,9 @@ static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx
celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
pubsub_discovery_destroy(act->pubsub_discovery);
+ logHelper_stop(act->loghelper);
+ logHelper_destroy(&act->loghelper);
+
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 237f2f2..116dd50 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -41,50 +41,53 @@
#include "pubsub_endpoint.h"
#include "pubsub_discovery_impl.h"
-static celix_properties_t* pubsub_discovery_parseEndpoint(const char *value);
+#define L_DEBUG(...) \
+ logHelper_log(disc->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(disc->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(disc->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(disc->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+static celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char *value);
static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props);
static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint);
static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, const char *uuid);
/* Discovery activator functions */
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **ps_discovery) {
- celix_status_t status = CELIX_SUCCESS;
-
- *ps_discovery = calloc(1, sizeof(**ps_discovery));
-
- if (*ps_discovery == NULL) {
- return CELIX_ENOMEM;
- }
-
- (*ps_discovery)->context = context;
- (*ps_discovery)->discoveredEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*ps_discovery)->announcedEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*ps_discovery)->discoveredEndpointsListeners = hashMap_create(NULL, NULL, NULL, NULL);
- celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL);
- celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL);
- celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL);
- pthread_mutex_init(&(*ps_discovery)->waitMutex, NULL);
+pubsub_discovery_t* pubsub_discovery_create(bundle_context_pt context, log_helper_t *logHelper) {
+ pubsub_discovery_t *disc = calloc(1, sizeof(*disc));
+ disc->logHelper = logHelper;
+ disc->context = context;
+ disc->discoveredEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ disc->announcedEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ disc->discoveredEndpointsListeners = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadMutex_create(&disc->discoveredEndpointsListenersMutex, NULL);
+ celixThreadMutex_create(&disc->announcedEndpointsMutex, NULL);
+ celixThreadMutex_create(&disc->discoveredEndpointsMutex, NULL);
+ pthread_mutex_init(&disc->waitMutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
- pthread_cond_init(&(*ps_discovery)->waitCond, &attr);
- celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL);
- (*ps_discovery)->running = true;
+ pthread_cond_init(&disc->waitCond, &attr);
+ celixThreadMutex_create(&disc->runningMutex, NULL);
+ disc->running = true;
- (*ps_discovery)->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE);
+ disc->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE);
const char* etcdIp = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_IP_KEY, PUBSUB_DISCOVERY_SERVER_IP_DEFAULT);
long etcdPort = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_SERVER_PORT_KEY, PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT);
long ttl = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_ETCD_TTL_KEY, PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT);
etcd_init(etcdIp, (int)etcdPort, ETCDLIB_NO_CURL_INITIALIZATION);
- (*ps_discovery)->ttlForEntries = (int)ttl;
- (*ps_discovery)->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
- (*ps_discovery)->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
- (*ps_discovery)->fwUUID = celix_bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+ disc->ttlForEntries = (int)ttl;
+ disc->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
+ disc->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
+ disc->fwUUID = celix_bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- return status;
+ return disc;
}
celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
@@ -118,7 +121,7 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
static void psd_etcdReadCallback(const char *key __attribute__((unused)), const char *value, void* arg) {
pubsub_discovery_t *disc = arg;
- celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+ celix_properties_t *props = pubsub_discovery_parseEndpoint(disc, value);
if (props != NULL) {
pubsub_discovery_addDiscoveredEndpoint(disc, props);
}
@@ -158,7 +161,7 @@ static void psd_watchForChange(pubsub_discovery_t *disc, bool *connectedPtr, lon
if (strncmp(ETCDLIB_ACTION_CREATE, action, strlen(ETCDLIB_ACTION_CREATE)) == 0 ||
strncmp(ETCDLIB_ACTION_SET, action, strlen(ETCDLIB_ACTION_SET)) == 0 ||
strncmp(ETCDLIB_ACTION_UPDATE, action, strlen(ETCDLIB_ACTION_UPDATE)) == 0) {
- celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+ celix_properties_t *props = pubsub_discovery_parseEndpoint(disc, value);
if (props != NULL) {
pubsub_discovery_addDiscoveredEndpoint(disc, props);
}
@@ -259,16 +262,21 @@ void* psd_refresh(void *data) {
//only refresh ttl -> no index update -> no watch trigger
int rc = etcd_refresh(entry->key, disc->ttlForEntries);
if (rc != ETCDLIB_RC_OK) {
- fprintf(stderr, "[PSD] Warning: error refreshing etcd key %s\n", entry->key);
+ L_ERROR("[PSD] Warning: error refreshing etcd key %s\n", entry->key);
entry->isSet = false;
+ entry->errorCount += 1;
+ } else {
+ entry->refreshCount += 1;
}
} else {
char *str = pubsub_discovery_createJsonEndpoint(entry->properties);
int rc = etcd_set(entry->key, str, disc->ttlForEntries, false);
if (rc == ETCDLIB_RC_OK) {
entry->isSet = true;
+ entry->setCount += 1;
} else {
- fprintf(stderr, "[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+ L_ERROR("[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+ entry->errorCount += 1;
}
}
}
@@ -337,6 +345,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_t *disc) {
if (entry->isSet) {
etcd_del(entry->key);
}
+ free(entry->key);
celix_properties_destroy(entry->properties);
free(entry);
}
@@ -386,6 +395,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
if (valid) {
pubsub_announce_entry_t *entry = calloc(1, sizeof(*entry));
+ clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
entry->isSet = false;
entry->properties = celix_properties_copy(endpoint);
asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid);
@@ -503,11 +513,11 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
}
celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
} else {
- fprintf(stderr, "[PSD] Warning unexpected remove from non existing endpoint (uuid is %s)\n", uuid);
+ L_ERROR("[PSD] Warning unexpected remove from non existing endpoint (uuid is %s)\n", uuid);
}
}
-celix_properties_t* pubsub_discovery_parseEndpoint(const char* etcdValue) {
+celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char* etcdValue) {
properties_t *props = properties_create();
// etcdValue contains the json formatted string
@@ -535,7 +545,7 @@ celix_properties_t* pubsub_discovery_parseEndpoint(const char* etcdValue) {
bool valid = pubsubEndpoint_isValid(props, true, true);
if (!valid) {
- fprintf(stderr, "[PSD] Warning retrieved endpoint is not valid\n");
+ L_ERROR("[PSD] Warning retrieved endpoint is not valid\n");
celix_properties_destroy(props);
props = NULL;
}
@@ -558,8 +568,61 @@ static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props
}
celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
- //pubsub_discovery_t *psd = handle;
- //TODO
- fprintf(os, "TODO\n");
+ pubsub_discovery_t *disc = handle;
+
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ //TODO add support for query (scope / topic)
+
+ fprintf(os, "\n");
+ fprintf(os, "Discovered Endpoints:\n");
+ celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+ const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
+ const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+ const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+ const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
+ fprintf(os, "Endpoint %s:\n", uuid);
+ fprintf(os, " |- type = %s\n", type);
+ fprintf(os, " |- scope = %s\n", scope);
+ fprintf(os, " |- topic = %s\n", topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ }
+ celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+
+ fprintf(os, "\n");
+ fprintf(os, "Announced Endpoints:\n");
+ celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+ iter = hashMapIterator_construct(disc->announcedEndpoints);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
+ const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!");
+ const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+ const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+ const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
+ int age = (int)(now.tv_sec - entry->createTime.tv_sec);
+ fprintf(os, "Endpoint %s:\n", uuid);
+ fprintf(os, " |- type = %s\n", type);
+ fprintf(os, " |- scope = %s\n", scope);
+ fprintf(os, " |- topic = %s\n", topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- age = %ds\n", age);
+ fprintf(os, " |- is set = %s\n", entry->isSet ? "true" : "false");
+ if (disc->verbose) {
+ fprintf(os, " |- set count = %d\n", entry->setCount);
+ fprintf(os, " |- refresh count = %d\n", entry->refreshCount);
+ fprintf(os, " |- error count = %d\n", entry->errorCount);
+ }
+ }
+ celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
+
return CELIX_SUCCESS;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index a1af837..b2726fb 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -20,6 +20,7 @@
#ifndef PUBSUB_DISCOVERY_IMPL_H_
#define PUBSUB_DISCOVERY_IMPL_H_
+#include <log_helper.h>
#include "bundle_context.h"
#include "service_reference.h"
@@ -45,7 +46,7 @@
typedef struct pubsub_discovery {
bundle_context_pt context;
- //TODO add logHelper
+ log_helper_t *logHelper;
celix_thread_mutex_t discoveredEndpointsMutex; //when locked with EndpointsListenersMutex -> first lock this
hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t /*endpoint*/>>
@@ -77,11 +78,15 @@ typedef struct pubsub_discovery {
typedef struct pubsub_announce_entry {
char *key; //etcd key
bool isSet; //whether the value is already set (in case of unavailable etcd server this can linger)
+ int refreshCount;
+ int setCount;
+ int errorCount;
celix_properties_t *properties; //the endpoint properties
+ struct timespec createTime; //from MONOTONIC clock
} pubsub_announce_entry_t;
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **out);
+pubsub_discovery_t* pubsub_discovery_create(bundle_context_pt context, log_helper_t *logHelper);
celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *node_discovery);
celix_status_t pubsub_discovery_start(pubsub_discovery_t *node_discovery);
celix_status_t pubsub_discovery_stop(pubsub_discovery_t *node_discovery);
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index c21d597..945ddf0 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -126,7 +126,7 @@ double pubsub_utils_matchPublisher(
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
}
- printf("Score publisher service for psa type %s is %f\n", adminType, score);
+// printf("Score publisher service for psa type %s is %f\n", adminType, score);
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
@@ -183,7 +183,7 @@ double pubsub_utils_matchSubscriber(
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
}
- printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
+// printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
@@ -217,7 +217,7 @@ bool pubsub_utils_matchEndpoint(
}
bool match = psaMatch && serMatch;
- printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
+// printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 1ebd74a..788e2de 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -98,6 +98,18 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+ if (endpoints != NULL) {
+ int size = celix_arrayList_size(endpoints);
+ for (int i = 0; i < size; ++i) {
+ celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+ celix_properties_destroy(ep);
+ }
+ celix_arrayList_destroy(endpoints);
+ }
+ }
hashMap_destroy(manager->announcedEndpoints.map, false, false);
celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
@@ -108,10 +120,52 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- hashMap_destroy(manager->discoveredEndpoints.map, true, false);
+ iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
+ }
+ hashMap_destroy(manager->discoveredEndpoints.map, false, false);
celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ free(entry->scopeAndTopicKey);
+ free(entry->scope);
+ free(entry->topic);
+ celix_properties_destroy(entry->subscriberProperties);
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
+ }
+ hashMap_destroy(manager->topicReceivers.map, false, false);
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ celixThreadMutex_destroy(&manager->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ free(entry->scopeAndTopicKey);
+ free(entry->scope);
+ free(entry->topic);
+ celix_properties_destroy(entry->endpoint);
+ celix_filter_destroy(entry->publisherFilter);
+ free(entry);
+ }
+ }
+ hashMap_destroy(manager->topicSenders.map, false, false);
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ celixThreadMutex_destroy(&manager->topicSenders.mutex);
+
free(manager);
return status;
@@ -180,7 +234,8 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
}
}
}
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
celixThreadMutex_lock(&manager->topicReceivers.mutex);
iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/etcdlib/src/etcd.c
----------------------------------------------------------------------
diff --git a/libs/etcdlib/src/etcd.c b/libs/etcdlib/src/etcd.c
index f6a0c9b..1f52dcb 100644
--- a/libs/etcdlib/src/etcd.c
+++ b/libs/etcdlib/src/etcd.c
@@ -34,6 +34,7 @@
#define ETCD_JSON_DIR "dir"
#define ETCD_JSON_MODIFIEDINDEX "modifiedIndex"
#define ETCD_JSON_INDEX "index"
+#define ETCD_JSON_ERRORCODE "errorCode"
#define ETCD_HEADER_INDEX "X-Etcd-Index: "
@@ -258,7 +259,7 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback,
} else if (res == CURLE_OPERATION_TIMEDOUT) {
retVal = ETCDLIB_RC_TIMEOUT;
} else {
- //TODO return error ?
+ retVal = ETCDLIB_RC_ERROR;
}
free(reply.memory);
@@ -361,8 +362,21 @@ int etcd_refresh(const char* key, int ttl) {
free(url);
}
- if (res == CURLE_OK) {
- retVal = ETCDLIB_RC_OK;
+ if (res == CURLE_OK && reply.memory != NULL) {
+ json_error_t error;
+ json_t *root = json_loads(reply.memory, 0, &error);
+ if (root != NULL) {
+ json_t *errorCode = json_object_get(root, ETCD_JSON_ERRORCODE);
+ if (errorCode == NULL) {
+ //no curl error and no etcd errorcode reply -> OK
+ retVal = ETCDLIB_RC_OK;
+ } else {
+ retVal = ETCDLIB_RC_ERROR;
+ }
+ } else {
+ retVal = ETCDLIB_RC_ERROR;
+ fprintf(stderr, "[ETCDLIB] Error: %s is not json", reply.memory);
+ }
}
if (reply.memory) {
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/utils/include/celix_array_list.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h
index 45cb917..b4b5742 100644
--- a/libs/utils/include/celix_array_list.h
+++ b/libs/utils/include/celix_array_list.h
@@ -68,6 +68,8 @@ void celix_arrayList_addSize(celix_array_list_t *list, size_t val);
int celix_arrayList_indexOf(celix_array_list_t *list, celix_array_list_entry_t entry);
void celix_arrayList_removeAt(celix_array_list_t *list, int index);
+void celix_arrayList_clear(celix_array_list_t *list);
+
/**
* Remove entry from array list. To use this first memset the entry to null to ensure it completely initialized or
* ensure that the array list is created with a custom equals which matches the used entry.
http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/utils/src/array_list.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c
index d4192f6..20ad5b6 100644
--- a/libs/utils/src/array_list.c
+++ b/libs/utils/src/array_list.c
@@ -252,14 +252,7 @@ bool arrayList_removeElement(array_list_pt list, void * element) {
}
void arrayList_clear(array_list_pt list) {
- unsigned int i;
- list->modCount++;
-
- for (i = 0; i < list->size; i++) {
- // free(list->elementData[i]);
- memset(&list->elementData[i], 0, sizeof(celix_array_list_entry_t));
- }
- list->size = 0;
+ celix_arrayList_clear(list);
}
bool arrayList_addAll(array_list_pt list, array_list_pt toAdd) {
@@ -559,3 +552,14 @@ void celix_arrayList_removeSize(celix_array_list_t *list, size_t val) {
entry.sizeVal = val;
celix_arrayList_removeEntry(list, entry);
}
+
+void celix_arrayList_clear(celix_array_list_t *list) {
+ unsigned int i;
+ list->modCount++;
+
+ for (i = 0; i < list->size; i++) {
+ // free(list->elementData[i]);
+ memset(&list->elementData[i], 0, sizeof(celix_array_list_entry_t));
+ }
+ list->size = 0;
+}