You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:15 UTC

[03/19] celix git commit: CELIX-398: Fixes bug for subscribers using the PubsubAdminUDP

CELIX-398: Fixes bug for subscribers using the PubsubAdminUDP


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/51f0eb0d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/51f0eb0d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/51f0eb0d

Branch: refs/heads/develop
Commit: 51f0eb0d922edd47d75b3830cfaac4539f011c84
Parents: 091a112
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:11:06 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:11:06 2017 +0100

----------------------------------------------------------------------
 .../private/src/topic_subscription.c            | 134 ++++++++++---------
 1 file changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/51f0eb0d/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index e1cccca..210afd4 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -68,6 +68,7 @@ struct topic_subscription{
 	hash_map_pt socketMap; // key = URL, value = listen-socket
 	unsigned int nrSubscribers;
 	largeUdp_pt largeUdpHandle;
+
 };
 
 typedef struct mp_handle{
@@ -207,64 +208,69 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
     printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
 
     celix_status_t status = CELIX_SUCCESS;
-    celixThreadMutex_lock(&ts->ts_lock);
-    int *recvSocket = calloc(sizeof(int), 1);
-    *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
-    if (*recvSocket < 0) {
-        perror("pubsub_topicSubscriptionCreate:socket");
-        return CELIX_SERVICE_EXCEPTION;
-    }
 
-    int reuse = 1;
-    if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
-        perror("setsockopt() SO_REUSEADDR");
-        return CELIX_SERVICE_EXCEPTION;
-    }
+    if (!hashMap_containsKey(ts->socketMap, pubURL)){
 
-    // TODO Check if there is a better way to parse the URL to IP/Portnr
-    //replace ':' by spaces
-    char *url = strdup(pubURL);
-    char *pt = url;
-    while((pt=strchr(pt, ':')) != NULL) {
-        *pt = ' ';
-    }
-    char mcIp[100];
-    unsigned short mcPort;
-    sscanf(url, "udp //%s %hu", mcIp, &mcPort);
-    free (url);
-
-    printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
-
-    struct ip_mreq mc_addr;
-    mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
-    mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
-    printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
-    if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
-        perror("setsockopt() IP_ADD_MEMBERSHIP");
-        return CELIX_SERVICE_EXCEPTION;
-    }
+		celixThreadMutex_lock(&ts->ts_lock);
+		int *recvSocket = calloc(sizeof(int), 1);
+		*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+		if (*recvSocket < 0) {
+			perror("pubsub_topicSubscriptionCreate:socket");
+			return CELIX_SERVICE_EXCEPTION;
+		}
 
-    struct sockaddr_in mcListenAddr;
-    mcListenAddr.sin_family = AF_INET;
-    mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-    mcListenAddr.sin_port = htons(mcPort);
-    if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
-        perror("bind()");
-        return CELIX_SERVICE_EXCEPTION;
-    }
+		int reuse = 1;
+		if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+			perror("setsockopt() SO_REUSEADDR");
+			return CELIX_SERVICE_EXCEPTION;
+		}
 
+		// TODO Check if there is a better way to parse the URL to IP/Portnr
+		//replace ':' by spaces
+		char *url = strdup(pubURL);
+		char *pt = url;
+		while((pt=strchr(pt, ':')) != NULL) {
+			*pt = ' ';
+		}
+		char mcIp[100];
+		unsigned short mcPort;
+		sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+		free (url);
+
+		printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+		struct ip_mreq mc_addr;
+		mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+		mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+		printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+		if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+			perror("setsockopt() IP_ADD_MEMBERSHIP");
+			return CELIX_SERVICE_EXCEPTION;
+		}
 
-    struct epoll_event ev;
-    memset(&ev, 0, sizeof(ev));
-    ev.events = EPOLLIN;
-    ev.data.fd = *recvSocket;
-    if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
-        perror("epoll_ctl() EPOLL_CTL_ADD");
-        return CELIX_SERVICE_EXCEPTION;
-    }
-    hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+		struct sockaddr_in mcListenAddr;
+		mcListenAddr.sin_family = AF_INET;
+		mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+		mcListenAddr.sin_port = htons(mcPort);
+		if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+			perror("bind()");
+			return CELIX_SERVICE_EXCEPTION;
+		}
+
+
+		struct epoll_event ev;
+		memset(&ev, 0, sizeof(ev));
+		ev.events = EPOLLIN;
+		ev.data.fd = *recvSocket;
+		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+			perror("epoll_ctl() EPOLL_CTL_ADD");
+			return CELIX_SERVICE_EXCEPTION;
+		}
+		hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+
+		celixThreadMutex_unlock(&ts->ts_lock);
 
-    celixThreadMutex_unlock(&ts->ts_lock);
+    }
 
     return status;
 }
@@ -272,19 +278,23 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
     printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
     celix_status_t status = CELIX_SUCCESS;
-    struct epoll_event ev;
-    memset(&ev, 0, sizeof(ev));
 
-    celixThreadMutex_lock(&ts->ts_lock);
+    if (hashMap_containsKey(ts->socketMap, pubURL)){
+		struct epoll_event ev;
+		memset(&ev, 0, sizeof(ev));
 
-    int *s = hashMap_remove(ts->socketMap, pubURL);
-    if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
-        perror("epoll_ctl() EPOLL_CTL_DEL");
-        status = CELIX_SERVICE_EXCEPTION;
-    }
-    free(s);
+		celixThreadMutex_lock(&ts->ts_lock);
 
-    celixThreadMutex_unlock(&ts->ts_lock);
+		int *s = hashMap_remove(ts->socketMap, pubURL);
+		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+			printf("in if error()\n");
+			perror("epoll_ctl() EPOLL_CTL_DEL");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+		free(s);
+
+		celixThreadMutex_unlock(&ts->ts_lock);
+    }
 
 	return status;
 }