You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:30 UTC

[13/34] celix git commit: CELIX-454: Refactors the pubsub zmq admin.

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
new file mode 100644
index 0000000..9477055
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -0,0 +1,556 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <pubsub_common.h>
+#include <zconf.h>
+#include <arpa/inet.h>
+#include <czmq.h>
+#include <log_helper.h>
+#include "pubsub_zmq_topic_sender.h"
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_zmq_common.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define ZMQ_BIND_MAX_RETRY                      10
+
+#define L_DEBUG(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) /* debug log; expands against a variable named `sender` that must be in scope at the call site */
+#define L_INFO(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) /* info log; same `sender` requirement */
+#define L_WARN(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) /* warning log; same `sender` requirement */
+#define L_ERROR(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) /* error log; same `sender` requirement */
+
+struct pubsub_zmq_topic_sender { //state of one ZMQ topic sender (one publish socket for one scope/topic)
+    celix_bundle_context_t *ctx; //bundle context used to track the serializer and (un)register the publisher factory
+    log_helper_t *logHelper; //log sink used by the L_* macros
+
+    char *scope; //heap copy of the scope given to create, released in destroy
+    char *topic; //heap copy of the topic given to create, released in destroy
+    char *url; //"tcp://<bindIP>:<port>" of the bound socket, released in destroy
+
+    struct {
+        celix_thread_mutex_t mutex; //held while frames are written to the socket
+        zsock_t *socket; //ZMQ_PUB socket created and bound in create
+        zcert_t *cert; //NOTE(review): not assigned anywhere in this file
+    } zmq;
+
+    long serTrackerId; //id returned by the serializer service tracker started in create
+    struct {
+        celix_thread_mutex_t mutex; //protects svc and props
+        pubsub_serializer_service_t *svc; //currently set serializer service, NULL when none is set
+        const celix_properties_t *props; //properties that came with the serializer service
+    } serializer;
+
+    struct {
+        long svcId; //service id of the registered publisher service factory
+        celix_service_factory_t factory; //factory handing out one publisher service per requesting bundle
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex; //protects map
+        hash_map_t *map;  //key = bndId, value = psa_zmq_bounded_service_entry_t
+    } boundedServices;
+};
+
+typedef struct psa_zmq_bounded_service_entry { //publisher service instance handed to one requesting bundle
+    pubsub_zmq_topic_sender_t *parent; //sender that owns this entry
+    pubsub_publisher_t service; //publisher service struct returned to the requesting bundle
+    long bndId; //id of the requesting bundle, also the key in boundedServices.map
+    hash_map_t *msgTypes; //filled by the serializer's createSerializerMap; looked up by msg type id when sending
+    int getCount; //number of outstanding getService calls; the entry is freed when it drops to 0
+
+    struct {
+        celix_thread_mutex_t mutex; //held for the duration of a send call on this entry
+        bool inProgress; //true between a FIRST_MSG and the matching LAST_MSG
+        celix_array_list_t *parts; //queued pubsub_msg_t* parts of the multipart message being built
+    } multipart;
+} psa_zmq_bounded_service_entry_t;
+
+
+typedef struct pubsub_msg { //one serialized message part waiting to be written to the socket
+    pubsub_msg_header_t *header; //heap allocated header (topic, type id, version); freed by psa_zmq_sendMsg
+    char* payload; //serializer output; freed with free() by psa_zmq_sendMsg
+    int payloadSize; //number of bytes in payload
+} pubsub_msg_t;
+
+static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static unsigned int rand_range(unsigned int min, unsigned int max);
+static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender);
+
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_zmq_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
+
+/**
+ * Creates a topic sender: a ZMQ_PUB socket bound to a random port in
+ * [basePort, maxPort], a tracker for the serializer service with the given
+ * service id and a publisher service factory registered for scope/topic.
+ *
+ * @param ctx             Bundle context used for tracking and registration.
+ * @param logHelper       Log helper used for all logging of this sender.
+ * @param scope           Scope to publish on (copied).
+ * @param topic           Topic to publish on (copied).
+ * @param serializerSvcId Service id of the serializer service to track.
+ * @param bindIP          IP address put in the advertised url; the socket itself binds on 0.0.0.0.
+ * @param basePort        Lowest port to try.
+ * @param maxPort         Highest port to try.
+ * @return The new sender, or NULL when the socket could not be created or
+ *         bound within ZMQ_BIND_MAX_RETRY attempts or memory ran out.
+ */
+pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        const char *bindIP,
+        unsigned int basePort,
+        unsigned int maxPort) {
+    pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    if (sender == NULL) {
+        return NULL;
+    }
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+
+    //setting up zmq socket for ZMQ TopicSender
+    {
+#ifdef BUILD_WITH_ZMQ_SECURITY
+        //NOTE(review): this section still refers to `pubEP` and `bundle_context`, which are not
+        //declared in this function; it needs porting before BUILD_WITH_ZMQ_SECURITY can be enabled.
+        char* secure_topics = NULL;
+        bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
+
+        if (secure_topics){
+            array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
+
+            int i;
+            int secure_topics_size = arrayList_size(secure_topics_list);
+            for (i = 0; i < secure_topics_size; i++){
+                char* top = arrayList_get(secure_topics_list, i);
+                if (strcmp(pubEP->topic, top) == 0){
+                    printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
+                    pubEP->is_secure = true;
+                }
+                free(top);
+                top = NULL;
+            }
+
+            arrayList_destroy(secure_topics_list);
+        }
+
+        zcert_t* pub_cert = NULL;
+        if (pubEP->is_secure){
+            char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+            if (keys_bundle_dir == NULL){
+                //this function returns a pointer, so signal the failure with NULL (and do not leak sender)
+                free(sender);
+                return NULL;
+            }
+
+            const char* keys_file_path = NULL;
+            const char* keys_file_name = NULL;
+            bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+            bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+            char cert_path[MAX_CERT_PATH_LENGTH];
+
+            //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
+            snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
+            free(keys_bundle_dir);
+            printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
+
+            pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
+            if (pub_cert == NULL){
+                printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
+                printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
+                pubEP->is_secure = false;
+            }
+        }
+#endif
+
+        zsock_t* socket = zsock_new(ZMQ_PUB);
+        if (socket==NULL) {
+#ifdef BUILD_WITH_ZMQ_SECURITY
+            if (pubEP->is_secure) {
+                zcert_destroy(&pub_cert);
+            }
+#endif
+            perror("Error for zmq_socket");
+            //without a socket there is nothing to bind; stop here instead of binding a NULL socket
+            free(sender);
+            return NULL;
+        }
+#ifdef BUILD_WITH_ZMQ_SECURITY
+        if (pubEP->is_secure) {
+            zcert_apply (pub_cert, socket); // apply certificate to socket
+            zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
+        }
+#endif
+
+        int rv = -1, retry=0;
+        while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
+            /* Randomized part due to same bundle publishing on different topics */
+            unsigned int port = rand_range(basePort,maxPort);
+
+            //url advertised to subscribers
+            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
+            char *url = calloc(len, sizeof(char));
+            if (url != NULL) {
+                snprintf(url, len, "tcp://%s:%u", bindIP, port);
+            }
+
+            //url actually bound: all interfaces on the chosen port
+            len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
+            char *bindUrl = calloc(len, sizeof(char));
+            if (bindUrl != NULL) {
+                snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+            }
+
+            if (url == NULL || bindUrl == NULL) {
+                //out of memory, retrying will not help
+                free(url);
+                free(bindUrl);
+                break;
+            }
+
+            rv = zsock_bind (socket, "%s", bindUrl);
+            if (rv == -1) {
+                perror("Error for zmq_bind");
+                free(url);
+            } else {
+                sender->url = url;
+                sender->zmq.socket = socket;
+            }
+            retry++;
+            free(bindUrl);
+        }
+
+        if (sender->url == NULL) {
+            //no port could be bound: close the socket instead of leaking it
+            zsock_destroy(&socket);
+        }
+    }
+
+    if (sender->url == NULL) {
+        free(sender);
+        return NULL;
+    }
+
+    sender->scope = strndup(scope, 1024 * 1024);
+    sender->topic = strndup(topic, 1024 * 1024);
+    if (sender->scope == NULL || sender->topic == NULL) {
+        zsock_destroy(&sender->zmq.socket);
+        free(sender->scope);
+        free(sender->topic);
+        free(sender->url);
+        free(sender);
+        return NULL;
+    }
+
+    celixThreadMutex_create(&sender->serializer.mutex, NULL);
+    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+    celixThreadMutex_create(&sender->zmq.mutex, NULL);
+    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    //track serializer svc based on the provided serializerSvcId
+    {
+        char filter[64];
+        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = sender;
+        opts.setWithProperties = pubsub_zmqTopicSender_setSerializer;
+        sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register publisher services using a service factory
+    {
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = psa_zmq_getPublisherService;
+        sender->publisher.factory.ungetService = psa_zmq_ungetPublisherService;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+        sender->publisher.svcId = celix_bundleContext_registerServiceFactory(ctx, &sender->publisher.factory, PUBSUB_PUBLISHER_SERVICE_NAME, props);
+    }
+
+    return sender;
+}
+
+/**
+ * Stops the serializer tracker, unregisters the publisher service factory,
+ * closes the ZMQ socket and releases all memory owned by the sender.
+ * Passing NULL is allowed and does nothing.
+ */
+void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
+    if (sender != NULL) {
+        celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+        celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+
+        //close the publish socket created in create; it was never released before
+        celixThreadMutex_lock(&sender->zmq.mutex);
+        zsock_destroy(&sender->zmq.socket);
+        celixThreadMutex_unlock(&sender->zmq.mutex);
+
+        celixThreadMutex_destroy(&sender->serializer.mutex);
+        celixThreadMutex_destroy(&sender->boundedServices.mutex);
+        celixThreadMutex_destroy(&sender->zmq.mutex);
+
+        //TODO loop and cleanup
+        hashMap_destroy(sender->boundedServices.map, false, false);
+
+        free(sender->scope);
+        free(sender->topic);
+        free(sender->url);
+        free(sender);
+    }
+}
+
+/**
+ * Returns the pubsub admin type of this sender, which is always
+ * PSA_ZMQ_PUBSUB_ADMIN_TYPE. The sender argument is not used.
+ */
+const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender __attribute__((unused))) {
+    const char *adminType = PSA_ZMQ_PUBSUB_ADMIN_TYPE;
+    return adminType;
+}
+
+/**
+ * Returns the value of PUBSUB_SERIALIZER_TYPE_KEY from the properties of the
+ * currently set serializer service, or NULL when no serializer properties are
+ * set. The lookup is done while holding the serializer mutex.
+ */
+const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
+    celixThreadMutex_lock(&sender->serializer.mutex);
+    const celix_properties_t *serProps = sender->serializer.props;
+    const char *serType = (serProps == NULL)
+            ? NULL
+            : celix_properties_get(serProps, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    celixThreadMutex_unlock(&sender->serializer.mutex);
+    return serType;
+}
+
+/** Returns the scope this sender publishes on; the string is owned by the sender. */
+const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
+    const char *scope = sender->scope;
+    return scope;
+}
+
+/** Returns the topic this sender publishes on; the string is owned by the sender. */
+const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender) {
+    const char *topic = sender->topic;
+    return topic;
+}
+
+/** Returns the "tcp://<ip>:<port>" url built when the socket was bound; the string is owned by the sender. */
+const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender) {
+    const char *url = sender->url;
+    return url;
+}
+
+
+/** Placeholder: currently does nothing with the given subscriber endpoint. */
+void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO subscriber count -> topic info
+    (void)sender;
+    (void)endpoint;
+}
+
+/** Placeholder: currently does nothing with the given subscriber endpoint. */
+void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO
+    (void)sender;
+    (void)endpoint;
+}
+
+/**
+ * Tracker callback: stores the given serializer service (may be NULL) and its
+ * properties in the sender while holding the serializer mutex.
+ *
+ * @param handle The topic sender (pubsub_zmq_topic_sender_t*).
+ * @param svc    The serializer service, or NULL.
+ * @param props  The properties that belong to svc.
+ */
+static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_zmq_topic_sender_t *sender = handle;
+
+    //TODO svc == NULL -> no serializer -> remove all publishers
+
+    celixThreadMutex_lock(&sender->serializer.mutex);
+    sender->serializer.svc = (pubsub_serializer_service_t*)svc;
+    sender->serializer.props = props;
+    celixThreadMutex_unlock(&sender->serializer.mutex);
+}
+
+/**
+ * Service factory callback: returns the publisher service for the requesting
+ * bundle. The first request of a bundle creates its entry (serializer map,
+ * multipart mutex and multipart part list); later requests only increase the
+ * entry's get count.
+ *
+ * @param handle           The topic sender (pubsub_zmq_topic_sender_t*).
+ * @param requestingBundle The bundle asking for the publisher service.
+ * @return Pointer to the bundle specific pubsub_publisher_t, or NULL when the
+ *         entry could not be allocated.
+ */
+static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_zmq_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_zmq_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            L_ERROR("Error creating publisher service, out of memory\n");
+            celixThreadMutex_unlock(&sender->boundedServices.mutex);
+            return NULL;
+        }
+        entry->getCount = 1;
+        entry->parent = sender;
+        entry->bndId = bndId;
+
+        //the send functions lock this mutex and append to this list, so both must exist before the service is handed out
+        celixThreadMutex_create(&entry->multipart.mutex, NULL);
+        entry->multipart.parts = celix_arrayList_create();
+
+        celixThreadMutex_lock(&sender->serializer.mutex);
+        celix_status_t rc = CELIX_SUCCESS;
+        if (sender->serializer.svc != NULL) {
+            rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        }
+        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+            //TODO destroy and return NULL?
+            L_ERROR("Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+        }
+        celixThreadMutex_unlock(&sender->serializer.mutex);
+
+
+        entry->service.handle = entry;
+        entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
+        entry->service.send = psa_zmq_topicPublicationSend;
+        entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
+
+        hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    return &entry->service;
+}
+
+/**
+ * Service factory callback: releases one reference to the publisher service of
+ * the requesting bundle. When the last reference is released the entry is
+ * removed from the map and everything it owns is freed: the serializer map,
+ * any still queued multipart parts, the part list and the multipart mutex.
+ *
+ * @param handle           The topic sender (pubsub_zmq_topic_sender_t*).
+ * @param requestingBundle The bundle releasing the publisher service.
+ */
+static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_zmq_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_zmq_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount -= 1;
+    }
+    if (entry != NULL && entry->getCount == 0) {
+        //free entry
+        hashMap_remove(sender->boundedServices.map, (void*)bndId);
+
+
+        celixThreadMutex_lock(&sender->serializer.mutex);
+        celix_status_t rc = CELIX_SUCCESS;
+        if (sender->serializer.svc != NULL) {
+            rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
+        }
+        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+            L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+        }
+        celixThreadMutex_unlock(&sender->serializer.mutex);
+
+        //drop parts of a multipart message that was started but never finished
+        if (entry->multipart.parts != NULL) {
+            unsigned int nrOfParts = (unsigned int)celix_arrayList_size(entry->multipart.parts);
+            for (unsigned int i = 0; i < nrOfParts; ++i) {
+                pubsub_msg_t *part = celix_arrayList_get(entry->multipart.parts, (int)i);
+                if (part != NULL) {
+                    free(part->header);
+                    free(part->payload);
+                    free(part);
+                }
+            }
+            celix_arrayList_destroy(entry->multipart.parts);
+        }
+        celixThreadMutex_destroy(&entry->multipart.mutex);
+
+        free(entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+/**
+ * Sends a single (non multipart) message: forwards to the multipart send with
+ * both the FIRST and LAST flag set and returns its result.
+ */
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
+    const int singleMsgFlags = PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG;
+    return psa_zmq_topicPublicationSendMultipart(handle, msgTypeId, msg, singleMsgFlags);
+}
+
+/**
+ * Writes one message (header frame followed by payload frame) to the sender's
+ * socket and always frees msg, including its header and payload.
+ *
+ * @param sender The topic sender owning the socket.
+ * @param msg    The message to send; ownership is taken.
+ * @param last   When false the payload frame is sent with ZFRAME_MORE so that
+ *               further parts can follow.
+ * @return true when both frames were handed to the socket, false when a frame
+ *         could not be created, the socket is missing or a send failed.
+ */
+static bool psa_zmq_sendMsg(pubsub_zmq_topic_sender_t *sender, pubsub_msg_t *msg, bool last){
+
+    bool ret = true;
+
+    zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
+    zframe_t* payloadMsg = zframe_new(msg->payload, (size_t)msg->payloadSize);
+    if (headerMsg == NULL || payloadMsg == NULL) {
+        //never hand a NULL frame to zframe_send
+        ret = false;
+    }
+
+    if (ret) {
+        delay_first_send_for_late_joiners(sender);
+
+        celixThreadMutex_lock(&sender->zmq.mutex);
+        if (sender->zmq.socket == NULL) {
+            L_ERROR("[PSA_ZMQ] TopicSender zmq socket is NULL");
+            ret = false; //nothing was sent, so do not report success
+        } else if (zframe_send(&headerMsg, sender->zmq.socket, ZFRAME_MORE) == -1) {
+            //do not send a payload without its header
+            ret = false;
+        } else if (zframe_send(&payloadMsg, sender->zmq.socket, last ? 0 : ZFRAME_MORE) == -1) {
+            ret = false;
+        }
+        celixThreadMutex_unlock(&sender->zmq.mutex);
+    }
+
+    //zframe_send sets the frame pointer to NULL once the frame is sent, so this only releases unsent frames
+    zframe_destroy(&headerMsg);
+    zframe_destroy(&payloadMsg);
+
+    free(msg->header);
+    free(msg->payload);
+    free(msg);
+
+    return ret;
+
+}
+
+/**
+ * Sends all queued parts of a multipart message in order and empties the list.
+ * Every part is passed to psa_zmq_sendMsg (which frees it), also after an
+ * earlier part failed; otherwise the remaining parts would be leaked when the
+ * list is cleared.
+ *
+ * @param sender The topic sender owning the socket.
+ * @param parts  List of pubsub_msg_t*; empty on return.
+ * @return true when every part was sent successfully.
+ */
+static bool psa_zmq_sendMsgParts(pubsub_zmq_topic_sender_t *sender, celix_array_list_t *parts){
+    bool ret = true;
+    unsigned int mp_num = (unsigned int)celix_arrayList_size(parts);
+    for (unsigned int i = 0; i < mp_num; i++) {
+        //call first, combine afterwards: `ret && send(...)` would skip the call once ret is false
+        bool sent = psa_zmq_sendMsg(sender, (pubsub_msg_t*)celix_arrayList_get(parts,i), (i==mp_num-1));
+        ret = ret && sent;
+    }
+    celix_arrayList_clear(parts);
+    return ret;
+}
+
+/**
+ * Serializes inMsg and, depending on flags, sends it directly or queues it as
+ * part of a multipart message that is sent when the LAST_MSG part arrives.
+ *
+ * @param handle    The bundle specific entry (psa_zmq_bounded_service_entry_t*).
+ * @param msgTypeId Local message type id used to find the message serializer.
+ * @param inMsg     The message to serialize.
+ * @param flags     PUBSUB_PUBLISHER_FIRST_MSG, _PART_MSG, _LAST_MSG or
+ *                  FIRST_MSG | LAST_MSG for a normal single send.
+ * @return 0 when the message was accepted (a failed socket send is only logged),
+ *         -1 when no serializer is available for msgTypeId, serializing failed
+ *         or memory ran out,
+ *         -3 when a new multipart message is started while one is in progress,
+ *         -4 for a part without a first part or an invalid flags combination.
+ */
+static int psa_zmq_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags) {
+    int status = 0;
+    psa_zmq_bounded_service_entry_t *bound = handle;
+    pubsub_zmq_topic_sender_t *sender = bound->parent;
+
+    celixThreadMutex_lock(&bound->multipart.mutex);
+    if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->multipart.inProgress){ //means a real mp_msg
+        L_INFO("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
+        celixThreadMutex_unlock(&bound->multipart.mutex);
+        return -3;
+    }
+
+    //msgTypes is still NULL when the serializer map could not be created for this bundle
+    pubsub_msg_serializer_t* msgSer = NULL;
+    if (bound->msgTypes != NULL) {
+        msgSer = hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
+    }
+
+    if (msgSer!= NULL) {
+        bool snd = true;
+        void *serializedOutput = NULL;
+        size_t serializedOutputLen = 0;
+
+        pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+        pubsub_msg_t *msg = calloc(1,sizeof(struct pubsub_msg));
+
+        if (msg_hdr == NULL || msg == NULL) {
+            L_ERROR("[PSA_ZMQ] Out of memory while preparing message with msg type id %u\n", msgTypeId);
+            status = -1;
+        } else {
+            int major=0, minor=0;
+
+            //header is zero initialized, so copying at most MAX_TOPIC_LEN-1 bytes keeps it terminated
+            strncpy( msg_hdr->topic, bound->parent->topic, MAX_TOPIC_LEN-1);
+            msg_hdr->type = msgTypeId;
+
+            if (msgSer->msgVersion != NULL){
+                version_getMajor(msgSer->msgVersion, &major);
+                version_getMinor(msgSer->msgVersion, &minor);
+                msg_hdr->major = (unsigned char)major;
+                msg_hdr->minor = (unsigned char)minor;
+            }
+
+            celix_status_t rc = msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+            if (rc != CELIX_SUCCESS) {
+                L_ERROR("[PSA_ZMQ] Failed to serialize message with msg type id %u\n", msgTypeId);
+                status = -1;
+            }
+        }
+
+        if (status == 0) {
+            msg->header = msg_hdr;
+            msg->payload = (char*)serializedOutput;
+            msg->payloadSize = (int)serializedOutputLen;
+
+            switch(flags) {
+                case PUBSUB_PUBLISHER_FIRST_MSG:
+                    bound->multipart.inProgress = true;
+                    celix_arrayList_add(bound->multipart.parts, msg);
+                    break;
+                case PUBSUB_PUBLISHER_PART_MSG:
+                    if(!bound->multipart.inProgress){
+                        L_INFO("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
+                        status = -4;
+                    }
+                    else{
+                        celix_arrayList_add(bound->multipart.parts, msg);
+                    }
+                    break;
+                case PUBSUB_PUBLISHER_LAST_MSG:
+                    if(!bound->multipart.inProgress){
+                        L_INFO("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+                        status = -4;
+                    }
+                    else{
+                        celix_arrayList_add(bound->multipart.parts, msg);
+                        snd = psa_zmq_sendMsgParts(bound->parent, bound->multipart.parts);
+                        bound->multipart.inProgress = false;
+                        assert(celix_arrayList_size(bound->multipart.parts) == 0); //should be cleanup by sendMsg
+                    }
+                    break;
+                case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
+                    snd = psa_zmq_sendMsg(bound->parent, msg, true);
+                    break;
+                default:
+                    L_INFO("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
+                    status = -4;
+                    break;
+            }
+        }
+
+        if (status != 0) {
+            //the message was neither queued nor sent, so it is still owned here:
+            //release the header and the serialized payload too, not only the wrapper
+            free(serializedOutput);
+            free(msg_hdr);
+            free(msg);
+        }
+
+        if (!snd) {
+            L_WARN("[PSA_ZMQ] Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+        }
+
+    } else {
+        L_ERROR("[PSA_ZMQ] No msg serializer available for msg type id %u\n", msgTypeId);
+        status=-1;
+    }
+
+    celixThreadMutex_unlock(&(bound->multipart.mutex));
+
+    return status;
+
+}
+
+/**
+ * Sleeps FIRST_SEND_DELAY_IN_SECONDS before the very first send so that
+ * subscribers that connect late do not miss it. The "first send" flag is a
+ * function-local static, so the delay happens once per process and not once
+ * per sender.
+ * NOTE(review): the flag is read and written without synchronization, so
+ * concurrent first sends may each wait; confirm whether that is acceptable.
+ *
+ * @param sender Only used for logging.
+ */
+static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender) {
+
+    static bool firstSend = true;
+
+    if(firstSend){
+        //prefix was copy-pasted from the UDP multicast admin; this is the ZMQ admin
+        L_INFO("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
+        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        firstSend = false;
+    }
+}
+
+/**
+ * Returns a pseudo random number in the inclusive range [min, max].
+ * When max is not larger than min, min is returned.
+ *
+ * random() yields values in [0, 2^31 - 1]; dividing by 2^31 keeps the scale
+ * factor strictly below 1.0, so the result can never become max + 1. The range
+ * width is computed as a double so that it cannot wrap around.
+ */
+static unsigned int rand_range(unsigned int min, unsigned int max){
+    if (max <= min) {
+        return min;
+    }
+    double scaled = ((double)random()) / 2147483648.0;
+    double width = (double)(max - min) + 1.0;
+    return min + (unsigned int)(width * scaled);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
new file mode 100644
index 0000000..23a0c25
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -0,0 +1,46 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+#include "log_helper.h" //log_helper_t is used in the create signature below
+
+/** Opaque handle of a ZMQ topic sender. */
+typedef struct pubsub_zmq_topic_sender pubsub_zmq_topic_sender_t;
+
+/**
+ * Creates a topic sender for the given scope and topic, bound to a random port
+ * between basePort and maxPort.
+ *
+ * @param ctx             Bundle context used by the sender.
+ * @param logHelper       Log helper used for logging.
+ * @param scope           Scope to publish on.
+ * @param topic           Topic to publish on.
+ * @param serializerSvcId Service id of the serializer service to use.
+ * @param bindIP          IP address used in the url of the sender.
+ * @param basePort        Lowest port to try.
+ * @param maxPort         Highest port to try.
+ * @return The new sender or NULL when no port could be bound.
+ */
+pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        const char *bindIP,
+        unsigned int basePort,
+        unsigned int maxPort);
+
+/** Destroys the sender; NULL is accepted. */
+void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
+
+/** Returns the pubsub admin type of the sender. */
+const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender);
+/** Returns the serializer type of the currently set serializer, or NULL when none is set. */
+const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
+/** Returns the scope of the sender. */
+const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
+/** Returns the topic of the sender. */
+const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender);
+/** Returns the url of the sender. */
+const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
+
+/** Currently a no-op (TODO in the implementation). */
+void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
+/** Currently a no-op (TODO in the implementation). */
+void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
+
+#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index 96a8af1..c372973 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -31,7 +31,10 @@ target_include_directories(celix_pubsub_discovery_etcd SYSTEM PRIVATE
 	${CURL_INCLUDE_DIR}
 	${JANSSON_INCLUDE_DIR}
 )
-target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+target_link_libraries(celix_pubsub_discovery_etcd PRIVATE
+		Celix::pubsub_spi Celix::framework Celix::etcdlib_static
+		Celix::shell_api Celix::log_helper
+		${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
 
 install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 81e467d..56d8997 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -21,6 +21,9 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "log_helper.h"
+#include "command.h"
+
 #include "celix_bundle_context.h"
 #include "celix_bundle_activator.h"
 #include "constants.h"
@@ -29,7 +32,6 @@
 #include "pubsub_common.h"
 #include "pubsub_listeners.h"
 #include "pubsub_discovery_impl.h"
-#include "../../../shell/shell/include/command.h"
 
 typedef struct psd_activator {
 	pubsub_discovery_t *pubsub_discovery;
@@ -42,12 +44,17 @@ typedef struct psd_activator {
 
 	command_service_t cmdSvc;
 	long cmdSvcId;
+
+	log_helper_t *loghelper;
 } psd_activator_t;
 
 static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
 	celix_status_t status;
 
-	pubsub_discovery_create(ctx, &act->pubsub_discovery);
+	logHelper_create(ctx, &act->loghelper);
+	logHelper_start(act->loghelper);
+
+	act->pubsub_discovery = pubsub_discovery_create(ctx, act->loghelper);
 	// pubsub_discovery_start needs to be first to initialize
 	status = pubsub_discovery_start(act->pubsub_discovery);
 
@@ -91,6 +98,9 @@ static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx
 	celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
 	pubsub_discovery_destroy(act->pubsub_discovery);
 
+	logHelper_stop(act->loghelper);
+	logHelper_destroy(&act->loghelper);
+
 	return status;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 237f2f2..116dd50 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -41,50 +41,53 @@
 #include "pubsub_endpoint.h"
 #include "pubsub_discovery_impl.h"
 
-static celix_properties_t* pubsub_discovery_parseEndpoint(const char *value);
+#define L_DEBUG(...) \
+    logHelper_log(disc->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(disc->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(disc->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(disc->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+static celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char *value);
 static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props);
 static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint);
 static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, const char *uuid);
 
 /* Discovery activator functions */
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    *ps_discovery = calloc(1, sizeof(**ps_discovery));
-
-    if (*ps_discovery == NULL) {
-        return CELIX_ENOMEM;
-    }
-
-    (*ps_discovery)->context = context;
-    (*ps_discovery)->discoveredEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*ps_discovery)->announcedEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*ps_discovery)->discoveredEndpointsListeners = hashMap_create(NULL, NULL, NULL, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL);
-    celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL);
-    pthread_mutex_init(&(*ps_discovery)->waitMutex, NULL);
+/**
+ * Creates the pubsub discovery instance: sets up its endpoint maps, mutexes and
+ * wait condition, reads the etcd related settings from the bundle context and
+ * initializes the etcd library.
+ *
+ * @param context   Bundle context used to read the configuration properties.
+ * @param logHelper Log helper stored for logging by the discovery.
+ * @return The new discovery instance, or NULL when it could not be allocated.
+ */
+pubsub_discovery_t* pubsub_discovery_create(bundle_context_pt context, log_helper_t *logHelper) {
+    pubsub_discovery_t *disc = calloc(1, sizeof(*disc));
+    if (disc == NULL) {
+        return NULL;
+    }
+    disc->logHelper = logHelper;
+    disc->context = context;
+    disc->discoveredEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    disc->announcedEndpoints = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    disc->discoveredEndpointsListeners = hashMap_create(NULL, NULL, NULL, NULL);
+    celixThreadMutex_create(&disc->discoveredEndpointsListenersMutex, NULL);
+    celixThreadMutex_create(&disc->announcedEndpointsMutex, NULL);
+    celixThreadMutex_create(&disc->discoveredEndpointsMutex, NULL);
+    pthread_mutex_init(&disc->waitMutex, NULL);
     pthread_condattr_t attr;
     pthread_condattr_init(&attr);
     pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
-    pthread_cond_init(&(*ps_discovery)->waitCond, &attr);
-    celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL);
-    (*ps_discovery)->running = true;
+    pthread_cond_init(&disc->waitCond, &attr);
+    pthread_condattr_destroy(&attr); //the attribute object is no longer needed once the condition is initialized
+    celixThreadMutex_create(&disc->runningMutex, NULL);
+    disc->running = true;
 
 
-    (*ps_discovery)->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE);
+    disc->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE);
 
     const char* etcdIp = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_IP_KEY, PUBSUB_DISCOVERY_SERVER_IP_DEFAULT);
     long etcdPort = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_SERVER_PORT_KEY, PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT);
     long ttl = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_ETCD_TTL_KEY, PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT);
 
     etcd_init(etcdIp, (int)etcdPort, ETCDLIB_NO_CURL_INITIALIZATION);
-    (*ps_discovery)->ttlForEntries = (int)ttl;
-    (*ps_discovery)->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
-    (*ps_discovery)->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
-    (*ps_discovery)->fwUUID = celix_bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    disc->ttlForEntries = (int)ttl;
+    disc->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
+    disc->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
+    disc->fwUUID = celix_bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
 
-    return status;
+    return disc;
 }
 
 celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
@@ -118,7 +121,7 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
 
 static void psd_etcdReadCallback(const char *key __attribute__((unused)), const char *value, void* arg) {
     pubsub_discovery_t *disc = arg;
-    celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+    celix_properties_t *props = pubsub_discovery_parseEndpoint(disc, value);
     if (props != NULL) {
         pubsub_discovery_addDiscoveredEndpoint(disc, props);
     }
@@ -158,7 +161,7 @@ static void psd_watchForChange(pubsub_discovery_t *disc, bool *connectedPtr, lon
             if (strncmp(ETCDLIB_ACTION_CREATE, action, strlen(ETCDLIB_ACTION_CREATE)) == 0 ||
                        strncmp(ETCDLIB_ACTION_SET, action, strlen(ETCDLIB_ACTION_SET)) == 0 ||
                        strncmp(ETCDLIB_ACTION_UPDATE, action, strlen(ETCDLIB_ACTION_UPDATE)) == 0) {
-                celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+                celix_properties_t *props = pubsub_discovery_parseEndpoint(disc, value);
                 if (props != NULL) {
                     pubsub_discovery_addDiscoveredEndpoint(disc, props);
                 }
@@ -259,16 +262,21 @@ void* psd_refresh(void *data) {
                 //only refresh ttl -> no index update -> no watch trigger
                 int rc = etcd_refresh(entry->key, disc->ttlForEntries);
                 if (rc != ETCDLIB_RC_OK) {
-                    fprintf(stderr, "[PSD] Warning: error refreshing etcd key %s\n", entry->key);
+                    L_ERROR("[PSD] Warning: error refreshing etcd key %s\n", entry->key);
                     entry->isSet = false;
+                    entry->errorCount += 1;
+                } else {
+                    entry->refreshCount += 1;
                 }
             } else {
                 char *str = pubsub_discovery_createJsonEndpoint(entry->properties);
                 int rc = etcd_set(entry->key, str, disc->ttlForEntries, false);
                 if (rc == ETCDLIB_RC_OK) {
                     entry->isSet = true;
+                    entry->setCount += 1;
                 } else {
-                    fprintf(stderr, "[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+                    L_ERROR("[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+                    entry->errorCount += 1;
                 }
             }
         }
@@ -337,6 +345,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_t *disc) {
         if (entry->isSet) {
             etcd_del(entry->key);
         }
+        free(entry->key);
         celix_properties_destroy(entry->properties);
         free(entry);
     }
@@ -386,6 +395,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
 
     if (valid) {
         pubsub_announce_entry_t *entry = calloc(1, sizeof(*entry));
+        clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
         entry->properties = celix_properties_copy(endpoint);
         asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid);
@@ -503,11 +513,11 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
         }
         celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
     } else {
-        fprintf(stderr, "[PSD] Warning unexpected remove from non existing endpoint (uuid is %s)\n", uuid);
+        L_ERROR("[PSD] Warning unexpected remove from non existing endpoint (uuid is %s)\n", uuid);
     }
 }
 
-celix_properties_t* pubsub_discovery_parseEndpoint(const char* etcdValue) {
+celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char* etcdValue) {
     properties_t *props = properties_create();
 
     // etcdValue contains the json formatted string
@@ -535,7 +545,7 @@ celix_properties_t* pubsub_discovery_parseEndpoint(const char* etcdValue) {
 
     bool valid = pubsubEndpoint_isValid(props, true, true);
     if (!valid) {
-        fprintf(stderr, "[PSD] Warning retrieved endpoint is not valid\n");
+        L_ERROR("[PSD] Warning retrieved endpoint is not valid\n");
         celix_properties_destroy(props);
         props = NULL;
     }
@@ -558,8 +568,61 @@ static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props
 }
 
 celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
-    //pubsub_discovery_t *psd = handle;
-    //TODO
-    fprintf(os, "TODO\n");
+    pubsub_discovery_t *disc = handle;
+
+    struct timespec now;
+    clock_gettime(CLOCK_MONOTONIC, &now);
+    //TODO add support for query (scope / topic)
+
+    fprintf(os, "\n");
+    fprintf(os, "Discovered Endpoints:\n");
+    celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+        const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+        const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
+        fprintf(os, "Endpoint %s:\n", uuid);
+        fprintf(os, "   |- type          = %s\n", type);
+        fprintf(os, "   |- scope         = %s\n", scope);
+        fprintf(os, "   |- topic         = %s\n", topic);
+        fprintf(os, "   |- admin type    = %s\n", adminType);
+        fprintf(os, "   |- serializer    = %s\n", serType);
+    }
+    celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+
+    fprintf(os, "\n");
+    fprintf(os, "Announced Endpoints:\n");
+    celixThreadMutex_lock(&disc->announcedEndpointsMutex);
+    iter = hashMapIterator_construct(disc->announcedEndpoints);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
+        const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+        const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
+        int age = (int)(now.tv_sec - entry->createTime.tv_sec);
+        fprintf(os, "Endpoint %s:\n", uuid);
+        fprintf(os, "   |- type          = %s\n", type);
+        fprintf(os, "   |- scope         = %s\n", scope);
+        fprintf(os, "   |- topic         = %s\n", topic);
+        fprintf(os, "   |- admin type    = %s\n", adminType);
+        fprintf(os, "   |- serializer    = %s\n", serType);
+        fprintf(os, "   |- age           = %ds\n", age);
+        fprintf(os, "   |- is set        = %s\n", entry->isSet ? "true" : "false");
+        if (disc->verbose) {
+            fprintf(os, "   |- set count     = %d\n", entry->setCount);
+            fprintf(os, "   |- refresh count = %d\n", entry->refreshCount);
+            fprintf(os, "   |- error count   = %d\n", entry->errorCount);
+        }
+    }
+    celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
+
     return CELIX_SUCCESS;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index a1af837..b2726fb 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -20,6 +20,7 @@
 #ifndef PUBSUB_DISCOVERY_IMPL_H_
 #define PUBSUB_DISCOVERY_IMPL_H_
 
+#include <log_helper.h>
 #include "bundle_context.h"
 #include "service_reference.h"
 
@@ -45,7 +46,7 @@
 
 typedef struct pubsub_discovery {
 	bundle_context_pt context;
-	//TODO add logHelper
+	log_helper_t *logHelper;
 
 	celix_thread_mutex_t discoveredEndpointsMutex; //when locked with EndpointsListenersMutex -> first lock this
 	hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t /*endpoint*/>>
@@ -77,11 +78,15 @@ typedef struct pubsub_discovery {
 typedef struct pubsub_announce_entry {
 	char *key; //etcd key
 	bool isSet; //whether the value is already set (in case of unavailable etcd server this can linger)
+	int refreshCount;
+	int setCount;
+	int errorCount;
 	celix_properties_t *properties; //the endpoint properties
+	struct timespec createTime; //from MONOTONIC clock
 } pubsub_announce_entry_t;
 
 
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_t **out);
+pubsub_discovery_t* pubsub_discovery_create(bundle_context_pt context, log_helper_t *logHelper);
 celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *node_discovery);
 celix_status_t pubsub_discovery_start(pubsub_discovery_t *node_discovery);
 celix_status_t pubsub_discovery_stop(pubsub_discovery_t *node_discovery);

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index c21d597..945ddf0 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -126,7 +126,7 @@ double pubsub_utils_matchPublisher(
 		score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
 	}
 
-	printf("Score publisher service for psa type %s is %f\n", adminType, score);
+//	printf("Score publisher service for psa type %s is %f\n", adminType, score);
 
 	if (outSerializerSvcId != NULL) {
 		*outSerializerSvcId = serializerSvcId;
@@ -183,7 +183,7 @@ double pubsub_utils_matchSubscriber(
 		score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
 	}
 
-	printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
+//	printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
 
 	if (outSerializerSvcId != NULL) {
 		*outSerializerSvcId = serializerSvcId;
@@ -217,7 +217,7 @@ bool pubsub_utils_matchEndpoint(
 	}
 
 	bool match = psaMatch && serMatch;
-	printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
+//	printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
 
 	if (outSerializerSvcId != NULL) {
 		*outSerializerSvcId = serializerSvcId;

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 1ebd74a..788e2de 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -98,6 +98,18 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
 
 
 	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+	while (hashMapIterator_hasNext(&iter)) {
+	    celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+	    if (endpoints != NULL) {
+	        int size = celix_arrayList_size(endpoints);
+	        for (int i = 0; i < size; ++i) {
+	            celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+	            celix_properties_destroy(ep);
+	        }
+	        celix_arrayList_destroy(endpoints);
+	    }
+	}
 	hashMap_destroy(manager->announcedEndpoints.map, false, false);
 	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 	celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
@@ -108,10 +120,52 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
 	celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
 
 	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-	hashMap_destroy(manager->discoveredEndpoints.map, true, false);
+    iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL) {
+            celix_properties_destroy(entry->endpoint);
+            free(entry);
+        }
+    }
+	hashMap_destroy(manager->discoveredEndpoints.map, false, false);
 	celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 	celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
 
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    iter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL) {
+            free(entry->scopeAndTopicKey);
+            free(entry->scope);
+            free(entry->topic);
+            celix_properties_destroy(entry->subscriberProperties);
+            celix_properties_destroy(entry->endpoint);
+            free(entry);
+        }
+    }
+    hashMap_destroy(manager->topicReceivers.map, false, false);
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+    celixThreadMutex_destroy(&manager->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    iter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL) {
+            free(entry->scopeAndTopicKey);
+            free(entry->scope);
+            free(entry->topic);
+            celix_properties_destroy(entry->endpoint);
+            celix_filter_destroy(entry->publisherFilter);
+            free(entry);
+        }
+    }
+    hashMap_destroy(manager->topicSenders.map, false, false);
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+    celixThreadMutex_destroy(&manager->topicSenders.mutex);
+
 	free(manager);
 
 	return status;
@@ -180,7 +234,8 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
             }
 	    }
 	}
-    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
     iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/etcdlib/src/etcd.c
----------------------------------------------------------------------
diff --git a/libs/etcdlib/src/etcd.c b/libs/etcdlib/src/etcd.c
index f6a0c9b..1f52dcb 100644
--- a/libs/etcdlib/src/etcd.c
+++ b/libs/etcdlib/src/etcd.c
@@ -34,6 +34,7 @@
 #define ETCD_JSON_DIR                   "dir"
 #define ETCD_JSON_MODIFIEDINDEX         "modifiedIndex"
 #define ETCD_JSON_INDEX                 "index"
+#define ETCD_JSON_ERRORCODE				"errorCode"
 
 #define ETCD_HEADER_INDEX               "X-Etcd-Index: "
 
@@ -258,7 +259,7 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback,
 	} else if (res == CURLE_OPERATION_TIMEDOUT) {
 		retVal = ETCDLIB_RC_TIMEOUT;
 	} else {
-		//TODO return error ?
+		retVal = ETCDLIB_RC_ERROR;
 	}
 
     free(reply.memory);
@@ -361,8 +362,21 @@ int etcd_refresh(const char* key, int ttl) {
 		free(url);
 	}
 
-	if (res == CURLE_OK) {
-		retVal = ETCDLIB_RC_OK;
+	if (res == CURLE_OK && reply.memory != NULL) {
+		json_error_t error;
+		json_t *root = json_loads(reply.memory, 0, &error);
+		if (root != NULL) {
+			json_t *errorCode = json_object_get(root, ETCD_JSON_ERRORCODE);
+			if (errorCode == NULL) {
+				//no curl error and no etcd errorcode reply -> OK
+				retVal = ETCDLIB_RC_OK;
+			} else {
+				retVal = ETCDLIB_RC_ERROR;
+			}
+		} else {
+			retVal = ETCDLIB_RC_ERROR;
+			fprintf(stderr, "[ETCDLIB] Error: %s is not json", reply.memory);
+		}
 	}
 
 	if (reply.memory) {

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/utils/include/celix_array_list.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h
index 45cb917..b4b5742 100644
--- a/libs/utils/include/celix_array_list.h
+++ b/libs/utils/include/celix_array_list.h
@@ -68,6 +68,8 @@ void celix_arrayList_addSize(celix_array_list_t *list, size_t val);
 int celix_arrayList_indexOf(celix_array_list_t *list, celix_array_list_entry_t entry);
 void celix_arrayList_removeAt(celix_array_list_t *list, int index);
 
+void celix_arrayList_clear(celix_array_list_t *list);
+
 /**
  * Remove entry from array list. To use this first memset the entry to null to ensure it completely initialized or
  * ensure that the array list is created with a custom equals which matches the used entry.

http://git-wip-us.apache.org/repos/asf/celix/blob/00454efd/libs/utils/src/array_list.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c
index d4192f6..20ad5b6 100644
--- a/libs/utils/src/array_list.c
+++ b/libs/utils/src/array_list.c
@@ -252,14 +252,7 @@ bool arrayList_removeElement(array_list_pt list, void * element) {
 }
 
 void arrayList_clear(array_list_pt list) {
-	unsigned int i;
-	list->modCount++;
-
-	for (i = 0; i < list->size; i++) {
-		// free(list->elementData[i]);
-		memset(&list->elementData[i], 0, sizeof(celix_array_list_entry_t));
-	}
-	list->size = 0;
+	celix_arrayList_clear(list);
 }
 
 bool arrayList_addAll(array_list_pt list, array_list_pt toAdd) {
@@ -559,3 +552,14 @@ void celix_arrayList_removeSize(celix_array_list_t *list, size_t val) {
 	entry.sizeVal = val;
 	celix_arrayList_removeEntry(list, entry);
 }
+
+void celix_arrayList_clear(celix_array_list_t *list) {
+	unsigned int i;
+	list->modCount++;
+
+	for (i = 0; i < list->size; i++) {
+		// free(list->elementData[i]);
+		memset(&list->elementData[i], 0, sizeof(celix_array_list_entry_t));
+	}
+	list->size = 0;
+}