You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by gr...@apache.org on 2017/10/02 12:59:31 UTC
celix git commit: Fixed some Coverity issues
Repository: celix
Updated Branches:
refs/heads/develop 76882849a -> 6818c4f57
Fixed some Coverity issues
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/6818c4f5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/6818c4f5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/6818c4f5
Branch: refs/heads/develop
Commit: 6818c4f57ebd95b1b9eba478254da2ec559a91c4
Parents: 7688284
Author: gricciardi <gr...@apache.org>
Authored: Mon Oct 2 14:59:14 2017 +0200
Committer: gricciardi <gr...@apache.org>
Committed: Mon Oct 2 14:59:14 2017 +0200
----------------------------------------------------------------------
.gitignore | 2 +
pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 2 +-
.../private/include/pubsub_admin_impl.h | 1 +
.../private/src/pubsub_admin_impl.c | 182 +++++++++-------
.../private/src/topic_subscription.c | 30 ++-
pubsub/pubsub_admin_zmq/CMakeLists.txt | 2 +-
.../private/include/pubsub_admin_impl.h | 1 +
.../private/src/pubsub_admin_impl.c | 14 +-
.../private/src/topic_publication.c | 7 +-
.../private/src/topic_subscription.c | 8 +-
pubsub/pubsub_common/public/src/log_helper.c | 209 -------------------
pubsub/pubsub_serializer_json/CMakeLists.txt | 2 +-
pubsub/pubsub_topology_manager/CMakeLists.txt | 2 +-
.../private/src/pubsub_topology_manager.c | 4 +-
14 files changed, 159 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cb42137..0c6fae6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ cmake-build*
.idea
nbproject
*.pyc
+*.enc
+*.pub
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 7be1ee9..9e3f063 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,8 +35,8 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
private/src/topic_subscription.c
private/src/topic_publication.c
private/src/large_udp.c
+ ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 731b037..7696722 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -51,6 +51,7 @@ struct pubsub_admin {
hash_map_pt subscriptions; //<topic(string),topic_subscription>
celix_thread_mutex_t pendingSubscriptionsLock;
+ celix_thread_mutexattr_t pendingSubscriptionsAttr;
hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 5a651da..c3136f9 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -79,103 +79,128 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
*admin = calloc(1, sizeof(**admin));
if (!*admin) {
- status = CELIX_ENOMEM;
+ return CELIX_ENOMEM;
+ }
+
+ char *mc_ip = NULL;
+ char *if_ip = NULL;
+ int sendSocket = -1;
+
+ if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+ logHelper_start((*admin)->loghelper);
+ }
+ const char *mc_ip_prop = NULL;
+ bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
+ if(mc_ip_prop) {
+ mc_ip = strdup(mc_ip_prop);
}
- else{
- char *mc_ip = NULL;
- char *if_ip = NULL;
- (*admin)->bundle_context= context;
- (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
- (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
- arrayList_create(&((*admin)->noSerializerSubscriptions));
- arrayList_create(&((*admin)->noSerializerPublications));
- arrayList_create(&((*admin)->serializerList));
-
- celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
- celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
- celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
- celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
- celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
- celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
- celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
-
- if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
- logHelper_start((*admin)->loghelper);
+#ifndef ANDROID
+ if (mc_ip == NULL) {
+ const char *mc_prefix = NULL;
+ const char *interface = NULL;
+ int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+ bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
+ if(mc_prefix == NULL) {
+ mc_prefix = DEFAULT_MC_PREFIX;
}
- const char *mc_ip_prop = NULL;
- bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
- if(mc_ip_prop) {
- mc_ip = strdup(mc_ip_prop);
+
+ bundleContext_getProperty(context, PSA_ITF, &interface);
+ if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
}
-#ifndef ANDROID
- if (mc_ip == NULL) {
- const char *mc_prefix = NULL;
- const char *interface = NULL;
- int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
- bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
- if(mc_prefix == NULL) {
- mc_prefix = DEFAULT_MC_PREFIX;
- }
- bundleContext_getProperty(context, PSA_ITF, &interface);
- if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
- }
+ printf("IP Detected : %s\n", if_ip);
+ if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
+ b2 = 1;
+ b3 = 1;
+ }
- printf("IP Detected : %s\n", if_ip);
- if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
- b2 = 1;
- b3 = 1;
- }
+ asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
- asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
+ sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+ if(sendSocket == -1) {
+ perror("pubsubAdmin_create:socket");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
- int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if(sendSocket == -1) {
- perror("pubsubAdmin_create:socket");
- return CELIX_SERVICE_EXCEPTION;
- }
+ if(status == CELIX_SUCCESS){
char loop = 1;
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
- close(sendSocket);
- return CELIX_SERVICE_EXCEPTION;
+ status = CELIX_SERVICE_EXCEPTION;
}
+ }
+ if(status == CELIX_SUCCESS){
struct in_addr multicast_interface;
inet_aton(if_ip, &multicast_interface);
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
- close(sendSocket);
- return CELIX_SERVICE_EXCEPTION;
+ status = CELIX_SERVICE_EXCEPTION;
}
+ }
- (*admin)->sendSocket = sendSocket;
+ }
- }
-#endif
- if (if_ip != NULL) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
- (*admin)->ifIpAddress = if_ip;
- } else {
- (*admin)->ifIpAddress = strdup("127.0.0.1");
- }
- if (mc_ip != NULL) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
- (*admin)->mcIpAddress = mc_ip;
+ if(status != CELIX_SUCCESS){
+ logHelper_stop((*admin)->loghelper);
+ logHelper_destroy(&((*admin)->loghelper));
+ if(sendSocket >=0){
+ close(sendSocket);
+ }
+ if(if_ip != NULL){
+ free(if_ip);
}
- else {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
- (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
+ if(mc_ip != NULL){
+ free(mc_ip);
}
+ return status;
+ }
+ else{
+ (*admin)->sendSocket = sendSocket;
+ }
+
+#endif
+
+ (*admin)->bundle_context= context;
+ (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+ (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+ arrayList_create(&((*admin)->noSerializerSubscriptions));
+ arrayList_create(&((*admin)->noSerializerPublications));
+ arrayList_create(&((*admin)->serializerList));
+
+ celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+ celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+ celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+ celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
+ celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+ celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+ celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+ celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+ celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+ if (if_ip != NULL) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
+ (*admin)->ifIpAddress = if_ip;
+ } else {
+ (*admin)->ifIpAddress = strdup("127.0.0.1");
+ }
+ if (mc_ip != NULL) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
+ (*admin)->mcIpAddress = mc_ip;
+ }
+ else {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
+ (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
}
return status;
@@ -249,7 +274,10 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
celixThreadMutex_destroy(&admin->usedSerializersLock);
celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
celixThreadMutex_destroy(&admin->serializerListLock);
+
celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+ celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+
celixThreadMutex_destroy(&admin->subscriptionsLock);
celixThreadMutex_destroy(&admin->localPublicationsLock);
celixThreadMutex_destroy(&admin->externalPublicationsLock);
@@ -353,6 +381,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}
/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+ celixThreadMutex_lock(&admin->subscriptionsLock);
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
@@ -418,9 +447,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}
if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&admin->subscriptionsLock);
+
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
+
connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
}
}
@@ -433,6 +462,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
free(scope_topic);
celixThreadMutex_unlock(&admin->externalPublicationsLock);
celixThreadMutex_unlock(&admin->localPublicationsLock);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
return status;
@@ -658,7 +688,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
}
free(scope_topic);
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 9bf0f80..d8e6f45 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -67,6 +67,7 @@ struct topic_subscription{
int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
hash_map_pt servicesMap; // key = service, value = msg types map
hash_map_pt socketMap; // key = URL, value = listen-socket
+ celix_thread_mutex_t socketMap_lock;
celix_thread_mutex_t pendingConnections_lock;
array_list_pt pendingConnections;
@@ -122,6 +123,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
arrayList_create(&ts->pendingDisconnections);
celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+ celixThreadMutex_create(&ts->socketMap_lock, NULL);
ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -170,7 +172,10 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
arrayList_destroy(ts->sub_ep_list);
hashMap_destroy(ts->servicesMap,false,false);
+ celixThreadMutex_lock(&ts->socketMap_lock);
hashMap_destroy(ts->socketMap,true,true);
+ celixThreadMutex_unlock(&ts->socketMap_lock);
+ celixThreadMutex_destroy(&ts->socketMap_lock);
celixThreadMutex_lock(&ts->pendingConnections_lock);
arrayList_destroy(ts->pendingConnections);
@@ -214,6 +219,8 @@ celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
celix_status_t status = CELIX_SUCCESS;
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
ts->running = false;
@@ -223,14 +230,25 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
status = serviceTracker_close(ts->tracker);
+ celixThreadMutex_lock(&ts->socketMap_lock);
hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
while(hashMapIterator_hasNext(it)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
char *url = hashMapEntry_getKey(entry);
- pubsub_topicSubscriptionDisconnectPublisher(ts, url);
+ int *s = hashMapEntry_getValue(entry);
+ memset(&ev, 0, sizeof(ev));
+ if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+ printf("in if error()\n");
+ perror("epoll_ctl() EPOLL_CTL_DEL");
+ status += CELIX_SERVICE_EXCEPTION;
+ }
+ free(s);
free(url);
+ //hashMapIterator_remove(it);
}
hashMapIterator_destroy(it);
+ hashMap_clear(ts->socketMap, false, false);
+ celixThreadMutex_unlock(&ts->socketMap_lock);
return status;
@@ -241,7 +259,8 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
celix_status_t status = CELIX_SUCCESS;
- celixThreadMutex_lock(&ts->ts_lock);
+
+ celixThreadMutex_lock(&ts->socketMap_lock);
if(!hashMap_containsKey(ts->socketMap, pubURL)){
@@ -319,7 +338,8 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
free(recvSocket);
}
}
- celixThreadMutex_unlock(&ts->ts_lock);
+
+ celixThreadMutex_unlock(&ts->socketMap_lock);
return status;
}
@@ -348,7 +368,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
- celixThreadMutex_lock(&ts->ts_lock);
+ celixThreadMutex_lock(&ts->socketMap_lock);
if (hashMap_containsKey(ts->socketMap, pubURL)){
@@ -366,7 +386,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
}
- celixThreadMutex_unlock(&ts->ts_lock);
+ celixThreadMutex_unlock(&ts->socketMap_lock);
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 8c3c727..ab250f9 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -50,8 +50,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
private/src/topic_subscription.c
private/src/topic_publication.c
${ZMQ_CRYPTO_C}
+ ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 3a39a93..a18e9cf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -68,6 +68,7 @@ struct pubsub_admin {
hash_map_pt subscriptions; //<topic(string),topic_subscription>
celix_thread_mutex_t pendingSubscriptionsLock;
+ celix_thread_mutexattr_t pendingSubscriptionsAttr;
hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 522b2a5..efba13a 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -106,12 +106,15 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
- celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+ celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+ celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+ celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
logHelper_start((*admin)->loghelper);
}
@@ -267,7 +270,10 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
celixThreadMutex_destroy(&admin->serializerListLock);
celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+
+ celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
celixThreadMutex_destroy(&admin->subscriptionsLock);
+
celixThreadMutex_destroy(&admin->localPublicationsLock);
celixThreadMutex_destroy(&admin->externalPublicationsLock);
@@ -376,6 +382,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}
/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+ celixThreadMutex_lock(&admin->subscriptionsLock);
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
@@ -441,9 +448,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}
if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&admin->subscriptionsLock);
+
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
- celixThreadMutex_unlock(&admin->subscriptionsLock);
+
connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
}
}
@@ -456,6 +463,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
free(scope_topic);
celixThreadMutex_unlock(&admin->externalPublicationsLock);
celixThreadMutex_unlock(&admin->localPublicationsLock);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index b741771..2adb1af 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -464,10 +464,12 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+ celixThreadMutex_lock(&(bound->parent->tp_lock));
celixThreadMutex_lock(&(bound->mp_lock));
if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
return -3;
}
@@ -518,16 +520,12 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
}
else{
arrayList_add(bound->mp_parts,msg);
- celixThreadMutex_lock(&(bound->parent->tp_lock));
snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
bound->mp_send_in_progress = false;
- celixThreadMutex_unlock(&(bound->parent->tp_lock));
}
break;
case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
- celixThreadMutex_lock(&(bound->parent->tp_lock));
snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
- celixThreadMutex_unlock(&(bound->parent->tp_lock));
break;
default:
printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
@@ -549,6 +547,7 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
}
celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index cf51ed9..3c4e2a0 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -248,6 +248,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+ celixThreadMutex_unlock(&ts->ts_lock);
+
celixThreadMutex_lock(&ts->socket_lock);
zsock_destroy(&(ts->zmq_socket));
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -257,8 +259,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
celixThreadMutex_unlock(&ts->socket_lock);
celixThreadMutex_destroy(&ts->socket_lock);
- celixThreadMutex_unlock(&ts->ts_lock);
-
+ celixThreadMutex_destroy(&ts->ts_lock);
free(ts);
@@ -515,7 +516,8 @@ static void* zmq_recv_thread_func(void * arg) {
} else {
perror("PSA_ZMQ_TS: header_recv thread");
}
- } else {
+ }
+ else {
pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c
deleted file mode 100644
index 7a63363..0000000
--- a/pubsub/pubsub_common/public/src/log_helper.c
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * log_helper.c
- *
- * \date Nov 10, 2014
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdarg.h>
-
-#include "bundle_context.h"
-#include "service_tracker.h"
-#include "celix_threads.h"
-#include "array_list.h"
-
-#include "celix_errno.h"
-#include "log_service.h"
-
-#include "log_helper.h"
-
-#define LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME "LOGHELPER_ENABLE_STDOUT_FALLBACK"
-
-
-struct log_helper {
- bundle_context_pt bundleContext;
- service_tracker_pt logServiceTracker;
- celix_thread_mutex_t logListLock;
- array_list_pt logServices;
- bool stdOutFallback;
-};
-
-celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service);
-celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service);
-
-
-celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* loghelper)
-{
- celix_status_t status = CELIX_SUCCESS;
-
- (*loghelper) = calloc(1, sizeof(**loghelper));
-
- if (!(*loghelper))
- {
- status = CELIX_ENOMEM;
- }
- else
- {
- const char* stdOutFallbackStr = NULL;
- (*loghelper)->bundleContext = context;
- (*loghelper)->logServiceTracker = NULL;
- (*loghelper)->stdOutFallback = false;
-
- bundleContext_getProperty(context, LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME, &stdOutFallbackStr);
-
- if (stdOutFallbackStr != NULL) {
- (*loghelper)->stdOutFallback = true;
- }
-
- pthread_mutex_init(&(*loghelper)->logListLock, NULL);
- arrayList_create(&(*loghelper)->logServices);
- }
-
- return status;
-}
-
-celix_status_t logHelper_start(log_helper_pt loghelper)
-{
- celix_status_t status = CELIX_SUCCESS;
- service_tracker_customizer_pt logTrackerCustomizer = NULL;
-
- status = serviceTrackerCustomizer_create(loghelper, NULL, logHelper_logServiceAdded, NULL, logHelper_logServiceRemoved, &logTrackerCustomizer);
-
- if (status == CELIX_SUCCESS) {
- status = serviceTracker_create(loghelper->bundleContext, (char*) OSGI_LOGSERVICE_NAME, logTrackerCustomizer, &loghelper->logServiceTracker);
- }
-
- if (status == CELIX_SUCCESS) {
- status = serviceTracker_open(loghelper->logServiceTracker);
- }
-
- return status;
-}
-
-
-
-celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service)
-{
- log_helper_pt loghelper = handle;
-
- pthread_mutex_lock(&loghelper->logListLock);
- arrayList_add(loghelper->logServices, service);
- pthread_mutex_unlock(&loghelper->logListLock);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service)
-{
- log_helper_pt loghelper = handle;
-
- pthread_mutex_lock(&loghelper->logListLock);
- arrayList_removeElement(loghelper->logServices, service);
- pthread_mutex_unlock(&loghelper->logListLock);
-
- return CELIX_SUCCESS;
-}
-
-
-celix_status_t logHelper_stop(log_helper_pt loghelper) {
- celix_status_t status = CELIX_SUCCESS;
-
- status = serviceTracker_close(loghelper->logServiceTracker);
-
- return status;
-}
-
-celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
- celix_status_t status = CELIX_SUCCESS;
-
- serviceTracker_destroy((*loghelper)->logServiceTracker);
-
- pthread_mutex_lock(&(*loghelper)->logListLock);
- arrayList_destroy((*loghelper)->logServices);
- pthread_mutex_unlock(&(*loghelper)->logListLock);
-
- pthread_mutex_destroy(&(*loghelper)->logListLock);
-
- free(*loghelper);
- *loghelper = NULL;
- return status;
-}
-
-
-
-
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
-{
- celix_status_t status = CELIX_SUCCESS;
- va_list listPointer;
- char msg[1024];
- msg[0] = '\0';
- bool logged = false;
-
- va_start(listPointer, message);
- vsnprintf(msg, 1024, message, listPointer);
-
- if (loghelper != NULL) {
- pthread_mutex_lock(&loghelper->logListLock);
-
- int i = 0;
-
- for (; i < arrayList_size(loghelper->logServices); i++) {
-
- log_service_pt logService = arrayList_get(loghelper->logServices, i);
-
- if (logService != NULL) {
- (logService->log)(logService->logger, level, msg);
- logged = true;
- }
- }
-
- pthread_mutex_unlock(&loghelper->logListLock);
- }
-
-
- if (!logged && loghelper->stdOutFallback) {
- char *levelStr = NULL;
-
- switch (level) {
- case OSGI_LOGSERVICE_ERROR:
- levelStr = "ERROR";
- break;
- case OSGI_LOGSERVICE_WARNING:
- levelStr = "WARNING";
- break;
- case OSGI_LOGSERVICE_INFO:
- levelStr = "INFO";
- break;
- case OSGI_LOGSERVICE_DEBUG:
- default:
- levelStr = "DEBUG";
- break;
- }
-
- printf("%s: %s\n", levelStr, msg);
- }
-
-
- return status;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index a5798a4..147873a 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
SOURCES
private/src/ps_activator.c
private/src/pubsub_serializer_impl.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+ ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index cf2f4fa..b6eb796 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -29,8 +29,8 @@ add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
SOURCES
private/src/pstm_activator.c
private/src/pubsub_topology_manager.c
+ ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
)
http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index b4e8f46..987d864 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -189,8 +189,6 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
- celixThreadMutex_lock(&manager->psaListLock);
-
/* Deactivate all publications */
celixThreadMutex_lock(&manager->publicationsLock);
@@ -248,8 +246,8 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
hashMapIterator_destroy(subit);
celixThreadMutex_unlock(&manager->subscriptionsLock);
+ celixThreadMutex_lock(&manager->psaListLock);
arrayList_removeElement(manager->psaList, psa);
-
celixThreadMutex_unlock(&manager->psaListLock);
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");