You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:29 UTC
[17/19] celix git commit: CELIX-389: Refactors pubsub.
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
deleted file mode 100644
index bd3bb2f..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ /dev/null
@@ -1,710 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.c
- *
- * \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#ifndef ANDROID
-#include <ifaddrs.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netdb.h>
-
-#include "constants.h"
-#include "utils.h"
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_helper.h"
-#include "log_service.h"
-#include "celix_threads.h"
-#include "service_factory.h"
-
-#include "pubsub_admin_impl.h"
-#include "topic_subscription.h"
-#include "pubsub_publish_service_private.h"
-#include "pubsub_endpoint.h"
-#include "subscriber.h"
-
-static const char *DEFAULT_MC_IP = "224.100.1.1";
-static char *DEFAULT_MC_PREFIX = "224.100";
-
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score);
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
- celix_status_t status = CELIX_SUCCESS;
-
- *admin = calloc(1, sizeof(**admin));
-
- if (!*admin) {
- status = CELIX_ENOMEM;
- }
- else{
-
- char *mc_ip = NULL;
- char *if_ip = NULL;
- (*admin)->bundle_context= context;
- (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
- celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
- celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
- celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
-
- if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
- logHelper_start((*admin)->loghelper);
- }
- const char *mc_ip_prop = NULL;
- bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
- if(mc_ip_prop) {
- mc_ip = strdup(mc_ip_prop);
- }
-#ifndef ANDROID
- if (mc_ip == NULL) {
- const char *mc_prefix = NULL;
- const char *interface = NULL;
- int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
- bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
- if(mc_prefix == NULL) {
- mc_prefix = DEFAULT_MC_PREFIX;
- }
-
- bundleContext_getProperty(context, PSA_ITF, &interface);
- if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP address for interface %s", interface);
- }
-
- printf("IP Detected : %s\n", if_ip);
- if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not parse IP address %s", if_ip);
- b2 = 1;
- b3 = 1;
- }
-
- asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
-
- int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if(sendSocket == -1) {
- perror("pubsubAdmin_create:socket");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- if (status == CELIX_SUCCESS){
- char loop = 1;
- if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
- perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- if (status == CELIX_SUCCESS){
- struct in_addr multicast_interface;
- inet_aton(if_ip, &multicast_interface);
- if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
- perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- (*admin)->sendSocket = sendSocket;
- }
-
- }
-
- }
-#endif
- if (if_ip != NULL) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s as interface for multicast communication", if_ip);
- (*admin)->ifIpAddress = if_ip;
- } else {
- (*admin)->ifIpAddress = strdup("127.0.0.1");
- }
-
- if (mc_ip != NULL) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", mc_ip);
- (*admin)->mcIpAddress = mc_ip;
- }
- else {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
- (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
- }
-
- if (status != CELIX_SUCCESS){
- pubsubAdmin_destroy(*admin);
- }
-
- }
-
- return status;
-}
-
-
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
-{
- celix_status_t status = CELIX_SUCCESS;
-
- free(admin->mcIpAddress);
- free(admin->ifIpAddress);
-
- celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
- hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
- while(hashMapIterator_hasNext(iter)){
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- free((char*)hashMapEntry_getKey(entry));
- arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
- }
- hashMapIterator_destroy(iter);
- hashMap_destroy(admin->pendingSubscriptions,false,false);
- celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
- celixThreadMutex_lock(&admin->subscriptionsLock);
- hashMap_destroy(admin->subscriptions,false,false);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- celixThreadMutex_lock(&admin->localPublicationsLock);
- hashMap_destroy(admin->localPublications,true,false);
- celixThreadMutex_unlock(&admin->localPublicationsLock);
-
- celixThreadMutex_lock(&admin->externalPublicationsLock);
- iter = hashMapIterator_create(admin->externalPublications);
- while(hashMapIterator_hasNext(iter)){
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- free((char*)hashMapEntry_getKey(entry));
- arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
- }
- hashMapIterator_destroy(iter);
- hashMap_destroy(admin->externalPublications,false,false);
- celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
- celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
- celixThreadMutex_destroy(&admin->subscriptionsLock);
- celixThreadMutex_destroy(&admin->localPublicationsLock);
- celixThreadMutex_destroy(&admin->externalPublicationsLock);
-
- logHelper_stop(admin->loghelper);
-
- logHelper_destroy(&admin->loghelper);
-
- free(admin);
-
- return status;
-}
-
-celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin) {
- celix_status_t status = CELIX_SUCCESS;
-
- return status;
-}
-
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&admin->subscriptionsLock);
-
- topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-
- if(any_sub==NULL){
-
- int i;
-
- status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub);
-
- /* Connect all internal publishers */
- celixThreadMutex_lock(&admin->localPublicationsLock);
- hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
- while(hashMapIterator_hasNext(lp_iter)){
- service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
- topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
- array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
-
- if(topic_publishers!=NULL){
- for(i=0;i<arrayList_size(topic_publishers);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- if(pubEP->endpoint !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
- }
- }
- }
- }
- hashMapIterator_destroy(lp_iter);
- celixThreadMutex_unlock(&admin->localPublicationsLock);
-
- /* Connect also all external publishers */
- celixThreadMutex_lock(&admin->externalPublicationsLock);
- hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
- while(hashMapIterator_hasNext(extp_iter)){
- array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
- if(ext_pub_list!=NULL){
- for(i=0;i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if(pubEP->endpoint !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
- }
- }
- }
- }
- hashMapIterator_destroy(extp_iter);
- celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-
- pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
-
- status += pubsub_topicSubscriptionStart(any_sub);
-
- hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
-
- }
-
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- return status;
-}
-
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
-
- if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
- return pubsubAdmin_addAnySubscription(admin,subEP);
- }
-
- /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
- celixThreadMutex_lock(&admin->localPublicationsLock);
- celixThreadMutex_lock(&admin->externalPublicationsLock);
- char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic);
-
- service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
- array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-
- if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
- celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
- pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
- celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
- }
- else{
- int i;
-
- topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
-
- if(subscription == NULL) {
-
- status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, subEP->scope, subEP->topic,&subscription);
-
- /* Try to connect internal publishers */
- if(factory!=NULL){
- topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
- array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
-
- if(topic_publishers!=NULL){
- for(i=0;i<arrayList_size(topic_publishers);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- if(pubEP->endpoint !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
- }
- }
- }
-
- }
-
- /* Look also for external publishers */
- if(ext_pub_list!=NULL){
- for(i=0;i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if(pubEP->endpoint !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
- }
- }
- }
-
- pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
-
- status += pubsub_topicSubscriptionStart(subscription);
-
- if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&admin->subscriptionsLock);
- hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
- }
- }
- pubsub_topicIncreaseNrSubscribers(subscription);
-
- }
- free(scope_topic);
- celixThreadMutex_unlock(&admin->externalPublicationsLock);
- celixThreadMutex_unlock(&admin->localPublicationsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
-
- celixThreadMutex_lock(&admin->subscriptionsLock);
- char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
- topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- free(scope_topic);
- if(sub!=NULL){
- pubsub_topicDecreaseNrSubscribers(sub);
- if(pubsub_topicGetNrSubscribers(sub) == 0) {
- status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
- }
- }
- else{
- status = CELIX_ILLEGAL_STATE;
- }
-
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
- printf("PSA: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
- }
- char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
- if((strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint==NULL)){
-
- celixThreadMutex_lock(&admin->localPublicationsLock);
-
-
- service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-
- if (factory == NULL) {
- topic_publication_pt pub = NULL;
- status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP,admin->mcIpAddress,&pub);
- if(status == CELIX_SUCCESS){
- status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
- if(status==CELIX_SUCCESS && factory !=NULL){
- hashMap_put(admin->localPublications,strdup(scope_topic),factory);
- }
- }
- else{
- printf("PSA: Cannot create a topicPublication for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
- }
- }
- else{
- //just add the new EP to the list
- topic_publication_pt pub = (topic_publication_pt)factory->handle;
- pubsub_topicPublicationAddPublisherEP(pub,pubEP);
- }
-
-
- celixThreadMutex_unlock(&admin->localPublicationsLock);
- }
- else{
- celixThreadMutex_lock(&admin->externalPublicationsLock);
- array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
- if(ext_pub_list==NULL){
- arrayList_create(&ext_pub_list);
- hashMap_put(admin->externalPublications,strdup(scope_topic),ext_pub_list);
- }
-
- arrayList_add(ext_pub_list,pubEP);
-
- celixThreadMutex_unlock(&admin->externalPublicationsLock);
- }
-
- /* Connect the new publisher to the subscription for his topic, if there is any */
- celixThreadMutex_lock(&admin->subscriptionsLock);
-
- topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- if(sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionConnectPublisher(sub,pubEP->endpoint);
- }
-
- /* And check also for ANY subscription */
- topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- if(any_sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
- }
-
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- /* Re-evaluate the pending subscriptions */
- celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-
- hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions,scope_topic);
- if(pendingSub!=NULL){ //There were pending subscription for the just published topic. Let's connect them.
- char* key = (char*)hashMapEntry_getKey(pendingSub);
- array_list_pt pendingSubList = (array_list_pt)hashMapEntry_getValue(pendingSub);
- int i;
- for(i=0;i<arrayList_size(pendingSubList);i++){
- pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(pendingSubList,i);
- pubsubAdmin_addSubscription(admin,subEP);
- }
- hashMap_remove(admin->pendingSubscriptions,key);
- arrayList_clear(pendingSubList);
- arrayList_destroy(pendingSubList);
- free(key);
- }
- free(scope_topic);
-
- celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
- printf("PSA: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
- }
- char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
- if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
-
- celixThreadMutex_lock(&admin->localPublicationsLock);
-
- service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
- if(factory!=NULL){
- topic_publication_pt pub = (topic_publication_pt)factory->handle;
- pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
- }
- else{
- status = CELIX_ILLEGAL_STATE;
- }
-
- celixThreadMutex_unlock(&admin->localPublicationsLock);
- }
- else{
-
- celixThreadMutex_lock(&admin->externalPublicationsLock);
- array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
- if(ext_pub_list!=NULL){
- int i;
- bool found = false;
- for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- found = pubsubEndpoint_equals(pubEP,p);
- if(found){
- arrayList_remove(ext_pub_list,i);
- }
- }
- if(arrayList_size(ext_pub_list)==0){
- hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
- char* topic = (char*)hashMapEntry_getKey(entry);
- array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
- hashMap_remove(admin->externalPublications,scope_topic);
- arrayList_destroy(list);
- free(topic);
- }
- }
-
- celixThreadMutex_unlock(&admin->externalPublicationsLock);
- }
-
- /* Check if this publisher was connected to one of our subscribers*/
- celixThreadMutex_lock(&admin->subscriptionsLock);
-
- topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- if(sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionDisconnectPublisher(sub,pubEP->endpoint);
- }
-
- /* And check also for ANY subscription */
- topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- if(any_sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionDisconnectPublisher(any_sub,pubEP->endpoint);
- }
- free(scope_topic);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Closing all publications for scope=%s,topic=%s\n", scope, topic);
-
- celixThreadMutex_lock(&admin->localPublicationsLock);
- char* scope_topic =createScopeTopicKey(scope, topic);
- hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
- if(pubsvc_entry!=NULL){
- char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
- service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
- topic_publication_pt pub = (topic_publication_pt)factory->handle;
- status += pubsub_topicPublicationStop(pub);
- status += pubsub_topicPublicationDestroy(pub);
-
- hashMap_remove(admin->localPublications,scope_topic);
- free(key);
- free(factory);
- }
- free(scope_topic);
- celixThreadMutex_unlock(&admin->localPublicationsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){
- celix_status_t status = CELIX_SUCCESS;
-
- printf("PSA: Closing all subscriptions\n");
-
- celixThreadMutex_lock(&admin->subscriptionsLock);
- char* scope_topic =createScopeTopicKey(scope, topic);
- hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
- if(sub_entry!=NULL){
- char* topic = (char*)hashMapEntry_getKey(sub_entry);
-
- topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
- status += pubsub_topicSubscriptionStop(ts);
- status += pubsub_topicSubscriptionDestroy(ts);
-
- hashMap_remove(admin->subscriptions,topic);
- free(topic);
-
- }
- free(scope_topic);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
-
- return status;
-
-}
-
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){
- celix_status_t status = CELIX_SUCCESS;
- status = pubsubAdmin_match(admin, pubEP, score);
- return status;
-}
-
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){
- celix_status_t status = CELIX_SUCCESS;
- status = pubsubAdmin_match(admin, subEP, score);
- return status;
-}
-
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
- celix_status_t status = CELIX_SUCCESS;
-
- char topic_psa_prop[1024];
- snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
-
- const char* psa_to_use = NULL;
- bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use);
-
- *score = 0;
- if (strcmp(psa_to_use, "udp") == 0){
- *score += 100;
- }else{
- *score += 1;
- }
-
- return status;
-}
-
-#ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {
- celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
- struct ifaddrs *ifaddr, *ifa;
- char host[NI_MAXHOST];
-
- if (getifaddrs(&ifaddr) != -1)
- {
- for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
- {
- if (ifa->ifa_addr == NULL)
- continue;
-
- if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
- if (interface == NULL) {
- *ip = strdup(host);
- status = CELIX_SUCCESS;
- }
- else if (strcmp(ifa->ifa_name, interface) == 0) {
- *ip = strdup(host);
- status = CELIX_SUCCESS;
- }
- }
- }
-
- freeifaddrs(ifaddr);
- }
- if(status == CELIX_SUCCESS) {
-
- }
- return status;
-}
-#endif
-
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
- celix_status_t status = CELIX_SUCCESS;
-
- char* scope_topic =createScopeTopicKey(subEP->scope, subEP->topic);
- array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
- if(pendingListPerTopic==NULL){
- arrayList_create(&pendingListPerTopic);
- hashMap_put(admin->pendingSubscriptions,scope_topic,pendingListPerTopic);
- } else {
- free(scope_topic);
- }
- arrayList_add(pendingListPerTopic,subEP);
-
- return status;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
deleted file mode 100644
index 9a9fa55..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_publication.c
- *
- * \date Sep 24, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <errno.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_registration.h"
-#include "dyn_msg_utils.h"
-#include "utils.h"
-#include "service_factory.h"
-#include "version.h"
-
-#include "pubsub_publish_service_private.h"
-#include "pubsub_common.h"
-#include "publisher.h"
-#include "large_udp.h"
-
-#include "pubsub_serializer.h"
-
-#define EP_ADDRESS_LEN 32
-
-#define FIRST_SEND_DELAY 2
-
-struct topic_publication {
- int sendSocket;
- char* endpoint;
- service_registration_pt svcFactoryReg;
- array_list_pt pub_ep_list; //List<pubsub_endpoint>
- hash_map_pt boundServices; //<bundle_pt,bound_service>
- celix_thread_mutex_t tp_lock;
- struct sockaddr_in destAddr;
-};
-
-typedef struct publish_bundle_bound_service {
- topic_publication_pt parent;
- pubsub_publisher_pt service;
- bundle_pt bundle;
- char *scope;
- char *topic;
- hash_map_pt msgTypes;
- unsigned short getCount;
- celix_thread_mutex_t mp_lock;
- bool mp_send_in_progress;
- array_list_pt mp_parts;
- largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
-
-typedef struct pubsub_msg{
- pubsub_msg_header_pt header;
- char* payload;
- int payloadSize;
-}* pubsub_msg_pt;
-
-static unsigned int rand_range(unsigned int min, unsigned int max);
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
-
-static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, void *msg);
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
-
-
-static void delay_first_send_for_late_joiners(void);
-
-
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out){
-
- char* ep = malloc(EP_ADDRESS_LEN);
- memset(ep,0,EP_ADDRESS_LEN);
- unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
- snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
-
-
- topic_publication_pt pub = calloc(1,sizeof(*pub));
-
- arrayList_create(&(pub->pub_ep_list));
- pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
- celixThreadMutex_create(&(pub->tp_lock),NULL);
-
- pub->endpoint = ep;
- pub->sendSocket = sendSocket;
- pub->destAddr.sin_family = AF_INET;
- pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
- pub->destAddr.sin_port = htons(port);
-
- pubsub_topicPublicationAddPublisherEP(pub,pubEP);
-
- *out = pub;
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&(pub->tp_lock));
-
- free(pub->endpoint);
- arrayList_destroy(pub->pub_ep_list);
-
- hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
- while(hashMapIterator_hasNext(iter)){
- publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
- pubsub_destroyPublishBundleBoundService(bound);
- }
- hashMapIterator_destroy(iter);
- hashMap_destroy(pub->boundServices,false,false);
-
- pub->svcFactoryReg = NULL;
- status = close(pub->sendSocket);
-
- celixThreadMutex_unlock(&(pub->tp_lock));
-
- celixThreadMutex_destroy(&(pub->tp_lock));
-
- free(pub);
-
- return status;
-}
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
- celix_status_t status = CELIX_SUCCESS;
-
- /* Let's register the new service */
- //celixThreadMutex_lock(&(pub->tp_lock));
-
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
-
- if(pubEP!=NULL){
- service_factory_pt factory = calloc(1, sizeof(*factory));
- factory->handle = pub;
- factory->getService = pubsub_topicPublicationGetService;
- factory->ungetService = pubsub_topicPublicationUngetService;
-
- properties_pt props = properties_create();
- properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
- properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
-
- status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
-
- if(status != CELIX_SUCCESS){
- properties_destroy(props);
- printf("PSA: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
- }
- else{
- *svcFactory = factory;
- }
- }
- else{
- printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- //celixThreadMutex_unlock(&(pub->tp_lock));
-
- return status;
-}
-
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
- celix_status_t status = CELIX_SUCCESS;
-
- //celixThreadMutex_lock(&(pub->tp_lock));
-
- status = serviceRegistration_unregister(pub->svcFactoryReg);
-
- //celixThreadMutex_unlock(&(pub->tp_lock));
-
- return status;
-}
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ /* Under tp_lock: stamps ep with a copy of pub->endpoint and appends ep to pub->pub_ep_list. */
- /* Always returns CELIX_SUCCESS. */
- celixThreadMutex_lock(&(pub->tp_lock));
- ep->endpoint = strdup(pub->endpoint); /* NOTE(review): any previous ep->endpoint value is overwritten without being freed here, and the strdup result is not checked. */
- arrayList_add(pub->pub_ep_list,ep);
- celixThreadMutex_unlock(&(pub->tp_lock));
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ /* Under tp_lock: removes ep from pub->pub_ep_list; ep itself is not freed here. */
- /* Always returns CELIX_SUCCESS; the result of arrayList_removeElement is ignored. */
- celixThreadMutex_lock(&(pub->tp_lock));
- arrayList_removeElement(pub->pub_ep_list,ep);
- celixThreadMutex_unlock(&(pub->tp_lock));
-
- return CELIX_SUCCESS;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ /* Returns the publication's own pub_ep_list pointer (not a copy); tp_lock is not taken here. */
- return pub->pub_ep_list;
-}
-
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { /* Service-factory getService callback: hands out one bound publisher service per requesting bundle. */
- celix_status_t status = CELIX_SUCCESS;
- /* handle is the topic_publication_pt installed as factory->handle; registration is unused. */
- topic_publication_pt publish = (topic_publication_pt)handle;
-
- celixThreadMutex_lock(&(publish->tp_lock));
- /* First request from a bundle creates its bound service; later requests only bump getCount. */
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
- if(bound==NULL){
- bound = pubsub_createPublishBundleBoundService(publish,bundle);
- if(bound!=NULL){
- hashMap_put(publish->boundServices,bundle,bound);
- }
- }
- else{
- bound->getCount++;
- }
- /* NOTE(review): if creation above returned NULL, the next line dereferences a NULL bound; status also stays CELIX_SUCCESS. */
- *service = bound->service;
-
- celixThreadMutex_unlock(&(publish->tp_lock));
-
- return status;
-}
-
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { /* Service-factory ungetService callback: drops one reference to the bundle's bound service. */
- /* Always returns CELIX_SUCCESS, also for an unexpected unget; registration is unused. */
- topic_publication_pt publish = (topic_publication_pt)handle;
-
- celixThreadMutex_lock(&(publish->tp_lock));
-
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
- if(bound!=NULL){
- /* When the last reference goes, the bound service is destroyed and its map entry removed. */
- bound->getCount--;
- if(bound->getCount==0){
- pubsub_destroyPublishBundleBoundService(bound);
- hashMap_remove(publish->boundServices,bundle);
- }
-
- }
- else{
- long bundleId = -1;
- bundle_getBundleId(bundle,&bundleId);
- printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
- }
-
- /* service should be never used for unget, so let's set the pointer to NULL */
- *service = NULL;
-
- celixThreadMutex_unlock(&(publish->tp_lock));
-
- return CELIX_SUCCESS;
-}
-
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){ /* Sends msg (header, payload size, payload) over the parent publication's socket via largeUdp_sendmsg; returns false when that call returns -1. */
- const int iovec_len = 3; // header + size + payload
- bool ret = true;
- pubsub_udp_msg_pt udpMsg; /* never assigned: only used as the operand of sizeof below */
- /* NOTE(review): the 'last' parameter is not used anywhere in this function. */
- int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize; /* only used in the error message below */
-
- struct iovec msg_iovec[iovec_len];
- msg_iovec[0].iov_base = msg->header;
- msg_iovec[0].iov_len = sizeof(*msg->header);
- msg_iovec[1].iov_base = &msg->payloadSize;
- msg_iovec[1].iov_len = sizeof(msg->payloadSize);
- msg_iovec[2].iov_base = msg->payload;
- msg_iovec[2].iov_len = msg->payloadSize;
-
- delay_first_send_for_late_joiners();
-
- if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) {
- fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, compiledMsgSize); /* no trailing newline, so the perror text follows on the same line */
- perror("send_pubsub_msg:sendSocket");
- ret = false;
- }
-
- //free(udpMsg);
- if(releaseCallback) { /* the optional callback is invoked on the payload whether or not the send succeeded */
- releaseCallback->release(msg->payload, bound);
- }
- return ret;
-
-}
-
-
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, void *msg) { /* Publisher 'send' entry point: serializes msg of type msgTypeId and sends it; returns 0 on success, -1 for an unknown type or a failed send. */
- int status = 0;
- publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
-
- celixThreadMutex_lock(&(bound->mp_lock)); /* lock order in this function: bound->mp_lock first, then parent->tp_lock */
-
- pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
- int major=0, minor=0;
-
- if (msgType != NULL) {
-
- version_pt msgVersion = pubsubSerializer_getVersion(msgType);
-
- pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); /* NOTE(review): not NULL-checked before use */
- /* presumably msg_hdr->topic holds MAX_TOPIC_LEN bytes, so with the zeroing calloc the copy stays NUL-terminated - TODO confirm */
- strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
- msg_hdr->type = msgTypeId;
- if (msgVersion != NULL){ /* without a version the header's major/minor stay 0 from calloc */
- version_getMajor(msgVersion, &major);
- version_getMinor(msgVersion, &minor);
- msg_hdr->major = major;
- msg_hdr->minor = minor;
- }
-
- void* serializedOutput = NULL;
- int serializedOutputLen = 0;
- pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen); /* 'msg' here is still the function parameter; NOTE(review): the return value is ignored */
- /* From the next line on, the local 'msg' shadows the parameter of the same name. */
- pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); /* NOTE(review): not NULL-checked before use */
- msg->header = msg_hdr;
- msg->payload = (char *) serializedOutput;
- msg->payloadSize = serializedOutputLen;
-
- celixThreadMutex_lock(&(bound->parent->tp_lock));
- if(send_pubsub_msg(bound, msg,true, NULL) == false) {
- status = -1;
- }
- free(msg_hdr);
- free(msg);
- free(serializedOutput);
- celixThreadMutex_unlock(&(bound->parent->tp_lock));
-
- } else {
- printf("TP: Message %u not supported.",msgTypeId);
- status=-1;
- }
-
- celixThreadMutex_unlock(&(bound->mp_lock));
-
- return status;
-}
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ /* Stores pubsubSerializer_hashCode(msgType) in *msgTypeId; handle is unused and the result is always 0. */
- *msgTypeId = pubsubSerializer_hashCode(msgType);
- return 0;
-}
-
-
-static unsigned int rand_range(unsigned int min, unsigned int max){ /* Maps rand() onto a pseudo-random unsigned value starting at min. */
- /* NOTE(review): scaled lies in [0,1] inclusive, so when rand() == RAND_MAX the expression below evaluates to max+1, one past the apparent upper bound. */
- double scaled = (double)(((double)rand())/((double)RAND_MAX));
- return (max-min+1)*scaled + min; /* computed in double, then converted (truncated) to unsigned int */
-
-}
-
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ /* Allocates and initializes the publisher service instance bound to one bundle; returns NULL if either allocation fails. */
-
- publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
-
- if (bound != NULL) {
- bound->service = calloc(1, sizeof(*bound->service));
- }
-
- if (bound != NULL && bound->service != NULL) {
-
- bound->parent = tp;
- bound->bundle = bundle;
- bound->getCount = 1; /* the creating getService call counts as the first reference */
- bound->mp_send_in_progress = false;
- celixThreadMutex_create(&bound->mp_lock,NULL);
- bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
- arrayList_create(&bound->mp_parts);
- /* NOTE(review): pubEP is dereferenced below without a NULL check, and the strdup results are not checked. */
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
- bound->scope=strdup(pubEP->scope);
- bound->topic=strdup(pubEP->topic);
- bound->largeUdpHandle = largeUdp_create(1);
- bound->service->handle = bound;
- bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
- bound->service->send = pubsub_topicPublicationSend;
- bound->service->sendMultipart = NULL; //Multipart not supported (yet) for UDP
-
- pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle);
-
- }
- else
- {
- if (bound != NULL) {
- free(bound->service);
- }
- free(bound);
- return NULL;
- }
-
- return bound;
-}
-
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ /* Releases everything owned by boundSvc (service struct, msg-type map, parts list, scope/topic strings, largeUdp handle, mutex) and frees boundSvc itself. */
-
- celixThreadMutex_lock(&boundSvc->mp_lock); /* held during teardown, then unlocked and destroyed just before the final free */
-
- if(boundSvc->service != NULL){
- free(boundSvc->service);
- }
-
- if(boundSvc->msgTypes != NULL){
- pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes);
- hashMap_destroy(boundSvc->msgTypes,false,false);
- }
-
- if(boundSvc->mp_parts!=NULL){
- arrayList_destroy(boundSvc->mp_parts);
- }
-
- if(boundSvc->scope!=NULL){
- free(boundSvc->scope);
- }
-
- if(boundSvc->topic!=NULL){
- free(boundSvc->topic);
- }
-
- largeUdp_destroy(boundSvc->largeUdpHandle);
-
- celixThreadMutex_unlock(&boundSvc->mp_lock);
- celixThreadMutex_destroy(&boundSvc->mp_lock);
-
- free(boundSvc);
-
-}
-
-static void delay_first_send_for_late_joiners(){ /* On the very first call only: prints a notice and sleeps FIRST_SEND_DELAY seconds before returning. */
- /* The flag is a function-local static, so it is shared by every caller in the process; this function itself takes no lock around it. */
- static bool firstSend = true;
-
- if(firstSend){
- printf("TP: Delaying first send for late joiners...\n");
- sleep(FIRST_SEND_DELAY);
- firstSend = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
deleted file mode 100644
index 0caf084..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ /dev/null
@@ -1,563 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_subscription.c
- *
- * \date Oct 2, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <signal.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#if defined(__APPLE__) && defined(__MACH__)
- #include <sys/event.h>
- #include <sys/time.h>
-#else
- #include <sys/epoll.h>
-#endif
-
-#include "utils.h"
-#include "celix_errno.h"
-#include "constants.h"
-#include "version.h"
-
-#include "topic_subscription.h"
-#include "subscriber.h"
-#include "publisher.h"
-#include "dyn_msg_utils.h"
-#include "pubsub_publish_service_private.h"
-#include "large_udp.h"
-
-#include "pubsub_serializer.h"
-
-#define MAX_EPOLL_EVENTS 10
-#define RECV_THREAD_TIMEOUT 5
-#define UDP_BUFFER_SIZE 65535
-#define MAX_UDP_SESSIONS 16
-
-struct topic_subscription{ /* State of one UDP multicast topic subscription: subscriber tracker, listen sockets and receive thread. */
-
- char* ifIpAddress; /* strdup of the ifIp given to pubsub_topicSubscriptionCreate; used as imr_interface when joining multicast groups */
- service_tracker_pt tracker; /* created in pubsub_topicSubscriptionCreate with the subscriber filter */
- array_list_pt sub_ep_list; /* endpoints added/removed by the Add/RemoveSubscriber functions */
- celix_thread_t recv_thread; /* runs udp_recv_thread_func; created in Start, joined in Stop */
- bool running; /* loop condition of the receive thread; Start/Stop write it without taking ts_lock */
- celix_thread_mutex_t ts_lock; /* taken by the functions in this file that modify sub_ep_list, servicesMap, socketMap and nrSubscribers */
- bundle_context_pt context;
- int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
- hash_map_pt servicesMap; // key = service, value = msg types map
- hash_map_pt socketMap; // key = URL, value = listen-socket (a heap-allocated int holding the fd)
- unsigned int nrSubscribers;
- largeUdp_pt largeUdpHandle; /* created with MAX_UDP_SESSIONS in pubsub_topicSubscriptionCreate */
-
-};
-
-typedef struct mp_handle{ /* NOTE(review): not referenced by any code visible in this file - presumably a leftover from multipart support; confirm before reuse. */
- hash_map_pt svc_msg_db;
- hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
-
-typedef struct msg_map_entry{ /* NOTE(review): not referenced by any code visible in this file - presumably a leftover from multipart support; confirm before reuse. */
- bool retain;
- void* msgInst;
-}* msg_map_entry_pt;
-
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service);
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service);
-static void* udp_recv_thread_func(void* arg);
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
-static void sigusr1_sighandler(int signo);
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
-
-
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out){ /* Allocates a topic subscription, its epoll fd, maps and subscriber tracker, installs a SIGUSR1 handler and stores the result in *out. */
- celix_status_t status = CELIX_SUCCESS; /* individual failures are accumulated into this value with += below */
- /* NOTE(review): the calloc result is used without a NULL check, and *out is assigned even when status is not CELIX_SUCCESS. */
- topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
- ts->context = bundle_context;
- ts->ifIpAddress = strdup(ifIp);
-#if defined(__APPLE__) && defined(__MACH__)
- //TODO: Use kqueue for OSX
-#else
- ts->topicEpollFd = epoll_create1(0);
-#endif
- if(ts->topicEpollFd == -1) { /* on the Apple branch topicEpollFd keeps the 0 left by calloc, so this check never fires there */
- status += CELIX_SERVICE_EXCEPTION;
- }
-
- ts->running = false;
- ts->nrSubscribers = 0;
-
- celixThreadMutex_create(&ts->ts_lock,NULL);
- arrayList_create(&ts->sub_ep_list);
- ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
- ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
- /* Build the LDAP-style tracker filter; snprintf silently truncates if topic/scope make it longer than 127 characters. */
- char filter[128];
- memset(filter,0,128);
- if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { /* prefix comparison: any scope that merely starts with the default scope string also takes this branch */
- // default scope, means that subscriber has not defined a scope property
- snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
- (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
- PUBSUB_SUBSCRIBER_TOPIC,topic);
-
- } else {
- snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
- (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
- PUBSUB_SUBSCRIBER_TOPIC,topic,
- PUBSUB_SUBSCRIBER_SCOPE,scope);
- }
-
- service_tracker_customizer_pt customizer = NULL;
- status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
- status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
- /* Installs sigusr1_sighandler for SIGUSR1 via sigaction, i.e. for the whole process; pubsub_topicSubscriptionStop sends that signal to the receive thread. */
- struct sigaction actions;
- memset(&actions, 0, sizeof(actions));
- sigemptyset(&actions.sa_mask);
- actions.sa_flags = 0;
- actions.sa_handler = sigusr1_sighandler;
-
- sigaction(SIGUSR1,&actions,NULL);
-
- *out=ts;
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ /* Tears down all members of ts and frees ts; always returns CELIX_SUCCESS. */
- celix_status_t status = CELIX_SUCCESS;
- /* NOTE(review): the receive thread is not joined here - presumably callers run pubsub_topicSubscriptionStop first; confirm. */
- celixThreadMutex_lock(&ts->ts_lock);
- ts->running = false;
- free(ts->ifIpAddress);
- serviceTracker_destroy(ts->tracker);
- arrayList_clear(ts->sub_ep_list);
- arrayList_destroy(ts->sub_ep_list);
- hashMap_destroy(ts->servicesMap,false,false); /* presumably the two false flags mean keys and values are not freed - confirm; remaining msg-type maps are not emptied here */
-
- hashMap_destroy(ts->socketMap,false,false); /* NOTE(review): any socket entries still present (ints allocated in ConnectPublisher) are neither closed nor freed here */
- largeUdp_destroy(ts->largeUdpHandle);
-#if defined(__APPLE__) && defined(__MACH__)
- //TODO: Use kqueue for OSX
-#else
- close(ts->topicEpollFd);
-#endif
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- celixThreadMutex_destroy(&ts->ts_lock);
-
- free(ts);
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ /* Opens the subscriber tracker and, if that succeeded, starts the receive thread; returns the first failing status. */
- celix_status_t status = CELIX_SUCCESS;
-
- status = serviceTracker_open(ts->tracker);
- /* running is set before the thread exists so its loop condition is already true; it is also set when the tracker failed to open. */
- ts->running = true;
-
- if(status==CELIX_SUCCESS){
- status=celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts);
- }
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ /* Stops the receive thread, closes the tracker and disconnects every publisher URL in socketMap; returns the serviceTracker_close status. */
- celix_status_t status = CELIX_SUCCESS;
-
- ts->running = false;
- /* SIGUSR1 is sent to the receive thread, presumably to interrupt a blocking epoll_wait so the loop re-checks 'running' - confirm. */
- pthread_kill(ts->recv_thread.thread,SIGUSR1);
-
- celixThread_join(ts->recv_thread,NULL);
-
- status = serviceTracker_close(ts->tracker);
- /* NOTE(review): DisconnectPublisher removes entries from socketMap while this iterator walks the same map, and 'url' is the key of the entry being removed. */
- hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
- while(hashMapIterator_hasNext(it)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
- char *url = hashMapEntry_getKey(entry);
- pubsub_topicSubscriptionDisconnectPublisher(ts, url);
- }
- hashMapIterator_destroy(it);
-
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) { /* Opens a UDP socket for pubURL, joins its multicast group, binds to the port, adds the socket to the epoll set and records it in socketMap. */
- /* Returns CELIX_SUCCESS (also when pubURL is already connected) or CELIX_SERVICE_EXCEPTION on the first failing socket call. */
- printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
-
- celix_status_t status = CELIX_SUCCESS;
-
- if (!hashMap_containsKey(ts->socketMap, pubURL)){ /* NOTE(review): this lookup happens before ts_lock is taken */
-
- celixThreadMutex_lock(&ts->ts_lock);
-
- int *recvSocket = calloc(sizeof(int), 1); /* heap-allocated so the fd can be stored as a map value; NOTE(review): not NULL-checked */
- *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if (*recvSocket < 0) {
- perror("pubsub_topicSubscriptionCreate:socket");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- if (status == CELIX_SUCCESS){
- int reuse = 1;
- if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
- perror("setsockopt() SO_REUSEADDR");
- status = CELIX_SERVICE_EXCEPTION;
- }
- }
-
- if (status == CELIX_SUCCESS){
- // TODO Check if there is a better way to parse the URL to IP/Portnr
- //replace ':' by spaces
- char *url = strdup(pubURL);
- char *pt = url;
- while((pt=strchr(pt, ':')) != NULL) {
- *pt = ' ';
- }
- char mcIp[100];
- unsigned short mcPort;
- sscanf(url, "udp //%s %hu", mcIp, &mcPort); /* NOTE(review): %s has no field width (mcIp holds 100 bytes) and the return value is not checked, so mcIp/mcPort may be overrun or left uninitialized on malformed input */
- free (url);
-
- printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
- /* Join multicast group mcIp on the interface given by ts->ifIpAddress. */
- struct ip_mreq mc_addr;
- mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
- mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
- printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
- if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
- perror("setsockopt() IP_ADD_MEMBERSHIP");
- status = CELIX_SERVICE_EXCEPTION;
- }
-
- if (status == CELIX_SUCCESS){
- struct sockaddr_in mcListenAddr; /* NOTE(review): only family, addr and port are set; the remaining bytes (sin_zero) are left uninitialized */
- mcListenAddr.sin_family = AF_INET;
- mcListenAddr.sin_addr.s_addr = INADDR_ANY;
- mcListenAddr.sin_port = htons(mcPort);
- if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
- perror("bind()");
- status = CELIX_SERVICE_EXCEPTION;
- }
- }
-
- if (status == CELIX_SUCCESS){
- #if defined(__APPLE__) && defined(__MACH__)
- //TODO: Use kqueue for OSX
- #else
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
- ev.data.fd = *recvSocket;
- if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
- perror("epoll_ctl() EPOLL_CTL_ADD");
- status = CELIX_SERVICE_EXCEPTION;
- }
- #endif
- }
-
- }
-
- if (status == CELIX_SUCCESS){
- hashMap_put(ts->socketMap, pubURL, (void*)recvSocket); /* NOTE(review): the caller's pubURL pointer is stored as the key without copying - presumably it must outlive the entry; confirm */
- }else{
- free(recvSocket); /* NOTE(review): only the int is freed; a socket that was opened successfully is not close()d on this path */
- }
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- }
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ /* Removes pubURL's socket from socketMap and from the epoll set; returns CELIX_SERVICE_EXCEPTION if epoll_ctl fails. */
- printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
- celix_status_t status = CELIX_SUCCESS;
-
- if (hashMap_containsKey(ts->socketMap, pubURL)){ /* NOTE(review): this lookup happens before ts_lock is taken */
- /* On the Apple branch nothing is done, so the map entry stays in place. */
-#if defined(__APPLE__) && defined(__MACH__)
- //TODO: Use kqueue for OSX
-#else
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
-
- celixThreadMutex_lock(&ts->ts_lock);
-
- int *s = hashMap_remove(ts->socketMap, pubURL);
- if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
- printf("in if error()\n");
- perror("epoll_ctl() EPOLL_CTL_DEL");
- status = CELIX_SERVICE_EXCEPTION;
- }
- free(s); /* NOTE(review): the int is freed but the socket fd it held is never close()d */
-
- celixThreadMutex_unlock(&ts->ts_lock);
-#endif
-
- }
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ /* Appends subEP to ts->sub_ep_list under ts_lock; always returns CELIX_SUCCESS. */
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&ts->ts_lock);
-
- arrayList_add(ts->sub_ep_list,subEP);
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- return status;
-
-}
-
-celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { /* Increments ts->nrSubscribers under ts_lock; always returns CELIX_SUCCESS. */
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&ts->ts_lock);
-
- ts->nrSubscribers++;
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- return status;
-}
-
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ /* Removes subEP from ts->sub_ep_list under ts_lock; subEP is not freed and the result is always CELIX_SUCCESS. */
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&ts->ts_lock);
-
- arrayList_removeElement(ts->sub_ep_list,subEP);
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- return status;
-
-}
-
-celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { /* Decrements ts->nrSubscribers under ts_lock; always returns CELIX_SUCCESS. */
- celix_status_t status = CELIX_SUCCESS;
-
- celixThreadMutex_lock(&ts->ts_lock);
- /* NOTE(review): nrSubscribers is unsigned and there is no check for 0, so an unmatched decrease wraps around. */
- ts->nrSubscribers--;
-
- celixThreadMutex_unlock(&ts->ts_lock);
-
- return status;
-}
-
-unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { /* Returns the current subscriber count; unlike the increase/decrease functions it reads without ts_lock. */
- return ts->nrSubscribers;
-}
-
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ /* Tracker 'added' callback: builds the msg-types map for a new subscriber service and stores it in servicesMap. */
- celix_status_t status = CELIX_SUCCESS; /* never changed: the callback always reports success */
- topic_subscription_pt ts = handle;
-
- celixThreadMutex_lock(&ts->ts_lock);
- if (!hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
- /* The message types are filled from the bundle that owns the service reference. */
- bundle_pt bundle = NULL;
- serviceReference_getBundle(reference, &bundle);
- pubsubSerializer_fillMsgTypesMap(msgTypes,bundle);
-
- if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
- hashMap_destroy(msgTypes,false,false);
- printf("TS: Unsupported subscriber!\n");
- }
- else{
- hashMap_put(ts->servicesMap, service, msgTypes);
- }
-
- }
- celixThreadMutex_unlock(&ts->ts_lock);
- printf("TS: New subscriber registered.\n"); /* NOTE(review): printed unconditionally, also for an unsupported or already-known service */
- return status;
-
-}
-
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ /* Tracker 'removed' callback: drops the service from servicesMap and releases its msg-types map; reference is unused. */
- celix_status_t status = CELIX_SUCCESS; /* never changed: the callback always reports success */
- topic_subscription_pt ts = handle;
-
- celixThreadMutex_lock(&ts->ts_lock);
- if (hashMap_containsKey(ts->servicesMap, service)) {
- hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
- if(msgTypes!=NULL){
- pubsubSerializer_emptyMsgTypesMap(msgTypes);
- hashMap_destroy(msgTypes,false,false);
- }
- }
- celixThreadMutex_unlock(&ts->ts_lock);
-
- printf("TS: Subscriber unregistered.\n"); /* printed whether or not the service was present in the map */
- return status;
-}
-
-
-static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){ /* Delivers one received UDP message to every tracked subscriber service that knows its type and accepts its version. */
- /* NOTE(review): servicesMap is iterated here without ts_lock, while the tracker callbacks modify it under that lock. */
- hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
- while (hashMapIterator_hasNext(iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
- hash_map_pt msgTypes = hashMapEntry_getValue(entry);
-
- pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type));
-
- if (msgType == NULL) {
- printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
- }
- else{
- void *msgInst = NULL;
- char *name = pubsubSerializer_getName(msgType);
- version_pt msgVersion = pubsubSerializer_getVersion(msgType);
-
- bool validVersion = checkVersion(msgVersion,&msg->header);
-
- if(validVersion){
- celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
-
- if (status == CELIX_SUCCESS) {
- bool release = true; /* the subscriber may clear this through the pointer passed to receive() to keep msgInst */
- pubsub_multipart_callbacks_t mp_callbacks;
- mp_callbacks.handle = sub;
- mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
- mp_callbacks.getMultipart = NULL;
-
- subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release);
- if(release){
- pubsubSerializer_freeMsg(msgType, msgInst);
- }
- }
- else{
- printf("TS: Cannot deserialize msgType %s.\n",name);
- }
-
- }
- else{
- int major=0,minor=0; /* stay 0 in the output when msgVersion is NULL - presumably the getters then leave them untouched; confirm */
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,msg->header.major,msg->header.minor);
- }
-
- }
- }
- hashMapIterator_destroy(iter);
-}
-
-static void* udp_recv_thread_func(void * arg) { /* Receive-thread body: while sub->running, waits for readable sockets, reassembles messages through largeUdp and passes them to process_msg. Always returns NULL. */
- topic_subscription_pt sub = (topic_subscription_pt) arg;
- /* NOTE(review): on the Apple branch nfds is always 0, so that loop spins without waiting and never processes a message. */
-#if defined(__APPLE__) && defined(__MACH__)
- //TODO: use kqueue for OSX
- //struct kevent events[MAX_EPOLL_EVENTS];
- while (sub->running) {
- int nfds = 0;
- if(nfds > 0) {
- pubsub_udp_msg_pt udpMsg = NULL;
- process_msg(sub, udpMsg);
- }
- }
-#else
-
- struct epoll_event events[MAX_EPOLL_EVENTS];
-
- while (sub->running) {
- int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); /* timeout argument is in milliseconds; a negative (error) result simply skips the for loop below */
- int i;
- for(i = 0; i < nfds; i++ ) {
- unsigned int index;
- unsigned int size;
- if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
- // Handle data
- pubsub_udp_msg_pt udpMsg = NULL;
- if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
- printf("TS: ERROR largeUdp_read with index %d\n", index);
- continue;
- }
-
- if (udpMsg->header.type == 0){
- //Raw msg, since raw messages are not supported, don't do anything.
- }else{
- process_msg(sub, udpMsg);
- }
-
- free(udpMsg); /* the buffer handed out by largeUdp_read is released here after processing */
- }
- }
- }
-#endif
-
- return NULL;
-}
-
-
-static void sigusr1_sighandler(int signo){ /* SIGUSR1 handler installed in pubsub_topicSubscriptionCreate; signo is unused. */
- printf("TS: Topic subscription being shut down...\n"); /* NOTE(review): printf is not on the POSIX list of async-signal-safe functions */
- return;
-}
-
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ /* Returns true when the received header's version is compatible with the local msgVersion; false when msgVersion is NULL. */
- bool check=false;
- int major=0,minor=0;
- /* The local major/minor are narrowed to unsigned char before being compared with the header fields. */
- if(msgVersion!=NULL){
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
- check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
-
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ /* Stores pubsubSerializer_hashCode(msgType) in *msgTypeId; handle is unused and the result is always 0. */
- *msgTypeId = pubsubSerializer_hashCode(msgType);
- return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_admin_zmq/CMakeLists.txt
deleted file mode 100644
index 0c708ea..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if (BUILD_PSA_ZMQ) # everything below is configured only when this option is enabled
-
- find_package(ZMQ REQUIRED)
- find_package(CZMQ REQUIRED)
- find_package(Jansson REQUIRED)
-
- include_directories("${ZMQ_INCLUDE_DIR}")
- include_directories("${CZMQ_INCLUDE_DIR}")
- include_directories("${JANSSON_INCLUDE_DIR}")
- include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
- include_directories("private/include")
- include_directories("public/include")
- if (SERIALIZER_PATH) # the three SERIALIZER_* variables are optional and only used when set
- include_directories("${SERIALIZER_PATH}/include")
- endif()
- if (SERIALIZER_LIB_INCLUDE_DIR)
- include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
- endif()
- if (SERIALIZER_LIB_DIR)
- link_directories("${SERIALIZER_LIB_DIR}")
- endif()
-
- if (BUILD_ZMQ_SECURITY) # optional security support: defines BUILD_WITH_ZMQ_SECURITY, requires OpenSSL and adds zmq_crypto.c
- add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
-
- find_package(OpenSSL 1.1.0 REQUIRED)
- include_directories("${OPENSSL_INCLUDE_DIR}")
-
- set (ZMQ_CRYPTO_C "private/src/zmq_crypto.c")
- endif()
-
- add_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin"
- VERSION "1.0.0"
- SOURCES
- private/src/psa_activator.c
- private/src/pubsub_admin_impl.c
- private/src/topic_subscription.c
- private/src/topic_publication.c
- ${ZMQ_CRYPTO_C} # only set inside the BUILD_ZMQ_SECURITY branch above
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
- ${PUBSUB_SERIALIZER_SRC} # not set in this file - presumably provided by a parent CMakeLists; confirm
- )
-
- set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq PROPERTIES INSTALL_RPATH "$ORIGIN")
- target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} ${SERIALIZER_LIBRARY})
- install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
-
-endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
deleted file mode 100644
index efe76c3..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.h
- *
- * \date Dec 5, 2013
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_IMPL_H_
-#define PUBSUB_ADMIN_IMPL_H_
-
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include "pubsub_admin.h"
-#include "log_helper.h"
-
-#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
-#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
-
-#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
-#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
-
-struct pubsub_admin { /* State of the ZMQ pubsub admin. Each hash map is declared next to a same-named lock, presumably the one guarding it - confirm in the implementation. */
-
- bundle_context_pt bundle_context;
- log_helper_pt loghelper;
-
- celix_thread_mutex_t localPublicationsLock;
- hash_map_pt localPublications;//<topic(string),service_factory_pt>
-
- celix_thread_mutex_t externalPublicationsLock;
- hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
-
- celix_thread_mutex_t subscriptionsLock;
- hash_map_pt subscriptions; //<topic(string),topic_subscription>
-
- celix_thread_mutex_t pendingSubscriptionsLock;
- hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
-
- char* ipAddress;
-
- zactor_t* zmq_auth; /* CZMQ actor handle; its role is not visible in this header - presumably the authentication actor, confirm */
- /* presumably the port range configured through PSA_ZMQ_BASE_PORT / PSA_ZMQ_MAX_PORT, with the defaults defined above - confirm */
- unsigned int basePort;
- unsigned int maxPort;
-};
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
-
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic);
-
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-
-#endif /* PUBSUB_ADMIN_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
deleted file mode 100644
index 1c12eb8..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publish_service_private.h
- *
- * \date Sep 24, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-
-#include "publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-typedef struct topic_publication *topic_publication_pt; /* opaque handle: struct topic_publication is not defined in this header */
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); /* NOTE(review): presumably returns the new publication through out and picks a port in [basePort, maxPort] - confirm against the implementation */
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); /* NOTE(review): svcFactory looks like an out-parameter - confirm */
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); /* NOTE(review): ownership of the returned list is not visible here - confirm who frees it */
-
-#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
deleted file mode 100644
index d6bf8fb..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_subscription.h
- *
- * \date Sep 22, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef TOPIC_SUBSCRIPTION_H_
-#define TOPIC_SUBSCRIPTION_H_
-
-#include "celix_threads.h"
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_tracker.h"
-
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-typedef struct topic_subscription* topic_subscription_pt; /* opaque handle: struct topic_subscription is not defined in this header */
-
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, char* scope, char* topic,topic_subscription_pt* out); /* NOTE(review): presumably returns the new subscription through out - confirm */
-celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
-celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
-celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
-
-celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); /* NOTE(review): whether pubURL is copied or retained is not visible here - confirm */
-celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL);
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL);
-celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL);
-
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-
-celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); /* NOTE(review): these three look like a subscriber counter API; locking behaviour is not visible here */
-celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
-unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
-
-#endif /*TOPIC_SUBSCRIPTION_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/zmq_crypto.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/zmq_crypto.h b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/zmq_crypto.h
deleted file mode 100644
index f1a990f..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/zmq_crypto.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.h
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef ZMQ_CRYPTO_H_
-#define ZMQ_CRYPTO_H_
-
-#include <czmq.h>
-
-#define PROPERTY_KEYS_FILE_PATH "keys.file.path" /* NOTE(review): presumably a configuration property name for the keys file directory - confirm at the use site */
-#define PROPERTY_KEYS_FILE_NAME "keys.file.name" /* NOTE(review): presumably a configuration property name for the keys file name - confirm */
-#define DEFAULT_KEYS_FILE_PATH "/etc/" /* note the trailing slash in the value */
-#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
-
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path); /* returns a czmq zcert_t pointer; NOTE(review): failure value and ownership are not visible here - confirm */
-int generate_sha256_hash(char* text, unsigned char* digest); /* NOTE(review): required size of digest is not visible here (a SHA-256 digest is 32 bytes) - confirm */
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext); /* NOTE(review): cipher, key/iv sizes and the meaning of the int result are not visible here - confirm */
-
-#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
deleted file mode 100644
index 20f6070..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * psa_activator.c
- *
- * \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-
-#include "pubsub_admin_impl.h"
-
-
-struct activator { /* per-bundle state handed between the bundleActivator_* callbacks via userData */
- pubsub_admin_pt admin; /* filled by pubsubAdmin_create in bundleActivator_create; passed to pubsubAdmin_destroy in bundleActivator_destroy */
- pubsub_admin_service_pt adminService; /* allocated in bundleActivator_start, freed in bundleActivator_stop */
- service_registration_pt registration; /* written by bundleContext_registerService in bundleActivator_start, unregistered in bundleActivator_stop */
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { /* allocates the activator and creates the pubsub admin; returns CELIX_ENOMEM if allocation fails, otherwise the status of pubsubAdmin_create */
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator;
-
- activator = calloc(1, sizeof(*activator)); /* calloc zero-fills the struct */
- if (!activator) {
- status = CELIX_ENOMEM; /* *userData is left unwritten on this path */
- }
- else{
- *userData = activator; /* stored before pubsubAdmin_create runs, so it is set even if that call fails */
- status = pubsubAdmin_create(context, &(activator->admin));
- }
-
- return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { /* builds the pubsub admin service struct and registers it as PUBSUB_ADMIN_SERVICE; returns CELIX_ENOMEM if allocation fails, otherwise the registration status */
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
- pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
-
- if (!pubsubAdminSvc) {
- status = CELIX_ENOMEM; /* nothing is registered and activator->adminService is not written */
- }
- else{
- pubsubAdminSvc->admin = activator->admin; /* the admin created in bundleActivator_create */
-
- pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; /* from here on: point each service callback at its pubsubAdmin_* implementation */
- pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
-
- pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
- pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
-
- pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
- pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
-
- pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
- pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
-
- activator->adminService = pubsubAdminSvc; /* stored before registering, so bundleActivator_stop can free it even if registration fails */
-
- status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); /* NOTE(review): the NULL argument is presumably the service properties - confirm against bundle_context.h */
-
- }
-
-
- return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { /* unregisters the admin service and frees the service struct; always returns CELIX_SUCCESS; context is unused */
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- serviceRegistration_unregister(activator->registration); /* return value is not checked */
- activator->registration = NULL;
-
- free(activator->adminService); /* the struct allocated in bundleActivator_start */
- activator->adminService = NULL;
-
- return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { /* destroys the pubsub admin and frees the activator; always returns CELIX_SUCCESS; context is unused */
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- pubsubAdmin_destroy(activator->admin); /* return value is not checked */
- activator->admin = NULL;
-
- free(activator); /* userData must not be used after this call */
-
- return status;
-}
-
-