You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:34 UTC

[17/34] celix git commit: CELIX-454: Some bug and mem leak fixes for the PSA refactoring

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index e26754c..f0c072d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -45,23 +45,16 @@
 #define L_ERROR(...) \
     logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
-struct pubsub_updmc_topic_receiver {
+struct pubsub_zmq_topic_receiver {
     celix_bundle_context_t *ctx;
     log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
     char *scope;
     char *topic;
 
     zsock_t *zmqSocket;
 
-    //serialiser svc
-    long serializerTrackerId;
-    struct {
-
-        celix_thread_mutex_t mutex; //protect svc
-        pubsub_serializer_service_t *svc;
-        const celix_properties_t *props;
-    } serializer;
-
     struct {
         celix_thread_t thread;
         celix_thread_mutex_t mutex;
@@ -107,21 +100,23 @@ typedef struct msg_map_entry {
 } msg_map_entry_t;
 
 
-static void pubsub_zmqTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void psa_zmq_processMsg(pubsub_updmc_topic_receiver_t *receiver, celix_array_list_t *messages);
+static void psa_zmq_processMsg(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *messages);
 static void* psa_zmq_recvThread(void * data);
 
 
-pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
                                                               log_helper_t *logHelper,
                                                               const char *scope,
                                                                 const char *topic,
-                                                                long serializerSvcId) {
-    pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+                                                                long serializerSvcId,
+                                                                pubsub_serializer_service_t *serializer) {
+    pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
 
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -186,7 +181,6 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
         receiver->scope = strndup(scope, 1024 * 1024);
         receiver->topic = strndup(topic, 1024 * 1024);
 
-        celixThreadMutex_create(&receiver->serializer.mutex, NULL);
         celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
         celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
         celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -195,20 +189,6 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
-    //track serializer svc based on the provided serializerSvcId
-    if (receiver->zmqSocket != NULL ) {
-        char filter[64];
-        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-        opts.filter.filter = filter;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = receiver;
-        opts.setWithProperties = pubsub_zmqTopicReceiver_setSerializer;
-        receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //track subscribers
     if (receiver->zmqSocket != NULL ) {
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -239,39 +219,28 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
     return receiver;
 }
 
-void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
     if (receiver != NULL) {
 
+        celixThreadMutex_lock(&receiver->recvThread.mutex);
+        receiver->recvThread.running = false;
+        celixThreadMutex_unlock(&receiver->recvThread.mutex);
+        celixThread_join(receiver->recvThread.thread, NULL);
+
+        celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
         celixThreadMutex_lock(&receiver->subscribers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
-                celixThreadMutex_lock(&receiver->serializer.mutex);
-                if (receiver->serializer.svc != NULL) {
-                    if (entry->msgTypes != NULL) {
-                        receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle,
-                                                                       entry->msgTypes);
-                    }
-                } else {
-                    L_ERROR("Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
-                }
-                celixThreadMutex_unlock(&receiver->serializer.mutex);
+                receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
                 free(entry);
             }
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
-
-        celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
-        celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
-
-        celixThreadMutex_lock(&receiver->recvThread.mutex);
-        receiver->recvThread.running = false;
-        celixThreadMutex_unlock(&receiver->recvThread.mutex);
-        celixThread_join(receiver->recvThread.thread, NULL);
-
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
         iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
@@ -284,7 +253,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
         hashMap_destroy(receiver->requestedConnections.map, false, false);
         celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 
-        celixThreadMutex_destroy(&receiver->serializer.mutex);
         celixThreadMutex_destroy(&receiver->subscribers.mutex);
         celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         celixThreadMutex_destroy(&receiver->recvThread.mutex);
@@ -294,29 +262,20 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
     free(receiver);
 }
 
-const char* pubsub_zmqTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
-    return PSA_ZMQ_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_zmqTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
-    const char *result = NULL;
-    celixThreadMutex_lock(&receiver->serializer.mutex);
-    if (receiver->serializer.props != NULL) {
-        result = celix_properties_get(receiver->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
-    }
-    celixThreadMutex_unlock(&receiver->serializer.mutex);
-    return result;
-}
-
-const char* pubsub_zmqTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver) {
     return receiver->scope;
 }
-const char* pubsub_zmqTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver) {
     return receiver->topic;
 }
 
+long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver) {
+    return receiver->serializerSvcId;
+}
+
+
 void pubsub_zmqTopicReceiver_connectTo(
-        pubsub_updmc_topic_receiver_t *receiver,
+        pubsub_zmq_topic_receiver_t *receiver,
         const char *url) {
     L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
 
@@ -338,7 +297,7 @@ void pubsub_zmqTopicReceiver_connectTo(
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *url) {
+void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
     L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -357,39 +316,8 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *recei
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-
-static void pubsub_zmqTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
-    pubsub_serializer_service_t *ser = svc;
-
-    //check if current serializer is set, if so destroy existing type maps
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && entry->msgTypes != NULL) {
-            celixThreadMutex_lock(&receiver->serializer.mutex);
-            if (receiver->serializer.svc != NULL) {
-                receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
-            }
-            celixThreadMutex_unlock(&receiver->serializer.mutex);
-            entry->msgTypes = NULL;
-        }
-    }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
-    if (ser == NULL) {
-        //TODO -> no serializer -> remove all publishers
-    }
-
-    celixThreadMutex_lock(&receiver->serializer.mutex);
-    receiver->serializer.svc = ser;
-    receiver->serializer.props = props;
-    celixThreadMutex_unlock(&receiver->serializer.mutex);
-}
-
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
+    pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -408,22 +336,19 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         entry->usageCount = 1;
         entry->svc = svc;
 
-        celixThreadMutex_lock(&receiver->serializer.mutex);
-        if (receiver->serializer.svc != NULL) {
-            receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        if (rc == 0) {
+            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
         } else {
-            fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            L_ERROR("[PSA_ZMQ] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(entry);
         }
-        celixThreadMutex_unlock(&receiver->serializer.mutex);
-
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_updmc_topic_receiver_t *receiver = handle;
+    pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -435,20 +360,17 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
     if (entry != NULL && entry->usageCount <= 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        celixThreadMutex_lock(&receiver->serializer.mutex);
-        if (receiver->serializer.svc != NULL) {
-            receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
-        } else {
-            fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+        if (rc != 0) {
+            L_ERROR("[PSA_ZMQ] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
         }
-        celixThreadMutex_unlock(&receiver->serializer.mutex);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static void* psa_zmq_recvThread(void * data) {
-    pubsub_updmc_topic_receiver_t *receiver = data;
+    pubsub_zmq_topic_receiver_t *receiver = data;
 
     celixThreadMutex_lock(&receiver->recvThread.mutex);
     bool running = receiver->recvThread.running;
@@ -629,7 +551,7 @@ static mp_handle_t* psa_zmq_createMultipartHandle(hash_map_pt svc_msg_db,array_l
 }
 
 
-static void psa_zmq_processMsg(pubsub_updmc_topic_receiver_t *receiver, celix_array_list_t *messages) {
+static void psa_zmq_processMsg(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *messages) {
 
     pubsub_msg_header_t *first_msg_hdr = (pubsub_msg_header_t*)zframe_data(((complete_zmq_msg_t*)celix_arrayList_get(messages,0))->header);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 269d1db..ea931a1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -21,21 +21,22 @@
 
 #include "celix_bundle_context.h"
 
-typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t;
+typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
 
-pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
         log_helper_t *logHelper,
         const char *scope, 
         const char *topic,
-        long serializerSvcId);
-void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver);
+        long serializerSvcId,
+        pubsub_serializer_service_t *serializer);
+void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
 
-const char* pubsub_zmqTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver);
+const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver);
 
-void pubsub_zmqTopicReceiver_connectTo(pubsub_updmc_topic_receiver_t *receiver, const char *url);
-void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *url);
+long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver);
+
+void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, const char *url);
+void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
 
 #endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 9477055..9d44c94 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -47,6 +47,8 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
 
     char *scope;
     char *topic;
@@ -58,13 +60,6 @@ struct pubsub_zmq_topic_sender {
         zcert_t *cert;
     } zmq;
 
-    long serTrackerId;
-    struct {
-        celix_thread_mutex_t mutex;
-        pubsub_serializer_service_t *svc;
-        const celix_properties_t *props;
-    } serializer;
-
     struct {
         long svcId;
         celix_service_factory_t factory;
@@ -97,7 +92,6 @@ typedef struct pubsub_msg {
     int payloadSize;
 } pubsub_msg_t;
 
-static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -112,12 +106,15 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         const char *scope,
         const char *topic,
         long serializerSvcId,
+        pubsub_serializer_service_t *ser,
         const char *bindIP,
         unsigned int basePort,
         unsigned int maxPort) {
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = ser;
 
     //setting up zmq socket for ZMQ TopicSender
     {
@@ -217,26 +214,11 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         sender->scope = strndup(scope, 1024 * 1024);
         sender->topic = strndup(topic, 1024 * 1024);
 
-        celixThreadMutex_create(&sender->serializer.mutex, NULL);
         celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
         celixThreadMutex_create(&sender->zmq.mutex, NULL);
         sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
     }
 
-    //track serializer svc based on the provided serializerSvcId
-    if (sender->url != NULL ) {
-        char filter[64];
-        snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
-        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-        opts.filter.filter = filter;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = sender;
-        opts.setWithProperties = pubsub_zmqTopicSender_setSerializer;
-        sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //register publisher services using a service factory
     if (sender->url != NULL) {
         sender->publisher.factory.handle = sender;
@@ -261,16 +243,30 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
 
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
     if (sender != NULL) {
-        celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
         celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
 
-        celixThreadMutex_destroy(&sender->serializer.mutex);
+        celixThreadMutex_lock(&sender->boundedServices.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+                celixThreadMutex_destroy(&entry->multipart.mutex);
+                for (int i = 0; i < celix_arrayList_size(entry->multipart.parts); ++i) {
+                    pubsub_msg_t *msg = celix_arrayList_get(entry->multipart.parts, i);
+                    free(msg->header);
+                    free(msg->payload);
+                    free(msg);
+                }
+                free(entry);
+            }
+        }
+        hashMap_destroy(sender->boundedServices.map, false, false);
+        celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->zmq.mutex);
 
-        //TODO loop and cleanup
-        hashMap_destroy(sender->boundedServices.map, false, false);
-
         free(sender->scope);
         free(sender->topic);
         free(sender->url);
@@ -278,18 +274,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
     }
 }
 
-const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender __attribute__((unused))) {
-    return PSA_ZMQ_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
-    const char *result = NULL;
-    celixThreadMutex_lock(&sender->serializer.mutex);
-    if (sender->serializer.props != NULL) {
-        result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
-    }
-    celixThreadMutex_unlock(&sender->serializer.mutex);
-    return result;
+long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender) {
+    return sender->serializerSvcId;
 }
 
 const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
@@ -313,20 +299,6 @@ void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, con
     //TODO
 }
 
-static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
-    pubsub_zmq_topic_sender_t *sender = handle;
-    pubsub_serializer_service_t *ser = svc;
-
-    if (ser == NULL) {
-        //TODO -> no serializer -> remove all publishers
-    }
-
-    celixThreadMutex_lock(&sender->serializer.mutex);
-    sender->serializer.svc = ser;
-    sender->serializer.props = props;
-    celixThreadMutex_unlock(&sender->serializer.mutex);
-}
-
 static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_zmq_topic_sender_t *sender = handle;
     long bndId = celix_bundle_getId(requestingBundle);
@@ -341,24 +313,19 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
         entry->parent = sender;
         entry->bndId = bndId;
 
-        celixThreadMutex_lock(&sender->serializer.mutex);
-        celix_status_t rc = CELIX_SUCCESS;
-        if (sender->serializer.svc != NULL) {
-            rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
-        }
-        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
-            //TODO destroy and return NULL?
-            L_ERROR("Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        if (rc == 0) {
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
+            entry->service.send = psa_zmq_topicPublicationSend;
+            entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);
         }
-        celixThreadMutex_unlock(&sender->serializer.mutex);
 
 
-        entry->service.handle = entry;
-        entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
-        entry->service.send = psa_zmq_topicPublicationSend;
-        entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
 
-        hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
@@ -377,17 +344,10 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
-
-        celixThreadMutex_lock(&sender->serializer.mutex);
-        celix_status_t rc = CELIX_SUCCESS;
-        if (sender->serializer.svc != NULL) {
-            rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
-        }
-        if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+        int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+        if (rc != 0) {
             L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
         }
-        celixThreadMutex_unlock(&sender->serializer.mutex);
 
         free(entry);
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index 23a0c25..e537edd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -29,17 +29,18 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         const char *scope,
         const char *topic,
         long serializerSvcId,
+        pubsub_serializer_service_t *ser,
         const char *bindIP,
         unsigned int basePort,
         unsigned int maxPort);
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
 
-const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender);
-const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
 const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
 const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender);
 const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
 
+long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender);
+
 void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
 void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 788e2de..a8e31ef 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -61,7 +61,6 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 	status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
 	celixThreadMutexAttr_destroy(&psaAttr);
 
-	status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, NULL);
 	status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
 	status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
 	status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
@@ -70,7 +69,6 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 
 	status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
 
-	manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, NULL);
 	manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 	manager->announceEndpointListeners.list = celix_arrayList_create();
 	manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -96,31 +94,13 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
 	celixThreadMutex_unlock(&manager->psaHandling.mutex);
 	celixThread_join(manager->psaHandling.thread, NULL);
 
-
-	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
-	while (hashMapIterator_hasNext(&iter)) {
-	    celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
-	    if (endpoints != NULL) {
-	        int size = celix_arrayList_size(endpoints);
-	        for (int i = 0; i < size; ++i) {
-	            celix_properties_t *ep = celix_arrayList_get(endpoints, i);
-	            celix_properties_destroy(ep);
-	        }
-	        celix_arrayList_destroy(endpoints);
-	    }
-	}
-	hashMap_destroy(manager->announcedEndpoints.map, false, false);
-	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
-	celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
-
 	celixThreadMutex_lock(&manager->pubsubadmins.mutex);
 	hashMap_destroy(manager->pubsubadmins.map, false, false);
 	celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 	celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
 
 	celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
-    iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+    hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL) {
@@ -140,8 +120,10 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
             free(entry->scopeAndTopicKey);
             free(entry->scope);
             free(entry->topic);
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
+            }
             celix_properties_destroy(entry->subscriberProperties);
-            celix_properties_destroy(entry->endpoint);
             free(entry);
         }
     }
@@ -157,7 +139,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
             free(entry->scopeAndTopicKey);
             free(entry->scope);
             free(entry->topic);
-            celix_properties_destroy(entry->endpoint);
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
+            }
             celix_filter_destroy(entry->publisherFilter);
             free(entry);
         }
@@ -175,6 +159,7 @@ void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_prope
 	pubsub_topology_manager_t *manager = handle;
 	pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
 
+
 	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
 
@@ -184,6 +169,8 @@ void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_prope
 		celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 	}
 
+	//NOTE: new psa, so no endpoints have been announced for it yet
+
 	/* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
 	 * so no retroactively adding subscribers
 	 *
@@ -196,26 +183,6 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
 	//pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
 	long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
-	/* de-announce all publications */
-	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
-	celix_array_list_t *endpointsList = hashMap_remove(manager->announcedEndpoints.map, (void*)svcId);
-	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
-
-	if (endpointsList != NULL) {
-		for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) {
-			celix_properties_t *endpoint = celix_arrayList_get(endpointsList, i);
-			celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-			for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
-				pubsub_announce_endpoint_listener_t *listener;
-				listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
-				listener->removeEndpoint(listener->handle, endpoint);
-			}
-			celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-			celix_properties_destroy(endpoint);
-		}
-		celix_arrayList_destroy(endpointsList);
-	}
-
 	//NOTE psa shutdown will teardown topic receivers / topic senders
 	//de-setup all topic receivers/senders for the removed psa.
 	//the next psaHandling run will try to find new psa.
@@ -225,7 +192,18 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
 	while (hashMapIterator_hasNext(&iter)) {
 	    pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
 	    if (entry->selectedPsaSvcId == svcId) {
-	        entry->setup = false;
+			/* de-announce this sender's endpoint at every announce endpoint listener */
+			if (entry->endpoint != NULL) {
+				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+				for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+					pubsub_announce_endpoint_listener_t *listener;
+					listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+					listener->removeEndpoint(listener->handle, entry->endpoint);
+				}
+				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+			}
+
+			entry->setup = false;
 	        entry->selectedSerializerSvcId = -1L;
 	        entry->selectedPsaSvcId = -1L;
 	        if (entry->endpoint != NULL) {
@@ -241,6 +219,17 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry->selectedPsaSvcId == svcId) {
+			/* de-announce this receiver's endpoint at every announce endpoint listener */
+			if (entry->endpoint != NULL) {
+				celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+				for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+					pubsub_announce_endpoint_listener_t *listener;
+					listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+					listener->removeEndpoint(listener->handle, entry->endpoint);
+				}
+				celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+			}
+
             entry->setup = false;
             entry->selectedSerializerSvcId = -1L;
             entry->selectedPsaSvcId = -1L;
@@ -334,16 +323,25 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, vo
 	//1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
 	//2) Add listener to manager->announceEndpointListeners
 
-	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+	celixThreadMutex_lock(&manager->topicSenders.mutex);
+	hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
 	while (hashMapIterator_hasNext(&iter)) {
-		celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
-		for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
-			celix_properties_t *ep = celix_arrayList_get(endpoints, i);
-			listener->announceEndpoint(listener->handle, ep);
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (entry != NULL && entry->endpoint != NULL) {
+			listener->announceEndpoint(listener->handle, entry->endpoint);
 		}
 	}
-	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+	celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+	celixThreadMutex_lock(&manager->topicReceivers.mutex);
+	iter = hashMapIterator_construct(manager->topicReceivers.map);
+	while (hashMapIterator_hasNext(&iter)) {
+		pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+		if (entry != NULL && entry->endpoint != NULL) {
+			listener->announceEndpoint(listener->handle, entry->endpoint);
+		}
+	}
+	celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
 	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
 	celix_arrayList_add(manager->announceEndpointListeners.list, listener);
@@ -361,17 +359,6 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle,
 	celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
 	celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
 	celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
-	celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
-	hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
-	while (hashMapIterator_hasNext(&iter)) {
-		celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
-		for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
-			celix_properties_t *ep = celix_arrayList_get(endpoints, i);
-			listener->removeEndpoint(listener->handle, ep);
-		}
-	}
-	celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 }
 
 void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {

http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 3cca612..d6db117 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -46,11 +46,6 @@ typedef struct pubsub_topology_manager {
 
 	struct {
 		celix_thread_mutex_t mutex;
-		hash_map_t *map; //key = psa svc id, value = list<celix_properties_t /*endpoint*/>
-	} announcedEndpoints;
-
-	struct {
-		celix_thread_mutex_t mutex;
 		hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
 	} discoveredEndpoints;