You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:18 UTC

[06/19] celix git commit: Fixed coverity issues

Fixed coverity issues


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8c84284f
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8c84284f
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8c84284f

Branch: refs/heads/develop
Commit: 8c84284f6f63de076bbc1d8f66ac77115d2e5707
Parents: 1af1ff0
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:43:19 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:43:19 2017 +0100

----------------------------------------------------------------------
 .../publisher/private/src/mp_pub_activator.c    |  28 +++--
 .../private/src/pubsub_admin_impl.c             |  38 ++++---
 .../private/src/topic_subscription.c            | 112 +++++++++++--------
 .../private/src/pubsub_admin_impl.c             |  36 +++---
 .../private/src/topic_publication.c             |   2 +-
 .../private/src/topic_subscription.c            |  12 +-
 .../public/include/pubsub_serializer.h          |   5 +-
 .../pubsub_common/public/src/dyn_msg_utils.c    |  44 ++++----
 .../pubsub/pubsub_common/public/src/etcd.c      |   6 +-
 .../pubsub_common/public/src/log_helper.c       |  48 ++++----
 .../pubsub_common/public/src/pubsub_endpoint.c  |  63 +++++------
 .../public/src/pubsub_serializer.c              |  24 ++--
 .../pubsub_discovery/private/src/etcd_watcher.c |   8 +-
 .../private/src/pubsub_discovery_impl.c         |   4 +-
 .../private/src/pstm_activator.c                |  15 +++
 .../private/src/pubsub_topology_manager.c       |  14 ++-
 16 files changed, 253 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
index 231157a..5c8b145 100644
--- a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
@@ -44,26 +44,32 @@ struct publisherActivator {
 };
 
 celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+
 	struct publisherActivator * act = malloc(sizeof(*act));
 
 	const char* fwUUID = NULL;
 
 	bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
+	if(fwUUID == NULL){
 		printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
-		return CELIX_INVALID_BUNDLE_CONTEXT;
+		status = CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
-	bundle_pt bundle = NULL;
-	long bundleId = 0;
-	bundleContext_getBundle(context,&bundle);
-	bundle_getBundleId(bundle,&bundleId);
-
-	arrayList_create(&(act->trackerList));
-	act->client = publisher_create(act->trackerList,fwUUID,bundleId);
-	*userData = act;
+	if (status == CELIX_SUCCESS){
+		bundle_pt bundle = NULL;
+		long bundleId = 0;
+		bundleContext_getBundle(context,&bundle);
+		bundle_getBundleId(bundle,&bundleId);
+
+		arrayList_create(&(act->trackerList));
+		act->client = publisher_create(act->trackerList,fwUUID,bundleId);
+		*userData = act;
+	} else {
+		free(act);
+	}
 
-	return CELIX_SUCCESS;
+	return status;
 }
 
 celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 9182e1b..bd3bb2f 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -104,7 +104,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 		if (mc_ip == NULL) {
 		    const char *mc_prefix = NULL;
             const char *interface = NULL;
-            int b0, b1, b2, b3;
+            int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
             bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
             if(mc_prefix == NULL) {
                 mc_prefix = DEFAULT_MC_PREFIX;
@@ -127,22 +127,28 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 	        int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
 	        if(sendSocket == -1) {
 	            perror("pubsubAdmin_create:socket");
-	            return CELIX_SERVICE_EXCEPTION;
-	        }
-	        char loop = 1;
-	        if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
-	            perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
-	            return CELIX_SERVICE_EXCEPTION;
+	            status = CELIX_SERVICE_EXCEPTION;
 	        }
 
-	        struct in_addr multicast_interface;
-	        inet_aton(if_ip, &multicast_interface);
-	        if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
-	            perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
-	            return CELIX_SERVICE_EXCEPTION;
-	        }
+	        if (status == CELIX_SUCCESS){
+				char loop = 1;
+				if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
+					perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
+					status = CELIX_SERVICE_EXCEPTION;
+				}
+
+				if (status == CELIX_SUCCESS){
+					struct in_addr multicast_interface;
+					inet_aton(if_ip, &multicast_interface);
+					if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
+						perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
+						status = CELIX_SERVICE_EXCEPTION;
+					}
 
-	        (*admin)->sendSocket = sendSocket;
+					(*admin)->sendSocket = sendSocket;
+				}
+
+	        }
 
 		}
 #endif
@@ -162,6 +168,10 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 			(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
 		}
 
+		if (status != CELIX_SUCCESS){
+			pubsubAdmin_destroy(*admin);
+		}
+
 	}
 
 	return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 0907f16..0caf084 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -226,65 +226,79 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
     if (!hashMap_containsKey(ts->socketMap, pubURL)){
 
 		celixThreadMutex_lock(&ts->ts_lock);
+
 		int *recvSocket = calloc(sizeof(int), 1);
 		*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
 		if (*recvSocket < 0) {
 			perror("pubsub_topicSubscriptionCreate:socket");
-			return CELIX_SERVICE_EXCEPTION;
+			status = CELIX_SERVICE_EXCEPTION;
 		}
 
-		int reuse = 1;
-		if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
-			perror("setsockopt() SO_REUSEADDR");
-			return CELIX_SERVICE_EXCEPTION;
+		if (status == CELIX_SUCCESS){
+			int reuse = 1;
+			if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+				perror("setsockopt() SO_REUSEADDR");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
 		}
 
-		// TODO Check if there is a better way to parse the URL to IP/Portnr
-		//replace ':' by spaces
-		char *url = strdup(pubURL);
-		char *pt = url;
-		while((pt=strchr(pt, ':')) != NULL) {
-			*pt = ' ';
-		}
-		char mcIp[100];
-		unsigned short mcPort;
-		sscanf(url, "udp //%s %hu", mcIp, &mcPort);
-		free (url);
-
-		printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
-
-		struct ip_mreq mc_addr;
-		mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
-		mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
-		printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
-		if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
-			perror("setsockopt() IP_ADD_MEMBERSHIP");
-			return CELIX_SERVICE_EXCEPTION;
-		}
+		if (status == CELIX_SUCCESS){
+			// TODO Check if there is a better way to parse the URL to IP/Portnr
+			//replace ':' by spaces
+			char *url = strdup(pubURL);
+			char *pt = url;
+			while((pt=strchr(pt, ':')) != NULL) {
+				*pt = ' ';
+			}
+			char mcIp[100];
+			unsigned short mcPort;
+			sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+			free (url);
+
+			printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+			struct ip_mreq mc_addr;
+			mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+			mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+			printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+			if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+				perror("setsockopt() IP_ADD_MEMBERSHIP");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
 
-		struct sockaddr_in mcListenAddr;
-		mcListenAddr.sin_family = AF_INET;
-		mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-		mcListenAddr.sin_port = htons(mcPort);
-		if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
-			perror("bind()");
-			return CELIX_SERVICE_EXCEPTION;
-		}
+			if (status == CELIX_SUCCESS){
+				struct sockaddr_in mcListenAddr;
+				mcListenAddr.sin_family = AF_INET;
+				mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+				mcListenAddr.sin_port = htons(mcPort);
+				if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+					perror("bind()");
+					status = CELIX_SERVICE_EXCEPTION;
+				}
+			}
+
+			if (status == CELIX_SUCCESS){
+				#if defined(__APPLE__) && defined(__MACH__)
+					//TODO: Use kqueue for OSX
+				#else
+					struct epoll_event ev;
+					memset(&ev, 0, sizeof(ev));
+					ev.events = EPOLLIN;
+					ev.data.fd = *recvSocket;
+					if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+						perror("epoll_ctl() EPOLL_CTL_ADD");
+						status = CELIX_SERVICE_EXCEPTION;
+					}
+				#endif
+			}
 
-#if defined(__APPLE__) && defined(__MACH__)
-    //TODO: Use kqueue for OSX
-#else
-		struct epoll_event ev;
-		memset(&ev, 0, sizeof(ev));
-		ev.events = EPOLLIN;
-		ev.data.fd = *recvSocket;
-		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
-			perror("epoll_ctl() EPOLL_CTL_ADD");
-			return CELIX_SERVICE_EXCEPTION;
 		}
-#endif
 
-		hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+		if (status == CELIX_SUCCESS){
+			hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+		}else{
+			free(recvSocket);
+		}
 
 		celixThreadMutex_unlock(&ts->ts_lock);
 
@@ -444,9 +458,9 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
 			bool validVersion = checkVersion(msgVersion,&msg->header);
 
 			if(validVersion){
-				int rc = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
+				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
 
-				if (rc != -1) {
+				if (status == CELIX_SUCCESS) {
 					bool release = true;
 					pubsub_multipart_callbacks_t mp_callbacks;
 					mp_callbacks.handle = sub;

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 8e14800..e670899 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -505,7 +505,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
     /* And check also for ANY subscription */
     topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
     if (any_sub != NULL && pubEP->endpoint != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
     }
     free(scope_topic);
 
@@ -559,22 +559,24 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 					arrayList_remove(ext_pub_list,i);
 				}
 			}
-			// Check if there are more publsihers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
-                        for(i=0; i<arrayList_size(ext_pub_list);i++) {
-                            pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                            if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
-                                count++;
-                            }
-                        }
+			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+			int ext_pub_list_size = arrayList_size(ext_pub_list);
+			for(i=0; i<ext_pub_list_size; i++) {
+				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+				if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+					count++;
+				}
+			}
+
+			if(ext_pub_list_size == 0){
+				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+				char* topic = (char*)hashMapEntry_getKey(entry);
+				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+				hashMap_remove(admin->externalPublications,topic);
+				arrayList_destroy(list);
+				free(topic);
+			}
 
-		}
-		if(arrayList_size(ext_pub_list)==0){
-			hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
-			char* topic = (char*)hashMapEntry_getKey(entry);
-			array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
-			hashMap_remove(admin->externalPublications,topic);
-			arrayList_destroy(list);
-			free(topic);
 		}
 
 		celixThreadMutex_unlock(&admin->externalPublicationsLock);
@@ -591,7 +593,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
 	}
 	free(scope_topic);
 	celixThreadMutex_unlock(&admin->subscriptionsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index b76b1ce..1a036db 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -189,7 +189,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
 		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
         snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
-		rv = zsock_bind (socket, bindAddress);
+		rv = zsock_bind (socket, "%s", bindAddress);
         if (rv == -1) {
             perror("Error for zmq_bind");
         }

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index f58f516..cb9aff5 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -336,7 +336,7 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
 	celix_status_t status = CELIX_SUCCESS;
 	celixThreadMutex_lock(&ts->socket_lock);
-	if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,pubURL) != 0){
+	if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){
 		status = CELIX_SERVICE_EXCEPTION;
 	}
 	celixThreadMutex_unlock(&ts->socket_lock);
@@ -366,7 +366,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->socket_lock);
-	if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,pubURL) != 0){
+	if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){
 		status = CELIX_SERVICE_EXCEPTION;
 	}
 	celixThreadMutex_unlock(&ts->socket_lock);
@@ -497,9 +497,9 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
 
 			if(validVersion){
 
-				int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
 
-				if (rc != -1) {
+				if (status == CELIX_SUCCESS) {
 					bool release = true;
 
 					mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
@@ -735,9 +735,9 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms
 			bool validVersion = checkVersion(msgVersion,header);
 
 			if(validVersion){
-				int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
 
-				if(rc != -1){
+				if(status == CELIX_SUCCESS){
 					unsigned int* msgId = calloc(1,sizeof(unsigned int));
 					*msgId = header->type;
 					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
index c1f9a4b..565bac4 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -29,11 +29,12 @@
 
 #include "bundle.h"
 #include "hash_map.h"
+#include "celix_errno.h"
 
 typedef struct _pubsub_message_type pubsub_message_type;
 
-int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
 
 unsigned int pubsubSerializer_hashCode(const char *string);
 version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType);

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
index 8309c11..11e4507 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
@@ -54,13 +54,14 @@ void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
 	char *metaInfPath = NULL;
 
 	root = getMsgDescriptionDir(bundle);
-	asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
 
-	addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
-	addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+	if(root != NULL){
+		asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
 
-	free(metaInfPath);
-	if(root!=NULL){
+		addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
+		addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+
+		free(metaInfPath);
 		free(root);
 	}
 }
@@ -127,25 +128,30 @@ static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash
 			snprintf(path, 128, "%s/%s", root, entry->d_name);
 			FILE *stream = fopen(path,"r");
 
-			dyn_message_type* msgType = NULL;
+			if (stream != NULL){
+				dyn_message_type* msgType = NULL;
 
-			int rc = dynMessage_parse(stream, &msgType);
-			if (rc == 0 && msgType!=NULL) {
+				int rc = dynMessage_parse(stream, &msgType);
+				if (rc == 0 && msgType!=NULL) {
 
-				char* msgName = NULL;
-				dynMessage_getName(msgType,&msgName);
+					char* msgName = NULL;
+					dynMessage_getName(msgType,&msgName);
 
-				if(msgName!=NULL){
-					unsigned int* msgId = malloc(sizeof(unsigned int));
-					*msgId = utils_stringHash(msgName);
-					hashMap_put(msgTypesMap,msgId,msgType);
-				}
+					if(msgName!=NULL){
+						unsigned int* msgId = malloc(sizeof(unsigned int));
+						*msgId = utils_stringHash(msgName);
+						hashMap_put(msgTypesMap,msgId,msgType);
+					}
 
+				}
+				else{
+					printf("DMU: cannot parse message from descriptor %s\n.",path);
+				}
+				fclose(stream);
+			}else{
+				printf("DMU: cannot open descriptor file %s\n.",path);
 			}
-			else{
-				printf("DMU: cannot parse message from descriptor %s\n.",path);
-			}
-			fclose(stream);
+
 		}
 		entry = readdir(dir);
 	}

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
index bbb17c3..99ec87a 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
@@ -289,15 +289,13 @@ int etcd_set_with_check(const char* key, const char* value, int ttl, bool always
     int result = 0;
     if (etcd_get(key, &etcd_value, NULL) == 0) {
         if (strcmp(etcd_value, value) != 0) {
-            printf("[ETCDLIB} WARNING: value already exists and is different\n");
+            printf("[ETCDLIB] WARNING: value already exists and is different\n");
             printf("   key       = %s\n", key);
             printf("   old value = %s\n", etcd_value);
             printf("   new value = %s\n", value);
             result = -1;
         }
-        if (etcd_value) {
-            free(etcd_value);
-        }
+		free(etcd_value);
     }
     if(always_write || !result) {
         result = etcd_set(key, value, ttl, false);

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
index 7a63363..b18ef36 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
@@ -149,9 +149,6 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
         return status;
 }
 
-
-
-
 celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
 {
     celix_status_t status = CELIX_SUCCESS;
@@ -169,7 +166,6 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
 		int i = 0;
 
 		for (; i < arrayList_size(loghelper->logServices); i++) {
-
 			log_service_pt logService = arrayList_get(loghelper->logServices, i);
 
 			if (logService != NULL) {
@@ -179,31 +175,29 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
 		}
 
 		pthread_mutex_unlock(&loghelper->logListLock);
-	}
 
+		if (!logged && loghelper->stdOutFallback) {
+			char *levelStr = NULL;
+
+			switch (level) {
+				case OSGI_LOGSERVICE_ERROR:
+					levelStr = "ERROR";
+					break;
+				case OSGI_LOGSERVICE_WARNING:
+					levelStr = "WARNING";
+					break;
+				case OSGI_LOGSERVICE_INFO:
+					levelStr = "INFO";
+					break;
+				case OSGI_LOGSERVICE_DEBUG:
+				default:
+					levelStr = "DEBUG";
+					break;
+			}
 
-    if (!logged && loghelper->stdOutFallback) {
-        char *levelStr = NULL;
-
-        switch (level) {
-            case OSGI_LOGSERVICE_ERROR:
-                levelStr = "ERROR";
-                break;
-            case OSGI_LOGSERVICE_WARNING:
-                levelStr = "WARNING";
-                break;
-            case OSGI_LOGSERVICE_INFO:
-                levelStr = "INFO";
-                break;
-            case OSGI_LOGSERVICE_DEBUG:
-            default:
-                levelStr = "DEBUG";
-                break;
-        }
-
-        printf("%s: %s\n", levelStr, msg);
-    }
-
+			printf("%s: %s\n", levelStr, msg);
+		}
+	}
 
 	return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index 4af52ac..8586203 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -35,36 +35,43 @@
 #include "constants.h"
 #include "subscriber.h"
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* psEp) {
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) {
     celix_status_t status = CELIX_SUCCESS;
-    *psEp = calloc(1, sizeof(**psEp));
+
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
 
     if (fwUUID != NULL) {
-        (*psEp)->frameworkUUID = strdup(fwUUID);
+        psEp->frameworkUUID = strdup(fwUUID);
     }
 
     if (scope != NULL) {
-        (*psEp)->scope = strdup(scope);
+        psEp->scope = strdup(scope);
     }
 
     if (topic != NULL) {
-        (*psEp)->topic = strdup(topic);
+        psEp->topic = strdup(topic);
     }
 
-    (*psEp)->serviceID = serviceId;
+    psEp->serviceID = serviceId;
 
     if (endpoint != NULL) {
-        (*psEp)->endpoint = strdup(endpoint);
+        psEp->endpoint = strdup(endpoint);
     }
 
+    if (status != CELIX_SUCCESS) {
+		pubsubEndpoint_destroy(psEp);
+	} else {
+		*out = psEp;
+	}
+
     return status;
 
 }
 
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp){
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
-	*psEp = calloc(1,sizeof(**psEp));
+	pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp));
 
 	bundle_pt bundle = NULL;
 	bundle_context_pt ctxt = NULL;
@@ -82,58 +89,49 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re
 	const char* serviceId = NULL;
 	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 
-
 	if(fwUUID!=NULL){
-		(*psEp)->frameworkUUID=strdup(fwUUID);
+		psEp->frameworkUUID = strdup(fwUUID);
 	}
 
 	if(scope!=NULL){
-		(*psEp)->scope=strdup(scope);
+		psEp->scope = strdup(scope);
 	} else {
-	    (*psEp)->scope=strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+	    psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
 	}
 
 	if(topic!=NULL){
-		(*psEp)->topic=strdup(topic);
+		psEp->topic = strdup(topic);
 	}
 
 	if(serviceId!=NULL){
-		(*psEp)->serviceID = strtol(serviceId,NULL,10);
+		psEp->serviceID = strtol(serviceId,NULL,10);
 	}
 
-	if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) {
+	if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) {
 		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
 		status = CELIX_BUNDLE_EXCEPTION;
 	}
 
+	if (status != CELIX_SUCCESS) {
+		pubsubEndpoint_destroy(psEp);
+	} else {
+		*out = psEp;
+	}
+
 	return status;
 
 }
 
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
-    if(psEp->frameworkUUID!=NULL){
+	if (psEp != NULL) {
 		free(psEp->frameworkUUID);
-		psEp->frameworkUUID = NULL;
-	}
-
-	if(psEp->scope!=NULL){
 		free(psEp->scope);
-		psEp->scope = NULL;
-	}
-
-	if(psEp->topic!=NULL){
 		free(psEp->topic);
-		psEp->topic = NULL;
-	}
-
-	if(psEp->endpoint!=NULL){
 		free(psEp->endpoint);
-		psEp->endpoint = NULL;
 	}
-
 	free(psEp);
-	return CELIX_SUCCESS;
 
+	return CELIX_SUCCESS;
 }
 
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
@@ -145,7 +143,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
 			((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
 	);
 
-
 }
 
 char *createScopeTopicKey(const char* scope, const char* topic) {

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
index 85ef868..bb6096a 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
@@ -42,35 +42,39 @@ struct _pubsub_message_type {	/* _dyn_message_type */
 	version_pt msgVersion;
 };
 
-int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
-
-	int rc = 0;
+celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
+	celix_status_t status = CELIX_SUCCESS;
 
 	dyn_type *type = NULL;
 	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
 
 	char *jsonOutput = NULL;
-	rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+	if (rc != 0){
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
 
 	*output = (void *) jsonOutput;
 	*outputLen = strlen(jsonOutput) + 1;
 
-	return rc;
+	return status;
 }
 
-int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
-
-	int rc = 0;
+celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
+	celix_status_t status = CELIX_SUCCESS;
 
 	dyn_type *type = NULL;
 	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
 
 	void *textOutput = NULL;
-	rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+	if (rc != 0){
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
 
 	*output = textOutput;
 
-	return rc;
+	return status;
 }
 
 unsigned int pubsubSerializer_hashCode(const char *string){

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index a394045..0d8468e 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -243,19 +243,17 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_c
 	(*watcher)->scope = strdup(scope);
 	(*watcher)->topic = strdup(topic);
 
-
     celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
 
     celixThreadMutex_lock(&(*watcher)->watcherLock);
 
-    if ((status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher)) != CELIX_SUCCESS) {
-        return status;
+    status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
+    if (status == CELIX_SUCCESS) {
+    	(*watcher)->running = true;
     }
-    (*watcher)->running = true;
 
     celixThreadMutex_unlock(&(*watcher)->watcherLock);
 
-
 	return status;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
index 1b6aca9..0c7d6c4 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -131,7 +131,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
         etcdWatcher_stop(wi->watcher);
     }
     hashMapIterator_destroy(iter);
-    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
 
     celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 
@@ -168,6 +167,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
     hashMapIterator_destroy(iter);
     hashMap_destroy(ps_discovery->watchers, true, true);
     celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+
     return status;
 }
 
@@ -316,7 +316,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt
 	free(pub_key);
 	if(pubEP_list==NULL){
 		printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
-		return CELIX_ILLEGAL_STATE;
+		status = CELIX_ILLEGAL_STATE;
 	}
 	else{
 

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
index ae7b4a9..a35d161 100644
--- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -149,8 +149,23 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 				if (status == CELIX_SUCCESS) {
 					*userData = activator;
 				}
+				if (status != CELIX_SUCCESS){
+					serviceTracker_destroy(activator->pubsubAdminTracker);
+				}
+			}
+			if (status != CELIX_SUCCESS){
+				serviceTracker_destroy(activator->pubsubDiscoveryTracker);
 			}
 		}
+		if (status != CELIX_SUCCESS){
+			pubsub_topologyManager_destroy(activator->manager);
+		}
+	}
+	if (status != CELIX_SUCCESS){ // an exception occurred so free allocated memory
+		logHelper_stop(activator->loghelper);
+		logHelper_destroy(&activator->loghelper);
+		free(activator);
+
 	}
 
 	return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/8c84284f/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 5ba1315..a6541b9 100644
--- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -372,7 +372,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
 		celixThreadMutex_unlock(&manager->psaListLock);
 	}
 	else{
-		status=CELIX_INVALID_BUNDLE_CONTEXT;
+		status = CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
 	return status;
@@ -457,7 +457,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 
 	}
 	else{
-		status=CELIX_INVALID_BUNDLE_CONTEXT;
+		status = CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
 	return status;
@@ -631,14 +631,14 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 				celixThreadMutex_unlock(&manager->psaListLock);
 
 			}
-			free(topic);
 
 		}
 		else{
 			status=CELIX_INVALID_BUNDLE_CONTEXT;
 		}
-        free(scope);
 
+		free(topic);
+        free(scope);
 	}
 
 	return status;
@@ -731,8 +731,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 				celixThreadMutex_unlock(&manager->psaListLock);
 
 				pubsubEndpoint_destroy(pubcmp);
-				free(pub_scope);
-				free(pub_topic);
+
 				free(pub_key);
 
 			}
@@ -741,6 +740,9 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 		else{
 			status=CELIX_INVALID_BUNDLE_CONTEXT;
 		}
+
+		free(pub_scope);
+		free(pub_topic);
 	}
 
 	return status;