You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:33 UTC

[16/34] celix git commit: CELIX-454: Removes now unused psa zmq source files

CELIX-454: Removes now unused psa zmq source files


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/f1dfdbdf
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/f1dfdbdf
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/f1dfdbdf

Branch: refs/heads/develop
Commit: f1dfdbdfec7be8b8730a556953faad25140a4e08
Parents: 00454ef
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Thu Sep 27 17:00:23 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Thu Sep 27 17:00:23 2018 +0200

----------------------------------------------------------------------
 .../pubsub_admin_zmq/src/pubsub_admin_impl.c    | 1206 ------------------
 .../pubsub_admin_zmq/src/pubsub_admin_impl.h    |  123 --
 .../pubsub_admin_zmq/src/topic_publication.c    |  639 ----------
 .../pubsub_admin_zmq/src/topic_publication.h    |   49 -
 .../pubsub_admin_zmq/src/topic_subscription.c   |  761 -----------
 .../pubsub_admin_zmq/src/topic_subscription.h   |   60 -
 6 files changed, 2838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/f1dfdbdf/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
deleted file mode 100644
index 34cc6f9..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ /dev/null
@@ -1,1206 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.c
- *
- *  \date       Sep 30, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "pubsub_admin_impl.h"
-#include <zmq.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <netdb.h>
-
-#ifndef ANDROID
-#include <ifaddrs.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "constants.h"
-#include "utils.h"
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_helper.h"
-#include "log_service.h"
-#include "celix_threads.h"
-#include "service_factory.h"
-
-#include "topic_subscription.h"
-#include "topic_publication.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_utils.h"
-#include "pubsub/subscriber.h"
-
-#define MAX_KEY_FOLDER_PATH_LENGTH 512
-
-static const char *DEFAULT_IP = "127.0.0.1";
-
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut);
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
-
-/**
- * Allocates and initialises the ZMQ pubsub admin.
- *
- * Creates the bookkeeping maps/lists and their mutexes, starts the log helper,
- * chooses the IP address to announce (PSA_IP property, otherwise the address
- * detected for the PSA_ITF interface, otherwise DEFAULT_IP) and reads the port
- * range, the optional ZMQ io-thread count, the scores and the verbose flag
- * from the bundle context.
- *
- * @param context bundle context used for property lookup and logging
- * @param admin   out: the newly allocated admin (set to NULL when calloc fails)
- * @return CELIX_SUCCESS; CELIX_ENOMEM when the admin cannot be allocated;
- *         CELIX_SERVICE_EXCEPTION when built with BUILD_WITH_ZMQ_SECURITY and
- *         zsys_has_curve() reports no curve support
- */
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
-    celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-    if (!zsys_has_curve()){
-        printf("PSA_ZMQ: zeromq curve unsupported\n");
-        return CELIX_SERVICE_EXCEPTION;
-    }
-#endif
-
-    *admin = calloc(1, sizeof(**admin));
-
-    if (!*admin){
-        return CELIX_ENOMEM;
-    }
-
-    const char *ip = NULL;
-    char *detectedIp = NULL;
-    (*admin)->bundle_context= context;
-    (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-    (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
-    (*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
-    arrayList_create(&((*admin)->noSerializerSubscriptions));
-    arrayList_create(&((*admin)->noSerializerPublications));
-    arrayList_create(&((*admin)->serializerList));
-
-    celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
-    celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-    celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
-    celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
-    celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
-
-    /* The two "pending" locks are created as recursive mutexes. */
-    celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
-    celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-    celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
-
-    celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
-    celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-    celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
-
-    if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
-        logHelper_start((*admin)->loghelper);
-    }
-
-    bundleContext_getProperty(context,PSA_IP , &ip);
-
-#ifndef ANDROID
-    if (ip == NULL) {
-        const char *interface = NULL;
-
-        bundleContext_getProperty(context, PSA_ITF, &interface);
-        if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
-            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface);
-        }
-
-        ip = detectedIp;
-    }
-#endif
-
-    if (ip != NULL) {
-        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
-        (*admin)->ipAddress = strdup(ip);
-    }
-    else {
-        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
-        (*admin)->ipAddress = strdup(DEFAULT_IP);
-    }
-
-    /* The admin keeps its own copy; free(NULL) is a no-op so no guard is needed. */
-    free(detectedIp);
-
-    const char* basePortStr = NULL;
-    const char* maxPortStr = NULL;
-    /* NOTE: the default strings passed here are the quoted macro names, not
-     * numbers. They never parse as a port, so an unset property ends up on the
-     * numeric PSA_ZMQ_DEFAULT_* values assigned below. */
-    bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
-    bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
-
-    /* Start from the defaults and only override them with a value that parsed
-     * completely: a NULL, empty or partially numeric string keeps the default. */
-    (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
-    (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
-    if (basePortStr != NULL) {
-        char* endptrBase = NULL;
-        long parsedBase = strtol(basePortStr, &endptrBase, 10);
-        if (endptrBase != basePortStr && *endptrBase == '\0') {
-            (*admin)->basePort = parsedBase;
-        }
-    }
-    if (maxPortStr != NULL) {
-        char* endptrMax = NULL;
-        long parsedMax = strtol(maxPortStr, &endptrMax, 10);
-        if (endptrMax != maxPortStr && *endptrMax == '\0') {
-            (*admin)->maxPort = parsedMax;
-        }
-    }
-
-    // Disable Signal Handling by CZMQ
-    setenv("ZSYS_SIGHANDLER", "false", true);
-
-    const char *nrZmqThreads = NULL;
-    bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
-
-    if(nrZmqThreads != NULL) {
-        char *endPtr = NULL;
-        /* Range-check the full unsigned long before narrowing, so a huge value
-         * cannot wrap into the accepted 1..49 range. */
-        unsigned long parsedThreads = strtoul(nrZmqThreads, &endPtr, 10);
-        if(endPtr != nrZmqThreads && parsedThreads > 0 && parsedThreads < 50) {
-            unsigned int nrThreads = (unsigned int) parsedThreads;
-            zsys_set_io_threads(nrThreads);
-            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %u threads for ZMQ", nrThreads);
-            printf("PSA_ZMQ: Using %u threads for ZMQ\n", nrThreads);
-        }
-    }
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-    // Setup authenticator
-    zactor_t* auth = zactor_new (zauth, NULL);
-    zstr_sendx(auth, "VERBOSE", NULL);
-
-    // Load all public keys of subscribers into the application
-    // This step is done for authenticating subscribers
-    char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
-    char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
-    snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
-    zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
-    free(keys_bundle_dir);
-
-    (*admin)->zmq_auth = auth;
-#endif
-
-
-    /* Scores start at their compile-time defaults and may be overridden by properties. */
-    (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE;
-    (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE;
-    (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE;
-
-    const char *defaultScoreStr = NULL;
-    const char *sampleScoreStr = NULL;
-    const char *controlScoreStr = NULL;
-    bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr);
-    bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
-    bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
-
-    if (defaultScoreStr != NULL) {
-        (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
-    }
-    if (sampleScoreStr != NULL) {
-        (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
-    }
-    if (controlScoreStr != NULL) {
-        (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
-    }
-
-    (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE;
-    const char *verboseStr = NULL;
-    bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr);
-    if (verboseStr != NULL) {
-        /* Case-insensitive prefix match: any value starting with "true" enables verbose. */
-        (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
-    }
-
-    if ((*admin)->verbose) {
-        printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
-    }
-
-    return status;
-}
-
-
-/**
- * Tears down an admin created by pubsubAdmin_create and frees it.
- *
- * Each container is destroyed while holding its own lock; afterwards the
- * mutexes and mutex attributes are destroyed, the log helper is stopped and
- * destroyed and, when built with BUILD_WITH_ZMQ_SECURITY, the ZMQ
- * authenticator actor is destroyed.
- *
- * The subscriptions map is destroyed without freeing keys or values, and the
- * localPublications map frees its keys only.
- * NOTE(review): presumably both maps are emptied before this is called -- confirm.
- *
- * @param admin the admin to destroy; must not be used afterwards
- * @return always CELIX_SUCCESS
- */
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
-{
-    free(admin->ipAddress);
-
-    /* pending subscriptions: heap-allocated key -> array list */
-    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-    {
-        hash_map_iterator_pt pendingIt = hashMapIterator_create(admin->pendingSubscriptions);
-        while (hashMapIterator_hasNext(pendingIt)) {
-            hash_map_entry_pt pendingEntry = hashMapIterator_nextEntry(pendingIt);
-            char *pendingKey = (char*)hashMapEntry_getKey(pendingEntry);
-            free(pendingKey);
-            arrayList_destroy((array_list_pt)hashMapEntry_getValue(pendingEntry));
-        }
-        hashMapIterator_destroy(pendingIt);
-        hashMap_destroy(admin->pendingSubscriptions,false,false);
-    }
-    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-    /* active subscriptions */
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-    hashMap_destroy(admin->subscriptions,false,false);
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    /* local publications: the map frees its own keys */
-    celixThreadMutex_lock(&admin->localPublicationsLock);
-    hashMap_destroy(admin->localPublications,true,false);
-    celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-    /* external publications: heap-allocated key -> array list */
-    celixThreadMutex_lock(&admin->externalPublicationsLock);
-    {
-        hash_map_iterator_pt extIt = hashMapIterator_create(admin->externalPublications);
-        while (hashMapIterator_hasNext(extIt)) {
-            hash_map_entry_pt extEntry = hashMapIterator_nextEntry(extIt);
-            char *extKey = (char*)hashMapEntry_getKey(extEntry);
-            free(extKey);
-            arrayList_destroy((array_list_pt)hashMapEntry_getValue(extEntry));
-        }
-        hashMapIterator_destroy(extIt);
-        hashMap_destroy(admin->externalPublications,false,false);
-    }
-    celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-    /* known serializers */
-    celixThreadMutex_lock(&admin->serializerListLock);
-    arrayList_destroy(admin->serializerList);
-    celixThreadMutex_unlock(&admin->serializerListLock);
-
-    /* endpoints that were waiting for a serializer */
-    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-    arrayList_destroy(admin->noSerializerSubscriptions);
-    arrayList_destroy(admin->noSerializerPublications);
-    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-    /* per-serializer bookkeeping: serializer -> array list */
-    celixThreadMutex_lock(&admin->usedSerializersLock);
-    {
-        hash_map_iterator_pt subsIt = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
-        while (hashMapIterator_hasNext(subsIt)) {
-            array_list_pt subsForSerializer = (array_list_pt)hashMapIterator_nextValue(subsIt);
-            arrayList_destroy(subsForSerializer);
-        }
-        hashMapIterator_destroy(subsIt);
-        hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
-
-        hash_map_iterator_pt pubsIt = hashMapIterator_create(admin->topicPublicationsPerSerializer);
-        while (hashMapIterator_hasNext(pubsIt)) {
-            array_list_pt pubsForSerializer = (array_list_pt)hashMapIterator_nextValue(pubsIt);
-            arrayList_destroy(pubsForSerializer);
-        }
-        hashMapIterator_destroy(pubsIt);
-        hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
-    }
-    celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-    /* synchronisation primitives */
-    celixThreadMutex_destroy(&admin->usedSerializersLock);
-    celixThreadMutex_destroy(&admin->serializerListLock);
-    celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
-
-    celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
-    celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
-
-    celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
-    celixThreadMutex_destroy(&admin->subscriptionsLock);
-
-    celixThreadMutex_destroy(&admin->localPublicationsLock);
-    celixThreadMutex_destroy(&admin->externalPublicationsLock);
-
-    /* logging */
-    logHelper_stop(admin->loghelper);
-    logHelper_destroy(&admin->loghelper);
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-    if (admin->zmq_auth != NULL){
-        zactor_destroy(&(admin->zmq_auth));
-    }
-#endif
-
-    free(admin);
-
-    return CELIX_SUCCESS;
-}
-
-/*
- * Connects the given subscription to every endpoint in the list that carries
- * a ZMQ endpoint URL property. Returns the sum of the individual connect results.
- */
-static celix_status_t pubsubAdmin_connectAnySubToEndpoints(topic_subscription_pt sub, array_list_pt endpoints) {
-    celix_status_t connectStatus = CELIX_SUCCESS;
-    int idx;
-
-    for (idx = 0; idx < arrayList_size(endpoints); idx++) {
-        pubsub_endpoint_pt publisherEP = (pubsub_endpoint_pt)arrayList_get(endpoints, idx);
-        const char *url = properties_get(publisherEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY);
-        if (url == NULL) {
-            continue;
-        }
-        connectStatus += pubsub_topicSubscriptionConnectPublisher(sub, (char*) url);
-    }
-
-    return connectStatus;
-}
-
-/*
- * Handles a subscriber for the wildcard topic PUBSUB_ANY_SUB_TOPIC.
- *
- * Nothing happens when the "any" subscription already exists. Otherwise the
- * best serializer is looked up; if none is found the endpoint is added to the
- * noSerializerSubscriptions list and the lookup status is returned. With a
- * serializer, a topic subscription is created, connected to every known local
- * and external publisher that has an endpoint URL, started, stored under
- * PUBSUB_ANY_SUB_TOPIC and linked to its serializer.
- *
- * Holds subscriptionsLock for the whole call.
- */
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-    celix_status_t status = CELIX_SUCCESS;
-
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-
-    topic_subscription_pt anySubscription = hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-    if (anySubscription != NULL) {
-        /* the wildcard subscription is already in place */
-        celixThreadMutex_unlock(&admin->subscriptionsLock);
-        return status;
-    }
-
-    pubsub_serializer_service_t *chosenSerializer = NULL;
-    const char *chosenSerType = NULL;
-
-    status = pubsubAdmin_getBestSerializer(admin, subEP, &chosenSerializer, &chosenSerType);
-    if (status == CELIX_SUCCESS) {
-        status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, chosenSerializer, chosenSerType, &anySubscription);
-    } else {
-        printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-        arrayList_add(admin->noSerializerSubscriptions,subEP);
-        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-    }
-
-    if (status == CELIX_SUCCESS) {
-        /* publishers living in this framework */
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-        hash_map_iterator_pt localIt = hashMapIterator_create(admin->localPublications);
-        while (hashMapIterator_hasNext(localIt)) {
-            service_factory_pt pubFactory = (service_factory_pt)hashMapIterator_nextValue(localIt);
-            topic_publication_pt publication = (topic_publication_pt)pubFactory->handle;
-            array_list_pt localPublishers = pubsub_topicPublicationGetPublisherList(publication);
-
-            if (localPublishers != NULL) {
-                status += pubsubAdmin_connectAnySubToEndpoints(anySubscription, localPublishers);
-                arrayList_destroy(localPublishers);
-            }
-        }
-        hashMapIterator_destroy(localIt);
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-        /* publishers discovered in other frameworks */
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-        hash_map_iterator_pt externalIt = hashMapIterator_create(admin->externalPublications);
-        while (hashMapIterator_hasNext(externalIt)) {
-            array_list_pt externalPublishers = (array_list_pt)hashMapIterator_nextValue(externalIt);
-            if (externalPublishers != NULL) {
-                status += pubsubAdmin_connectAnySubToEndpoints(anySubscription, externalPublishers);
-            }
-        }
-        hashMapIterator_destroy(externalIt);
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-        pubsub_topicSubscriptionAddSubscriber(anySubscription,subEP);
-
-        status += pubsub_topicSubscriptionStart(anySubscription);
-    }
-
-    if (status == CELIX_SUCCESS) {
-        /* only a fully connected and started subscription is stored */
-        hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),anySubscription);
-        connectTopicPubSubToSerializer(admin, chosenSerializer, anySubscription, false);
-    }
-
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    return status;
-}
-
-/**
- * A subscriber service is registered and this PSA has won the match
- * (based on qos or other meta data).
- * Will update the pubsub endpoint with the chosen pubsub admin and serializer type.
- *
- * Subscriptions for PUBSUB_ANY_SUB_TOPIC are delegated to
- * pubsubAdmin_addAnySubscription. For any other topic:
- *  - when no serializer can be found, the endpoint is added to the
- *    noSerializerSubscriptions list and the lookup status is returned;
- *  - when no local or external publisher is known for the scope/topic key, the
- *    endpoint is handed to pubsubAdmin_addSubscriptionToPendingList;
- *  - otherwise a topic subscription is created if there is none yet, connected
- *    to the known publishers that have an endpoint URL, started, stored in
- *    admin->subscriptions and linked to its serializer; on success the
- *    subscriber count of the subscription is increased.
- *
- * The pendingSubscriptions, subscriptions, localPublications and
- * externalPublications locks are taken (in that order) only on the path that
- * actually needs them, and released on that same path.
- *
- * @param admin the pubsub admin
- * @param subEP the subscriber endpoint
- * @return CELIX_SUCCESS, or the (accumulated) status of the failing step(s)
- */
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-    celix_status_t status = CELIX_SUCCESS;
-
-    if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
-        return pubsubAdmin_addAnySubscription(admin,subEP);
-    }
-
-    pubsub_serializer_service_t *best_serializer = NULL;
-    const char *serType = NULL;
-    status = pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType);
-
-
-    if (status != CELIX_SUCCESS) {
-        printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-        arrayList_add(admin->noSerializerSubscriptions,subEP);
-        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-    } else {
-        //admin type and ser type now known -> update ep
-        pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
-        pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType);
-
-        /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
-        celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-        celixThreadMutex_lock(&admin->subscriptionsLock);
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-
-        char *scope_topic = pubsubEndpoint_createScopeTopicKey(
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-        service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
-        array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
-
-        if (factory == NULL && ext_pub_list == NULL) { //No (local or external) publishers yet for this topic
-            pubsubAdmin_addSubscriptionToPendingList(admin, subEP);
-        } else {
-            int i;
-            topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
-
-            if (subscription == NULL) {
-                status += pubsub_topicSubscriptionCreate(admin->bundle_context,
-                                                         (char *) properties_get(subEP->properties,
-                                                                                 PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                                                         (char *) properties_get(subEP->properties,
-                                                                                 PUBSUB_ENDPOINT_TOPIC_NAME),
-                                                         best_serializer, serType, &subscription);
-
-                if (status == CELIX_SUCCESS) {
-                    //got type and serializer -> update endpoint
-                    pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
-                    pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType);
-
-                    /* Try to connect internal publishers */
-                    if (factory != NULL) {
-                        topic_publication_pt topic_pubs = (topic_publication_pt) factory->handle;
-                        array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                        if (topic_publishers != NULL) {
-                            for (i = 0; i < arrayList_size(topic_publishers); i++) {
-                                pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(topic_publishers, i);
-                                if (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
-                                    status += pubsub_topicSubscriptionConnectPublisher(subscription,
-                                                                                       (char *) properties_get(
-                                                                                               pubEP->properties,
-                                                                                               PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-                                }
-                            }
-                            arrayList_destroy(topic_publishers);
-                        }
-
-                    }
-
-                    /* Look also for external publishers */
-                    if (ext_pub_list != NULL) {
-                        for (i = 0; i < arrayList_size(ext_pub_list); i++) {
-                            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(ext_pub_list, i);
-                            if (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
-                                status += pubsub_topicSubscriptionConnectPublisher(subscription,
-                                                                                   (char *) properties_get(
-                                                                                           pubEP->properties,
-                                                                                           PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-                            }
-                        }
-                    }
-
-                    pubsub_topicSubscriptionAddSubscriber(subscription, subEP);
-
-                    status += pubsub_topicSubscriptionStart(subscription);
-
-                }
-
-                if (status == CELIX_SUCCESS) {
-
-                    hashMap_put(admin->subscriptions, strdup(scope_topic), subscription);
-
-                    connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
-                }
-            }
-
-            if (status == CELIX_SUCCESS) {
-                pubsub_topicIncreaseNrSubscribers(subscription);
-            }
-        }
-
-        free(scope_topic);
-
-        /* Release in reverse order of acquisition. This must stay inside the
-         * branch that took the locks: unlocking a mutex that is not held is
-         * not a valid operation. */
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-        celixThreadMutex_unlock(&admin->subscriptionsLock);
-        celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-    }
-
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    return status;
-
-}
-
-/**
- * Removes a subscriber endpoint from this admin.
- *
- * When a topic subscription exists for the endpoint's scope/topic key, its
- * subscriber count is decreased and, once that count reaches zero, the
- * endpoint is removed from the subscription. When no subscription exists, the
- * endpoint is removed from the noSerializerSubscriptions list instead.
- *
- * @param admin the pubsub admin
- * @param subEP the subscriber endpoint to remove
- * @return CELIX_SUCCESS; the result of
- *         pubsub_topicSubscriptionRemoveSubscriber when that is called; or
- *         CELIX_ILLEGAL_STATE when the endpoint is neither subscribed nor in
- *         the noSerializerSubscriptions list
- */
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-    celix_status_t result = CELIX_SUCCESS;
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    char* key = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-    /* Look up the subscription and update it under the subscriptions lock. */
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-    topic_subscription_pt topicSub = (topic_subscription_pt)hashMap_get(admin->subscriptions,key);
-    if (topicSub != NULL) {
-        pubsub_topicDecreaseNrSubscribers(topicSub);
-        if (pubsub_topicGetNrSubscribers(topicSub) == 0) {
-            result = pubsub_topicSubscriptionRemoveSubscriber(topicSub,subEP);
-        }
-    }
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    if (topicSub == NULL) {
-        /* No subscription: the endpoint may still be waiting for a serializer. */
-        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-        if (!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)) {
-            result = CELIX_ILLEGAL_STATE;
-        }
-        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-    }
-
-    free(key);
-
-    return result;
-}
-
-
-/**
- * A bundle has shown interest in a publisher service and this PSA has won the match
- * based on filter or embedded topic.properties (extender pattern)
- * OR !!
- * A remote publication has been discovered and forwarded to this call
- * Will update the pubsub endpoint with the chosen pubsub admin and serializer type
- */
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    const char *fwUUID = NULL;
-    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-    if (fwUUID == NULL) {
-        printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-
-    const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
-    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
-
-    if (isOwn) {
-        //should be null, willl be set in this call
-        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL);
-        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
-    } else {
-        //inverse ADMIN_TYPE_KEY and SERIALIZER_TYPE shoudl not be null
-        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) != NULL);
-        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
-    }
-
-    if (isOwn) {
-        properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
-    }
-
-
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-    if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) &&
-            (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) == NULL)) {
-
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-
-        service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
-
-        if (factory == NULL) {
-            topic_publication_pt pub = NULL;
-            pubsub_serializer_service_t *best_serializer = NULL;
-            const char *serType = NULL;
-            if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
-                status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
-                if (isOwn) {
-                    properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, serType);
-                }
-            }
-            else {
-                printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                arrayList_add(admin->noSerializerPublications,pubEP);
-                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-            }
-
-            if (status == CELIX_SUCCESS) {
-                status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
-                if (status == CELIX_SUCCESS && factory != NULL) {
-                    hashMap_put(admin->localPublications, strdup(scope_topic), factory);
-                    connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
-                }
-            } else {
-                printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s.\n",
-                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                        properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-            }
-        } else {
-            //just add the new EP to the list
-            topic_publication_pt pub = (topic_publication_pt) factory->handle;
-            pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-        }
-
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-    }
-    else{
-
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-        array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
-        if (ext_pub_list == NULL) {
-            arrayList_create(&ext_pub_list);
-            hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
-        }
-
-        arrayList_add(ext_pub_list, pubEP);
-
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-    }
-
-    /* Re-evaluate the pending subscriptions */
-    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-
-    hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
-    if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
-        char* topic = (char*) hashMapEntry_getKey(pendingSub);
-        array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
-        int i;
-        for (i = 0; i < arrayList_size(pendingSubList); i++) {
-            pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
-            pubsubAdmin_addSubscription(admin, subEP);
-        }
-        hashMap_remove(admin->pendingSubscriptions, scope_topic);
-        arrayList_clear(pendingSubList);
-        arrayList_destroy(pendingSubList);
-        free(topic);
-    }
-
-    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-    /* Connect the new publisher to the subscription for his topic, if there is any */
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-
-    topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
-    if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    /* And check also for ANY subscription */
-    topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-    if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    free(scope_topic);
-
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
-                properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY),
-                isOwn);
-    }
-
-
-    return status;
-
-}
-
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
-    celix_status_t status = CELIX_SUCCESS;
-    int count = 0;
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-                properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-                properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-                properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    const char* fwUUID = NULL;
-
-    bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-    if(fwUUID==NULL){
-        fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-    if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
-
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-        service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-        if(factory!=NULL){
-            topic_publication_pt pub = (topic_publication_pt)factory->handle;
-            pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
-        }
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-        if(factory==NULL){
-            /* Maybe the endpoint was pending */
-            celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-            if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
-                status = CELIX_ILLEGAL_STATE;
-            }
-            celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-        }
-    }
-    else{
-
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-        array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-        if(ext_pub_list!=NULL){
-            int i;
-            bool found = false;
-            for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
-                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                found = pubsubEndpoint_equals(pubEP,p);
-                if (found){
-                    arrayList_remove(ext_pub_list,i);
-                }
-            }
-            // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
-            for(i=0; i<arrayList_size(ext_pub_list);i++) {
-                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY),properties_get(p->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)) == 0) {
-                    count++;
-                }
-            }
-
-            if(arrayList_size(ext_pub_list)==0){
-                hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
-                char* topic = (char*)hashMapEntry_getKey(entry);
-                array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
-                hashMap_remove(admin->externalPublications,topic);
-                arrayList_destroy(list);
-                free(topic);
-            }
-        }
-
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-    }
-
-    /* Check if this publisher was connected to one of our subscribers*/
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-
-    topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-    if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL && count == 0){
-        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    /* And check also for ANY subscription */
-    topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-    if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL && count == 0){
-        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
-    }
-
-    free(scope_topic);
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
-    celix_status_t status = CELIX_SUCCESS;
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Closing all publications\n");
-    }
-
-    celixThreadMutex_lock(&admin->localPublicationsLock);
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
-    if(pubsvc_entry!=NULL){
-        char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
-        service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
-        topic_publication_pt pub = (topic_publication_pt)factory->handle;
-        status += pubsub_topicPublicationStop(pub);
-        disconnectTopicPubSubFromSerializer(admin, pub, true);
-        status += pubsub_topicPublicationDestroy(pub);
-        hashMap_remove(admin->localPublications,scope_topic);
-        free(key);
-        free(factory);
-    }
-    free(scope_topic);
-    celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-    return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
-    celix_status_t status = CELIX_SUCCESS;
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: Closing all subscriptions\n");
-    }
-
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-    char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
-    if(sub_entry!=NULL){
-        char* topic = (char*)hashMapEntry_getKey(sub_entry);
-
-        topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
-        status += pubsub_topicSubscriptionStop(ts);
-        disconnectTopicPubSubFromSerializer(admin, ts, false);
-        status += pubsub_topicSubscriptionDestroy(ts);
-        hashMap_remove(admin->subscriptions,scope_topic);
-        free(topic);
-
-    }
-    free(scope_topic);
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    return status;
-
-}
-
-
-#ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
-    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-    struct ifaddrs *ifaddr, *ifa;
-    char host[NI_MAXHOST];
-
-    if (getifaddrs(&ifaddr) != -1)
-    {
-        for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
-        {
-            if (ifa->ifa_addr == NULL)
-                continue;
-
-            if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
-                if (interface == NULL) {
-                    *ip = strdup(host);
-                    status = CELIX_SUCCESS;
-                }
-                else if (strcmp(ifa->ifa_name, interface) == 0) {
-                    *ip = strdup(host);
-                    status = CELIX_SUCCESS;
-                }
-            }
-        }
-
-        freeifaddrs(ifaddr);
-    }
-
-    return status;
-}
-#endif
-
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-    celix_status_t status = CELIX_SUCCESS;
-    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-    array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
-    if(pendingListPerTopic==NULL){
-        arrayList_create(&pendingListPerTopic);
-        hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
-    }
-    arrayList_add(pendingListPerTopic,subEP);
-    free(scope_topic);
-    return status;
-}
-
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
-    /* Assumption: serializers are all available at startup.
-     * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
-
-    celix_status_t status = CELIX_SUCCESS;
-    int i=0;
-
-    const char *serType = NULL;
-    serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-    if (serType == NULL) {
-        fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
-        return CELIX_SERVICE_EXCEPTION;
-    }
-
-    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
-    celixThreadMutex_lock(&admin->serializerListLock);
-    arrayList_add(admin->serializerList, reference);
-    celixThreadMutex_unlock(&admin->serializerListLock);
-
-    /* Now let's re-evaluate the pending */
-    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-
-    for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
-        pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
-        pubsub_serializer_service_t *best_serializer = NULL;
-        pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
-        if(best_serializer != NULL){ /* Finally we have a valid serializer! */
-            pubsubAdmin_addSubscription(admin, ep);
-        }
-    }
-
-    for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
-        pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
-        pubsub_serializer_service_t *best_serializer = NULL;
-        pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
-        if(best_serializer != NULL){ /* Finally we have a valid serializer! */
-            pubsubAdmin_addPublication(admin, ep);
-        }
-    }
-
-    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: %s serializer added\n", serType);
-    }
-
-    return status;
-}
-
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
-
-    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
-    int i=0, j=0;
-    const char *serType = NULL;
-
-    serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-    if (serType == NULL) {
-        printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
-        return CELIX_SERVICE_EXCEPTION;
-    }
-
-    celixThreadMutex_lock(&admin->serializerListLock);
-    /* Remove the serializer from the list */
-    arrayList_removeElement(admin->serializerList, reference);
-    celixThreadMutex_unlock(&admin->serializerListLock);
-
-
-    celixThreadMutex_lock(&admin->usedSerializersLock);
-    array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
-    array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
-    celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-    /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
-    if(topicPubList!=NULL){
-        for(i=0;i<arrayList_size(topicPubList);i++){
-            topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
-            /* Stop the topic publication */
-            pubsub_topicPublicationStop(topicPub);
-            /* Get the endpoints that are going to be orphan */
-            array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
-            for(j=0;j<arrayList_size(pubList);j++){
-                pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
-                /* Remove the publication */
-                pubsubAdmin_removePublication(admin, pubEP);
-                /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-                if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL){
-                    properties_unset(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY);
-                }
-                /* Add the orphan endpoint to the noSerializer pending list */
-                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                arrayList_add(admin->noSerializerPublications,pubEP);
-                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-            }
-            arrayList_destroy(pubList);
-
-            /* Cleanup also the localPublications hashmap*/
-            celixThreadMutex_lock(&admin->localPublicationsLock);
-            hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
-            char *key = NULL;
-            service_factory_pt factory = NULL;
-            while(hashMapIterator_hasNext(iter)){
-                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-                factory = (service_factory_pt)hashMapEntry_getValue(entry);
-                topic_publication_pt pub = (topic_publication_pt)factory->handle;
-                if(pub==topicPub){
-                    key = (char*)hashMapEntry_getKey(entry);
-                    break;
-                }
-            }
-            hashMapIterator_destroy(iter);
-            if(key!=NULL){
-                hashMap_remove(admin->localPublications, key);
-                free(factory);
-                free(key);
-            }
-            celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-            /* Finally destroy the topicPublication */
-            pubsub_topicPublicationDestroy(topicPub);
-        }
-        arrayList_destroy(topicPubList);
-    }
-
-    /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
-    if(topicSubList!=NULL){
-        for(i=0;i<arrayList_size(topicSubList);i++){
-            topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
-            /* Stop the topic subscription */
-            pubsub_topicSubscriptionStop(topicSub);
-            /* Get the endpoints that are going to be orphan */
-            array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
-            for(j=0;j<arrayList_size(subList);j++){
-                pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
-                /* Remove the subscription */
-                pubsubAdmin_removeSubscription(admin, subEP);
-                /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-                if(properties_get(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)!=NULL){
-                    properties_unset(subEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY);
-                }
-                /* Add the orphan endpoint to the noSerializer pending list */
-                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                arrayList_add(admin->noSerializerSubscriptions,subEP);
-                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-            }
-
-            /* Cleanup also the subscriptions hashmap*/
-            celixThreadMutex_lock(&admin->subscriptionsLock);
-            hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
-            char *key = NULL;
-            while(hashMapIterator_hasNext(iter)){
-                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-                topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
-                if(sub==topicSub){
-                    key = (char*)hashMapEntry_getKey(entry);
-                    break;
-                }
-            }
-            hashMapIterator_destroy(iter);
-            if(key!=NULL){
-                hashMap_remove(admin->subscriptions, key);
-                free(key);
-            }
-            celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-            /* Finally destroy the topicSubscription */
-            pubsub_topicSubscriptionDestroy(topicSub);
-        }
-        arrayList_destroy(topicSubList);
-    }
-
-
-    if (admin->verbose) {
-        printf("PSA_ZMQ: %s serializer removed\n", serType);
-    }
-
-    return CELIX_SUCCESS;
-}
-
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
-    celix_status_t status = CELIX_SUCCESS;
-
-    const char *fwUuid = NULL;
-    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
-    if (fwUuid == NULL) {
-        return CELIX_ILLEGAL_STATE;
-    }
-
-    celixThreadMutex_lock(&admin->serializerListLock);
-    status = pubsub_admin_match(endpoint, PSA_ZMQ_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, score);
-    celixThreadMutex_unlock(&admin->serializerListLock);
-
-    return status;
-}
-
-/* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    pubsub_serializer_service_t *serSvc = NULL;
-    service_reference_pt svcRef = NULL;
-
-    celixThreadMutex_lock(&admin->serializerListLock);
-    status = pubsub_admin_get_best_serializer(ep->properties, admin->serializerList, &svcRef);
-    celixThreadMutex_unlock(&admin->serializerListLock);
-
-    if (svcRef != NULL) {
-        bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc);
-        bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
-        if (serTypeOut != NULL) {
-            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut);
-        }
-    }
-
-
-    *svcOut = serSvc;
-
-    return status;
-}
-
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
-
-    celixThreadMutex_lock(&admin->usedSerializersLock);
-
-    hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-    array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
-    if(list==NULL){
-        arrayList_create(&list);
-        hashMap_put(map,serializer,list);
-    }
-    arrayList_add(list,topicPubSub);
-
-    celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}
-
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
-
-    celixThreadMutex_lock(&admin->usedSerializersLock);
-
-    hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-    hash_map_iterator_pt iter = hashMapIterator_create(map);
-    while(hashMapIterator_hasNext(iter)){
-        array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter);
-        if(arrayList_removeElement(list, topicPubSub)){ //Found it!
-            break;
-        }
-    }
-    hashMapIterator_destroy(iter);
-
-    celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/f1dfdbdf/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
deleted file mode 100644
index 47a946c..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.h
- *
- *  \date       Dec 5, 2013
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_
-#define PUBSUB_ADMIN_ZMQ_IMPL_H_
-
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include "pubsub_psa_zmq_constants.h"
-#include "pubsub_admin.h"
-#include "pubsub_utils.h"
-#include "log_helper.h"
-#include "command.h"
-
-#define PUBSUB_PSA_ZMQ_PSA_TYPE		"zmq"
-#define PUBSUB_PSA_ZMQ_IP 			"PSA_IP"
-#define PUBSUB_PSA_ZMQ_ITF			"PSA_INTERFACE"
-
-
-typedef struct pubsub_admin {
-
-	bundle_context_pt bundle_context;
-	log_helper_pt loghelper;
-
-	/* List of the available serializers */
-	celix_thread_mutex_t serializerListLock;
-	array_list_pt serializerList; // List<serializers service references>
-
-	celix_thread_mutex_t localPublicationsLock;
-	hash_map_pt localPublications;//<topic(string),service_factory_pt>
-
-	celix_thread_mutex_t externalPublicationsLock;
-	hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
-
-	celix_thread_mutex_t subscriptionsLock;
-	hash_map_pt subscriptions; //<topic(string),topic_subscription>
-
-	celix_thread_mutex_t pendingSubscriptionsLock;
-	celix_thread_mutexattr_t pendingSubscriptionsAttr;
-	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
-
-	/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
-	celix_thread_mutex_t noSerializerPendingsLock;
-	celix_thread_mutexattr_t noSerializerPendingsAttr;
-	array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
-	array_list_pt noSerializerPublications; // List<pubsub_ep>
-
-	celix_thread_mutex_t usedSerializersLock;
-	hash_map_pt topicSubscriptionsPerSerializer; // <serializer,List<topicSubscription>>
-	hash_map_pt topicPublicationsPerSerializer; // <serializer,List<topicPublications>>
-
-	char* ipAddress;
-
-	zactor_t* zmq_auth;
-
-    unsigned int basePort;
-    unsigned int maxPort;
-
-	double qosSampleScore;
-	double qosControlScore;
-	double defaultScore;
-
-	bool verbose;
-} pubsub_admin_t;
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin);
-
-void pubsubAdmin_addSerializer(void * handle, void *svc, const celix_properties_t *properties);
-void pubsubAdmin_removeSerializer(void * handle, void *svc, const celix_properties_t *properties);
-
-
-
-//for the pubsub_admin_service
-
-celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
-
-//note endpoint is owned by caller
-celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
-celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-
-//note endpoint is owned by caller
-celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
-celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
-
-celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
-
-
-#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f1dfdbdf/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
deleted file mode 100644
index 07f4466..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ /dev/null
@@ -1,639 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_registration.h"
-#include "utils.h"
-#include "service_factory.h"
-#include "version.h"
-
-#include "pubsub_common.h"
-#include "pubsub_utils.h"
-#include "pubsub/publisher.h"
-
-#include "topic_publication.h"
-
-#include "pubsub_serializer.h"
-#include "pubsub_psa_zmq_constants.h"
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	#include "zmq_crypto.h"
-
-	#define MAX_CERT_PATH_LENGTH 512
-#endif
-
-#define EP_ADDRESS_LEN		32
-#define ZMQ_BIND_MAX_RETRY	5
-
-#define FIRST_SEND_DELAY	2
-
-struct topic_publication {
-	zsock_t* zmq_socket;
-	celix_thread_mutex_t socket_lock; //Protects zmq_socket access
-	zcert_t * zmq_cert;
-	char* endpoint;
-	service_registration_pt svcFactoryReg;
-	array_list_pt pub_ep_list; //List<pubsub_endpoint>
-	hash_map_pt boundServices; //<bundle_pt,bound_service>
-	struct {
-		const char* type;
-		pubsub_serializer_service_t *svc;
-	} serializer;
-	celix_thread_mutex_t tp_lock;
-};
-
-typedef struct publish_bundle_bound_service {
-	topic_publication_pt parent;
-	pubsub_publisher_t service;
-	bundle_pt bundle;
-	char *topic;
-	hash_map_pt msgTypes;
-	unsigned short getCount;
-	celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
-	bool mp_send_in_progress;
-	array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
-
-/* Note: correct locking order is
- * 1. tp_lock
- * 2. mp_lock
- * 3. socket_lock
- *
- * tp_lock and socket_lock are independent.
- */
-
-typedef struct pubsub_msg{
-	pubsub_msg_header_pt header;
-	char* payload;
-	int payloadSize;
-}* pubsub_msg_pt;
-
-static unsigned int rand_range(unsigned int min, unsigned int max);
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
-
-static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
-
-static void delay_first_send_for_late_joiners(void);
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
-	celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	char* secure_topics = NULL;
-	bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
-
-	if (secure_topics){
-		array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
-
-		int i;
-		int secure_topics_size = arrayList_size(secure_topics_list);
-		for (i = 0; i < secure_topics_size; i++){
-			char* top = arrayList_get(secure_topics_list, i);
-			if (strcmp(pubEP->topic, top) == 0){
-				printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
-				pubEP->is_secure = true;
-			}
-			free(top);
-			top = NULL;
-		}
-
-		arrayList_destroy(secure_topics_list);
-	}
-
-	zcert_t* pub_cert = NULL;
-	if (pubEP->is_secure){
-		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-		if (keys_bundle_dir == NULL){
-			return CELIX_SERVICE_EXCEPTION;
-		}
-
-		const char* keys_file_path = NULL;
-		const char* keys_file_name = NULL;
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
-
-		char cert_path[MAX_CERT_PATH_LENGTH];
-
-		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
-		snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
-		free(keys_bundle_dir);
-		printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
-
-		pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
-		if (pub_cert == NULL){
-			printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
-			printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
-			pubEP->is_secure = false;
-		}
-	}
-#endif
-
-	zsock_t* socket = zsock_new (ZMQ_PUB);
-	if(socket==NULL){
-		#ifdef BUILD_WITH_ZMQ_SECURITY
-			if (pubEP->is_secure){
-				zcert_destroy(&pub_cert);
-			}
-		#endif
-
-        perror("Error for zmq_socket");
-		return CELIX_SERVICE_EXCEPTION;
-	}
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (pubEP->is_secure){
-		zcert_apply (pub_cert, socket); // apply certificate to socket
-		zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
-	}
-#endif
-
-	int rv = -1, retry=0;
-	char* ep = malloc(EP_ADDRESS_LEN);
-    char bindAddress[EP_ADDRESS_LEN];
-
-	while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
-		/* Randomized part due to same bundle publishing on different topics */
-		unsigned int port = rand_range(basePort,maxPort);
-		memset(ep,0,EP_ADDRESS_LEN);
-        memset(bindAddress, 0, EP_ADDRESS_LEN);
-
-		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind address than endpoint address
-		rv = zsock_bind (socket, "%s", bindAddress);
-        if (rv == -1) {
-            perror("Error for zmq_bind");
-        }
-		retry++;
-	}
-
-	if(rv == -1){
-		free(ep);
-		return CELIX_SERVICE_EXCEPTION;
-	}
-
-	/* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */
-
-	topic_publication_pt pub = calloc(1,sizeof(*pub));
-
-	arrayList_create(&(pub->pub_ep_list));
-	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
-	celixThreadMutex_create(&(pub->tp_lock),NULL);
-
-	pub->endpoint = ep;
-	pub->zmq_socket = socket;
-	pub->serializer.svc = best_serializer;
-	pub->serializer.type = serType;
-
-	celixThreadMutex_create(&(pub->socket_lock),NULL);
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (pubEP->is_secure){
-		pub->zmq_cert = pub_cert;
-	}
-#endif
-
-	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
-
-	*out = pub;
-
-	return status;
-}
-
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-
-	free(pub->endpoint);
-	arrayList_destroy(pub->pub_ep_list);
-
-	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
-		pubsub_destroyPublishBundleBoundService(bound);
-	}
-	hashMapIterator_destroy(iter);
-	hashMap_destroy(pub->boundServices,false,false);
-
-	pub->svcFactoryReg = NULL;
-	pub->serializer.svc = NULL;
-	pub->serializer.type = NULL;
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	zcert_destroy(&(pub->zmq_cert));
-#endif
-
-	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	celixThreadMutex_destroy(&(pub->tp_lock));
-
-	celixThreadMutex_lock(&(pub->socket_lock));
-	zsock_destroy(&(pub->zmq_socket));
-	celixThreadMutex_unlock(&(pub->socket_lock));
-
-	celixThreadMutex_destroy(&(pub->socket_lock));
-
-	free(pub);
-
-	return status;
-}
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
-	celix_status_t status = CELIX_SUCCESS;
-
-	/* Let's register the new service */
-
-	celix_properties_t *pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
-
-	if(pubEP!=NULL){
-		service_factory_pt factory = calloc(1, sizeof(*factory));
-		factory->handle = pub;
-		factory->getService = pubsub_topicPublicationGetService;
-		factory->ungetService = pubsub_topicPublicationUngetService;
-
-		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE));
-		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
-
-		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
-
-		if(status != CELIX_SUCCESS){
-			properties_destroy(props);
-			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s.\n",
-				   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-		}
-		else{
-			*svcFactory = factory;
-		}
-	}
-	else{
-		printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
-		status = CELIX_SERVICE_EXCEPTION;
-	}
-
-	return status;
-}
-
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-	return serviceRegistration_unregister(pub->svcFactoryReg);
-}
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep) {
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
-	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
-    pubsubEndpoint_setField(ep, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY, pub->endpoint);
-	arrayList_add(pub->pub_ep_list,ep);
-	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,celix_properties_t *ep){
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-	for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
-	        celix_properties_t *e = arrayList_get(pub->pub_ep_list, i);
-	        if(pubsubEndpoint_equals(ep, e)) {
-	            arrayList_removeElement(pub->pub_ep_list,ep);
-	            break;
-	        }
-	}
-	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return CELIX_SUCCESS;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
-	array_list_pt list = NULL;
-	celixThreadMutex_lock(&(pub->tp_lock));
-	list = arrayList_clone(pub->pub_ep_list);
-	celixThreadMutex_unlock(&(pub->tp_lock));
-	return list;
-}
-
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
-	celix_status_t  status = CELIX_SUCCESS;
-
-	topic_publication_pt publish = (topic_publication_pt)handle;
-
-	celixThreadMutex_lock(&(publish->tp_lock));
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound==NULL){
-		bound = pubsub_createPublishBundleBoundService(publish,bundle);
-		if(bound!=NULL){
-			hashMap_put(publish->boundServices,bundle,bound);
-		}
-	}
-	else{
-		bound->getCount++;
-	}
-
-	*service = &bound->service;
-
-	celixThreadMutex_unlock(&(publish->tp_lock));
-
-	return status;
-}
-
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
-
-	topic_publication_pt publish = (topic_publication_pt)handle;
-
-	celixThreadMutex_lock(&(publish->tp_lock));
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound!=NULL){
-
-		bound->getCount--;
-		if(bound->getCount==0){
-			pubsub_destroyPublishBundleBoundService(bound);
-			hashMap_remove(publish->boundServices,bundle);
-		}
-
-	}
-	else{
-		long bundleId = -1;
-		bundle_getBundleId(bundle,&bundleId);
-		printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
-	}
-
-	/* service should be never used for unget, so let's set the pointer to NULL */
-	*service = NULL;
-
-	celixThreadMutex_unlock(&(publish->tp_lock));
-
-	return CELIX_SUCCESS;
-}
-
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
-
-	bool ret = true;
-
-	zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
-	if (headerMsg == NULL) ret=false;
-	zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
-	if (payloadMsg == NULL) ret=false;
-
-	delay_first_send_for_late_joiners();
-
-	if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
-
-	if(!last){
-		if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
-	}
-	else{
-		if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false;
-	}
-
-	if (!ret){
-		zframe_destroy(&headerMsg);
-		zframe_destroy(&payloadMsg);
-	}
-
-	free(msg->header);
-	free(msg->payload);
-	free(msg);
-
-	return ret;
-
-}
-
-static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
-
-	bool ret = true;
-
-	unsigned int i = 0;
-	unsigned int mp_num = arrayList_size(mp_msg_parts);
-	for(;i<mp_num;i++){
-		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
-	}
-	arrayList_clear(mp_msg_parts);
-
-	return ret;
-
-}
-
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
-
-	return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
-
-}
-
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
-
-	int status = 0;
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
-
-	celixThreadMutex_lock(&(bound->parent->tp_lock));
-	celixThreadMutex_lock(&(bound->mp_lock));
-	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
-		printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
-		celixThreadMutex_unlock(&(bound->mp_lock));
-		celixThreadMutex_unlock(&(bound->parent->tp_lock));
-		return -3;
-	}
-
-	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
-
-	if (msgSer!= NULL) {
-		int major=0, minor=0;
-
-		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-		msg_hdr->type = msgTypeId;
-
-		if (msgSer->msgVersion != NULL){
-			version_getMajor(msgSer->msgVersion, &major);
-			version_getMinor(msgSer->msgVersion, &minor);
-			msg_hdr->major = major;
-			msg_hdr->minor = minor;
-		}
-
-		void *serializedOutput = NULL;
-		size_t serializedOutputLen = 0;
-		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
-
-		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
-		msg->header = msg_hdr;
-		msg->payload = (char*)serializedOutput;
-		msg->payloadSize = serializedOutputLen;
-		bool snd = true;
-
-		switch(flags){
-		case PUBSUB_PUBLISHER_FIRST_MSG:
-			bound->mp_send_in_progress = true;
-			arrayList_add(bound->mp_parts,msg);
-			break;
-		case PUBSUB_PUBLISHER_PART_MSG:
-			if(!bound->mp_send_in_progress){
-				printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
-				status = -4;
-			}
-			else{
-				arrayList_add(bound->mp_parts,msg);
-			}
-			break;
-		case PUBSUB_PUBLISHER_LAST_MSG:
-			if(!bound->mp_send_in_progress){
-				printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
-				status = -4;
-			}
-			else{
-				arrayList_add(bound->mp_parts,msg);
-				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
-				bound->mp_send_in_progress = false;
-			}
-			break;
-		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
-			snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
-			break;
-		default:
-			printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
-			status = -4;
-			break;
-		}
-
-		if(status==-4){
-			free(msg);
-		}
-
-		if(!snd){
-			printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
-		}
-
-	} else {
-        printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId);
-		status=-1;
-	}
-
-	celixThreadMutex_unlock(&(bound->mp_lock));
-	celixThreadMutex_unlock(&(bound->parent->tp_lock));
-
-	return status;
-
-}
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = utils_stringHash(msgType);
-	return 0;
-}
-
-
-static unsigned int rand_range(unsigned int min, unsigned int max){
-
-	double scaled = (double)(((double)random())/((double)RAND_MAX));
-	return (max-min+1)*scaled + min;
-
-}
-
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
-
-	//PRECOND lock on tp->lock
-
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
-
-	if (bound != NULL) {
-
-		bound->parent = tp;
-		bound->bundle = bundle;
-		bound->getCount = 1;
-		bound->mp_send_in_progress = false;
-		celixThreadMutex_create(&bound->mp_lock,NULL);
-
-		if(tp->serializer.svc != NULL){
-			tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
-		}
-
-		arrayList_create(&bound->mp_parts);
-
-		celix_properties_t *pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-		bound->topic=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-		bound->service.handle = bound;
-		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-		bound->service.send = pubsub_topicPublicationSend;
-		bound->service.sendMultipart = pubsub_topicPublicationSendMultipart;
-
-	}
-
-	return bound;
-}
-
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
-
-	//PRECOND lock on tp->lock
-
-	celixThreadMutex_lock(&boundSvc->mp_lock);
-
-
-	if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
-		boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
-	}
-
-	if(boundSvc->mp_parts!=NULL){
-		arrayList_destroy(boundSvc->mp_parts);
-	}
-
-	if(boundSvc->topic!=NULL){
-		free(boundSvc->topic);
-	}
-
-	celixThreadMutex_unlock(&boundSvc->mp_lock);
-	celixThreadMutex_destroy(&boundSvc->mp_lock);
-
-	free(boundSvc);
-
-}
-
-static void delay_first_send_for_late_joiners(){
-
-	static bool firstSend = true;
-
-	if(firstSend){
-		printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
-		sleep(FIRST_SEND_DELAY);
-		firstSend = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/f1dfdbdf/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
deleted file mode 100644
index 0fab6d7..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_publication.h
- *
- *  \date       Sep 24, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef TOPIC_PUBLICATION_H_
-#define TOPIC_PUBLICATION_H_
-
-#include "pubsub/publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-#include "pubsub_serializer.h"
-
-typedef struct topic_publication *topic_publication_pt;
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, celix_properties_t *endpoint, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub, celix_properties_t *ep);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
-
-#endif /* TOPIC_PUBLICATION_H_ */