You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:18 UTC
[06/19] celix git commit: Fixed Coverity issues
Fixed Coverity issues
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8c84284f
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8c84284f
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8c84284f
Branch: refs/heads/develop
Commit: 8c84284f6f63de076bbc1d8f66ac77115d2e5707
Parents: 1af1ff0
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:43:19 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:43:19 2017 +0100
----------------------------------------------------------------------
.../publisher/private/src/mp_pub_activator.c | 28 +++--
.../private/src/pubsub_admin_impl.c | 38 ++++---
.../private/src/topic_subscription.c | 112 +++++++++++--------
.../private/src/pubsub_admin_impl.c | 36 +++---
.../private/src/topic_publication.c | 2 +-
.../private/src/topic_subscription.c | 12 +-
.../public/include/pubsub_serializer.h | 5 +-
.../pubsub_common/public/src/dyn_msg_utils.c | 44 ++++----
.../pubsub/pubsub_common/public/src/etcd.c | 6 +-
.../pubsub_common/public/src/log_helper.c | 48 ++++----
.../pubsub_common/public/src/pubsub_endpoint.c | 63 +++++------
.../public/src/pubsub_serializer.c | 24 ++--
.../pubsub_discovery/private/src/etcd_watcher.c | 8 +-
.../private/src/pubsub_discovery_impl.c | 4 +-
.../private/src/pstm_activator.c | 15 +++
.../private/src/pubsub_topology_manager.c | 14 ++-
16 files changed, 253 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
index 231157a..5c8b145 100644
--- a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
@@ -44,26 +44,32 @@ struct publisherActivator {
};
celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+ celix_status_t status = CELIX_SUCCESS;
+
struct publisherActivator * act = malloc(sizeof(*act));
const char* fwUUID = NULL;
bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
+ if(fwUUID == NULL){
printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
+ status = CELIX_INVALID_BUNDLE_CONTEXT;
}
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- arrayList_create(&(act->trackerList));
- act->client = publisher_create(act->trackerList,fwUUID,bundleId);
- *userData = act;
+ if (status == CELIX_SUCCESS){
+ bundle_pt bundle = NULL;
+ long bundleId = 0;
+ bundleContext_getBundle(context,&bundle);
+ bundle_getBundleId(bundle,&bundleId);
+
+ arrayList_create(&(act->trackerList));
+ act->client = publisher_create(act->trackerList,fwUUID,bundleId);
+ *userData = act;
+ } else {
+ free(act);
+ }
- return CELIX_SUCCESS;
+ return status;
}
celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 9182e1b..bd3bb2f 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -104,7 +104,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
if (mc_ip == NULL) {
const char *mc_prefix = NULL;
const char *interface = NULL;
- int b0, b1, b2, b3;
+ int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
if(mc_prefix == NULL) {
mc_prefix = DEFAULT_MC_PREFIX;
@@ -127,22 +127,28 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(sendSocket == -1) {
perror("pubsubAdmin_create:socket");
- return CELIX_SERVICE_EXCEPTION;
- }
- char loop = 1;
- if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
- perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
- return CELIX_SERVICE_EXCEPTION;
+ status = CELIX_SERVICE_EXCEPTION;
}
- struct in_addr multicast_interface;
- inet_aton(if_ip, &multicast_interface);
- if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
- perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
- return CELIX_SERVICE_EXCEPTION;
- }
+ if (status == CELIX_SUCCESS){
+ char loop = 1;
+ if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
+ perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+
+ if (status == CELIX_SUCCESS){
+ struct in_addr multicast_interface;
+ inet_aton(if_ip, &multicast_interface);
+ if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
+ perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
- (*admin)->sendSocket = sendSocket;
+ (*admin)->sendSocket = sendSocket;
+ }
+
+ }
}
#endif
@@ -162,6 +168,10 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
}
+ if (status != CELIX_SUCCESS){
+ pubsubAdmin_destroy(*admin);
+ }
+
}
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 0907f16..0caf084 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -226,65 +226,79 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
if (!hashMap_containsKey(ts->socketMap, pubURL)){
celixThreadMutex_lock(&ts->ts_lock);
+
int *recvSocket = calloc(sizeof(int), 1);
*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
if (*recvSocket < 0) {
perror("pubsub_topicSubscriptionCreate:socket");
- return CELIX_SERVICE_EXCEPTION;
+ status = CELIX_SERVICE_EXCEPTION;
}
- int reuse = 1;
- if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
- perror("setsockopt() SO_REUSEADDR");
- return CELIX_SERVICE_EXCEPTION;
+ if (status == CELIX_SUCCESS){
+ int reuse = 1;
+ if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+ perror("setsockopt() SO_REUSEADDR");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
}
- // TODO Check if there is a better way to parse the URL to IP/Portnr
- //replace ':' by spaces
- char *url = strdup(pubURL);
- char *pt = url;
- while((pt=strchr(pt, ':')) != NULL) {
- *pt = ' ';
- }
- char mcIp[100];
- unsigned short mcPort;
- sscanf(url, "udp //%s %hu", mcIp, &mcPort);
- free (url);
-
- printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
-
- struct ip_mreq mc_addr;
- mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
- mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
- printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
- if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
- perror("setsockopt() IP_ADD_MEMBERSHIP");
- return CELIX_SERVICE_EXCEPTION;
- }
+ if (status == CELIX_SUCCESS){
+ // TODO Check if there is a better way to parse the URL to IP/Portnr
+ //replace ':' by spaces
+ char *url = strdup(pubURL);
+ char *pt = url;
+ while((pt=strchr(pt, ':')) != NULL) {
+ *pt = ' ';
+ }
+ char mcIp[100];
+ unsigned short mcPort;
+ sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+ free (url);
+
+ printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+ struct ip_mreq mc_addr;
+ mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+ mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+ printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+ if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+ perror("setsockopt() IP_ADD_MEMBERSHIP");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
- struct sockaddr_in mcListenAddr;
- mcListenAddr.sin_family = AF_INET;
- mcListenAddr.sin_addr.s_addr = INADDR_ANY;
- mcListenAddr.sin_port = htons(mcPort);
- if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
- perror("bind()");
- return CELIX_SERVICE_EXCEPTION;
- }
+ if (status == CELIX_SUCCESS){
+ struct sockaddr_in mcListenAddr;
+ mcListenAddr.sin_family = AF_INET;
+ mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+ mcListenAddr.sin_port = htons(mcPort);
+ if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+ perror("bind()");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+ }
+
+ if (status == CELIX_SUCCESS){
+ #if defined(__APPLE__) && defined(__MACH__)
+ //TODO: Use kqueue for OSX
+ #else
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN;
+ ev.data.fd = *recvSocket;
+ if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+ perror("epoll_ctl() EPOLL_CTL_ADD");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+ #endif
+ }
-#if defined(__APPLE__) && defined(__MACH__)
- //TODO: Use kqueue for OSX
-#else
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
- ev.data.fd = *recvSocket;
- if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
- perror("epoll_ctl() EPOLL_CTL_ADD");
- return CELIX_SERVICE_EXCEPTION;
}
-#endif
- hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+ if (status == CELIX_SUCCESS){
+ hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+ }else{
+ free(recvSocket);
+ }
celixThreadMutex_unlock(&ts->ts_lock);
@@ -444,9 +458,9 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
bool validVersion = checkVersion(msgVersion,&msg->header);
if(validVersion){
- int rc = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
+ celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
- if (rc != -1) {
+ if (status == CELIX_SUCCESS) {
bool release = true;
pubsub_multipart_callbacks_t mp_callbacks;
mp_callbacks.handle = sub;
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 8e14800..e670899 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -505,7 +505,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
if (any_sub != NULL && pubEP->endpoint != NULL) {
- pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
+ pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
}
free(scope_topic);
@@ -559,22 +559,24 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
arrayList_remove(ext_pub_list,i);
}
}
- // Check if there are more publsihers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
- for(i=0; i<arrayList_size(ext_pub_list);i++) {
- pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
- count++;
- }
- }
+ // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+ int ext_pub_list_size = arrayList_size(ext_pub_list);
+ for(i=0; i<ext_pub_list_size; i++) {
+ pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+ count++;
+ }
+ }
+
+ if(ext_pub_list_size == 0){
+ hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+ char* topic = (char*)hashMapEntry_getKey(entry);
+ array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+ hashMap_remove(admin->externalPublications,topic);
+ arrayList_destroy(list);
+ free(topic);
+ }
- }
- if(arrayList_size(ext_pub_list)==0){
- hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
- char* topic = (char*)hashMapEntry_getKey(entry);
- array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
- hashMap_remove(admin->externalPublications,topic);
- arrayList_destroy(list);
- free(topic);
}
celixThreadMutex_unlock(&admin->externalPublicationsLock);
@@ -591,7 +593,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
}
free(scope_topic);
celixThreadMutex_unlock(&admin->subscriptionsLock);
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index b76b1ce..1a036db 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -189,7 +189,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
- rv = zsock_bind (socket, bindAddress);
+ rv = zsock_bind (socket, "%s", bindAddress);
if (rv == -1) {
perror("Error for zmq_bind");
}
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index f58f516..cb9aff5 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -336,7 +336,7 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->socket_lock);
- if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,pubURL) != 0){
+ if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){
status = CELIX_SERVICE_EXCEPTION;
}
celixThreadMutex_unlock(&ts->socket_lock);
@@ -366,7 +366,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->socket_lock);
- if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,pubURL) != 0){
+ if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){
status = CELIX_SERVICE_EXCEPTION;
}
celixThreadMutex_unlock(&ts->socket_lock);
@@ -497,9 +497,9 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
if(validVersion){
- int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+ celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
- if (rc != -1) {
+ if (status == CELIX_SUCCESS) {
bool release = true;
mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
@@ -735,9 +735,9 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms
bool validVersion = checkVersion(msgVersion,header);
if(validVersion){
- int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+ celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
- if(rc != -1){
+ if(status == CELIX_SUCCESS){
unsigned int* msgId = calloc(1,sizeof(unsigned int));
*msgId = header->type;
msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
index c1f9a4b..565bac4 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -29,11 +29,12 @@
#include "bundle.h"
#include "hash_map.h"
+#include "celix_errno.h"
typedef struct _pubsub_message_type pubsub_message_type;
-int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
unsigned int pubsubSerializer_hashCode(const char *string);
version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType);
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
index 8309c11..11e4507 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
@@ -54,13 +54,14 @@ void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
char *metaInfPath = NULL;
root = getMsgDescriptionDir(bundle);
- asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
- addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
- addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+ if(root != NULL){
+ asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
- free(metaInfPath);
- if(root!=NULL){
+ addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
+ addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+
+ free(metaInfPath);
free(root);
}
}
@@ -127,25 +128,30 @@ static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash
snprintf(path, 128, "%s/%s", root, entry->d_name);
FILE *stream = fopen(path,"r");
- dyn_message_type* msgType = NULL;
+ if (stream != NULL){
+ dyn_message_type* msgType = NULL;
- int rc = dynMessage_parse(stream, &msgType);
- if (rc == 0 && msgType!=NULL) {
+ int rc = dynMessage_parse(stream, &msgType);
+ if (rc == 0 && msgType!=NULL) {
- char* msgName = NULL;
- dynMessage_getName(msgType,&msgName);
+ char* msgName = NULL;
+ dynMessage_getName(msgType,&msgName);
- if(msgName!=NULL){
- unsigned int* msgId = malloc(sizeof(unsigned int));
- *msgId = utils_stringHash(msgName);
- hashMap_put(msgTypesMap,msgId,msgType);
- }
+ if(msgName!=NULL){
+ unsigned int* msgId = malloc(sizeof(unsigned int));
+ *msgId = utils_stringHash(msgName);
+ hashMap_put(msgTypesMap,msgId,msgType);
+ }
+ }
+ else{
+ printf("DMU: cannot parse message from descriptor %s\n.",path);
+ }
+ fclose(stream);
+ }else{
+ printf("DMU: cannot open descriptor file %s\n.",path);
}
- else{
- printf("DMU: cannot parse message from descriptor %s\n.",path);
- }
- fclose(stream);
+
}
entry = readdir(dir);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
index bbb17c3..99ec87a 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
@@ -289,15 +289,13 @@ int etcd_set_with_check(const char* key, const char* value, int ttl, bool always
int result = 0;
if (etcd_get(key, &etcd_value, NULL) == 0) {
if (strcmp(etcd_value, value) != 0) {
- printf("[ETCDLIB} WARNING: value already exists and is different\n");
+ printf("[ETCDLIB] WARNING: value already exists and is different\n");
printf(" key = %s\n", key);
printf(" old value = %s\n", etcd_value);
printf(" new value = %s\n", value);
result = -1;
}
- if (etcd_value) {
- free(etcd_value);
- }
+ free(etcd_value);
}
if(always_write || !result) {
result = etcd_set(key, value, ttl, false);
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
index 7a63363..b18ef36 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
@@ -149,9 +149,6 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
return status;
}
-
-
-
celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
{
celix_status_t status = CELIX_SUCCESS;
@@ -169,7 +166,6 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
int i = 0;
for (; i < arrayList_size(loghelper->logServices); i++) {
-
log_service_pt logService = arrayList_get(loghelper->logServices, i);
if (logService != NULL) {
@@ -179,31 +175,29 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
}
pthread_mutex_unlock(&loghelper->logListLock);
- }
+ if (!logged && loghelper->stdOutFallback) {
+ char *levelStr = NULL;
+
+ switch (level) {
+ case OSGI_LOGSERVICE_ERROR:
+ levelStr = "ERROR";
+ break;
+ case OSGI_LOGSERVICE_WARNING:
+ levelStr = "WARNING";
+ break;
+ case OSGI_LOGSERVICE_INFO:
+ levelStr = "INFO";
+ break;
+ case OSGI_LOGSERVICE_DEBUG:
+ default:
+ levelStr = "DEBUG";
+ break;
+ }
- if (!logged && loghelper->stdOutFallback) {
- char *levelStr = NULL;
-
- switch (level) {
- case OSGI_LOGSERVICE_ERROR:
- levelStr = "ERROR";
- break;
- case OSGI_LOGSERVICE_WARNING:
- levelStr = "WARNING";
- break;
- case OSGI_LOGSERVICE_INFO:
- levelStr = "INFO";
- break;
- case OSGI_LOGSERVICE_DEBUG:
- default:
- levelStr = "DEBUG";
- break;
- }
-
- printf("%s: %s\n", levelStr, msg);
- }
-
+ printf("%s: %s\n", levelStr, msg);
+ }
+ }
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index 4af52ac..8586203 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -35,36 +35,43 @@
#include "constants.h"
#include "subscriber.h"
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* psEp) {
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) {
celix_status_t status = CELIX_SUCCESS;
- *psEp = calloc(1, sizeof(**psEp));
+
+ pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
if (fwUUID != NULL) {
- (*psEp)->frameworkUUID = strdup(fwUUID);
+ psEp->frameworkUUID = strdup(fwUUID);
}
if (scope != NULL) {
- (*psEp)->scope = strdup(scope);
+ psEp->scope = strdup(scope);
}
if (topic != NULL) {
- (*psEp)->topic = strdup(topic);
+ psEp->topic = strdup(topic);
}
- (*psEp)->serviceID = serviceId;
+ psEp->serviceID = serviceId;
if (endpoint != NULL) {
- (*psEp)->endpoint = strdup(endpoint);
+ psEp->endpoint = strdup(endpoint);
}
+ if (status != CELIX_SUCCESS) {
+ pubsubEndpoint_destroy(psEp);
+ } else {
+ *out = psEp;
+ }
+
return status;
}
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp){
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){
celix_status_t status = CELIX_SUCCESS;
- *psEp = calloc(1,sizeof(**psEp));
+ pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp));
bundle_pt bundle = NULL;
bundle_context_pt ctxt = NULL;
@@ -82,58 +89,49 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re
const char* serviceId = NULL;
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
-
if(fwUUID!=NULL){
- (*psEp)->frameworkUUID=strdup(fwUUID);
+ psEp->frameworkUUID = strdup(fwUUID);
}
if(scope!=NULL){
- (*psEp)->scope=strdup(scope);
+ psEp->scope = strdup(scope);
} else {
- (*psEp)->scope=strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+ psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
}
if(topic!=NULL){
- (*psEp)->topic=strdup(topic);
+ psEp->topic = strdup(topic);
}
if(serviceId!=NULL){
- (*psEp)->serviceID = strtol(serviceId,NULL,10);
+ psEp->serviceID = strtol(serviceId,NULL,10);
}
- if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) {
+ if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) {
fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
status = CELIX_BUNDLE_EXCEPTION;
}
+ if (status != CELIX_SUCCESS) {
+ pubsubEndpoint_destroy(psEp);
+ } else {
+ *out = psEp;
+ }
+
return status;
}
celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
- if(psEp->frameworkUUID!=NULL){
+ if (psEp != NULL) {
free(psEp->frameworkUUID);
- psEp->frameworkUUID = NULL;
- }
-
- if(psEp->scope!=NULL){
free(psEp->scope);
- psEp->scope = NULL;
- }
-
- if(psEp->topic!=NULL){
free(psEp->topic);
- psEp->topic = NULL;
- }
-
- if(psEp->endpoint!=NULL){
free(psEp->endpoint);
- psEp->endpoint = NULL;
}
-
free(psEp);
- return CELIX_SUCCESS;
+ return CELIX_SUCCESS;
}
bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
@@ -145,7 +143,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
);
-
}
char *createScopeTopicKey(const char* scope, const char* topic) {
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
index 85ef868..bb6096a 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
@@ -42,35 +42,39 @@ struct _pubsub_message_type { /* _dyn_message_type */
version_pt msgVersion;
};
-int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
-
- int rc = 0;
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
+ celix_status_t status = CELIX_SUCCESS;
dyn_type *type = NULL;
dynMessage_getMessageType((dyn_message_type *) msgType, &type);
char *jsonOutput = NULL;
- rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+ int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+ if (rc != 0){
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
*output = (void *) jsonOutput;
*outputLen = strlen(jsonOutput) + 1;
- return rc;
+ return status;
}
-int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
-
- int rc = 0;
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
+ celix_status_t status = CELIX_SUCCESS;
dyn_type *type = NULL;
dynMessage_getMessageType((dyn_message_type *) msgType, &type);
void *textOutput = NULL;
- rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+ int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+ if (rc != 0){
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
*output = textOutput;
- return rc;
+ return status;
}
unsigned int pubsubSerializer_hashCode(const char *string){
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index a394045..0d8468e 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -243,19 +243,17 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_c
(*watcher)->scope = strdup(scope);
(*watcher)->topic = strdup(topic);
-
celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
celixThreadMutex_lock(&(*watcher)->watcherLock);
- if ((status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher)) != CELIX_SUCCESS) {
- return status;
+ status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
+ if (status == CELIX_SUCCESS) {
+ (*watcher)->running = true;
}
- (*watcher)->running = true;
celixThreadMutex_unlock(&(*watcher)->watcherLock);
-
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
index 1b6aca9..0c7d6c4 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -131,7 +131,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
etcdWatcher_stop(wi->watcher);
}
hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&ps_discovery->watchersMutex);
celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
@@ -168,6 +167,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
hashMapIterator_destroy(iter);
hashMap_destroy(ps_discovery->watchers, true, true);
celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+
return status;
}
@@ -316,7 +316,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt
free(pub_key);
if(pubEP_list==NULL){
printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
- return CELIX_ILLEGAL_STATE;
+ status = CELIX_ILLEGAL_STATE;
}
else{
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
index ae7b4a9..a35d161 100644
--- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -149,8 +149,23 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
if (status == CELIX_SUCCESS) {
*userData = activator;
}
+ if (status != CELIX_SUCCESS){
+ serviceTracker_destroy(activator->pubsubAdminTracker);
+ }
+ }
+ if (status != CELIX_SUCCESS){
+ serviceTracker_destroy(activator->pubsubDiscoveryTracker);
}
}
+ if (status != CELIX_SUCCESS){
+ pubsub_topologyManager_destroy(activator->manager);
+ }
+ }
+ if (status != CELIX_SUCCESS){ // an exception occurred so free allocated memory
+ logHelper_stop(activator->loghelper);
+ logHelper_destroy(&activator->loghelper);
+ free(activator);
+
}
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 5ba1315..a6541b9 100644
--- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -372,7 +372,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
celixThreadMutex_unlock(&manager->psaListLock);
}
else{
- status=CELIX_INVALID_BUNDLE_CONTEXT;
+ status = CELIX_INVALID_BUNDLE_CONTEXT;
}
return status;
@@ -457,7 +457,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
}
else{
- status=CELIX_INVALID_BUNDLE_CONTEXT;
+ status = CELIX_INVALID_BUNDLE_CONTEXT;
}
return status;
@@ -631,14 +631,14 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
celixThreadMutex_unlock(&manager->psaListLock);
}
- free(topic);
}
else{
status=CELIX_INVALID_BUNDLE_CONTEXT;
}
- free(scope);
+ free(topic);
+ free(scope);
}
return status;
@@ -731,8 +731,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
celixThreadMutex_unlock(&manager->psaListLock);
pubsubEndpoint_destroy(pubcmp);
- free(pub_scope);
- free(pub_topic);
+
free(pub_key);
}
@@ -741,6 +740,9 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
else{
status=CELIX_INVALID_BUNDLE_CONTEXT;
}
+
+ free(pub_scope);
+ free(pub_topic);
}
return status;