You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:15 UTC
[03/19] celix git commit: CELIX-398: Fixes bug for subscribers using
the PubsubAdminUDP
CELIX-398: Fixes bug for subscribers using the PubsubAdminUDP
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/51f0eb0d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/51f0eb0d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/51f0eb0d
Branch: refs/heads/develop
Commit: 51f0eb0d922edd47d75b3830cfaac4539f011c84
Parents: 091a112
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:11:06 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:11:06 2017 +0100
----------------------------------------------------------------------
.../private/src/topic_subscription.c | 134 ++++++++++---------
1 file changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/51f0eb0d/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index e1cccca..210afd4 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -68,6 +68,7 @@ struct topic_subscription{
hash_map_pt socketMap; // key = URL, value = listen-socket
unsigned int nrSubscribers;
largeUdp_pt largeUdpHandle;
+
};
typedef struct mp_handle{
@@ -207,64 +208,69 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
celix_status_t status = CELIX_SUCCESS;
- celixThreadMutex_lock(&ts->ts_lock);
- int *recvSocket = calloc(sizeof(int), 1);
- *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if (*recvSocket < 0) {
- perror("pubsub_topicSubscriptionCreate:socket");
- return CELIX_SERVICE_EXCEPTION;
- }
- int reuse = 1;
- if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
- perror("setsockopt() SO_REUSEADDR");
- return CELIX_SERVICE_EXCEPTION;
- }
+ if (!hashMap_containsKey(ts->socketMap, pubURL)){
- // TODO Check if there is a better way to parse the URL to IP/Portnr
- //replace ':' by spaces
- char *url = strdup(pubURL);
- char *pt = url;
- while((pt=strchr(pt, ':')) != NULL) {
- *pt = ' ';
- }
- char mcIp[100];
- unsigned short mcPort;
- sscanf(url, "udp //%s %hu", mcIp, &mcPort);
- free (url);
-
- printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
-
- struct ip_mreq mc_addr;
- mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
- mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
- printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
- if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
- perror("setsockopt() IP_ADD_MEMBERSHIP");
- return CELIX_SERVICE_EXCEPTION;
- }
+ celixThreadMutex_lock(&ts->ts_lock);
+ int *recvSocket = calloc(sizeof(int), 1);
+ *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+ if (*recvSocket < 0) {
+ perror("pubsub_topicSubscriptionCreate:socket");
+ return CELIX_SERVICE_EXCEPTION;
+ }
- struct sockaddr_in mcListenAddr;
- mcListenAddr.sin_family = AF_INET;
- mcListenAddr.sin_addr.s_addr = INADDR_ANY;
- mcListenAddr.sin_port = htons(mcPort);
- if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
- perror("bind()");
- return CELIX_SERVICE_EXCEPTION;
- }
+ int reuse = 1;
+ if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+ perror("setsockopt() SO_REUSEADDR");
+ return CELIX_SERVICE_EXCEPTION;
+ }
+ // TODO Check if there is a better way to parse the URL to IP/Portnr
+ //replace ':' by spaces
+ char *url = strdup(pubURL);
+ char *pt = url;
+ while((pt=strchr(pt, ':')) != NULL) {
+ *pt = ' ';
+ }
+ char mcIp[100];
+ unsigned short mcPort;
+ sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+ free (url);
+
+ printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+ struct ip_mreq mc_addr;
+ mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+ mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+ printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+ if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+ perror("setsockopt() IP_ADD_MEMBERSHIP");
+ return CELIX_SERVICE_EXCEPTION;
+ }
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
- ev.data.fd = *recvSocket;
- if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
- perror("epoll_ctl() EPOLL_CTL_ADD");
- return CELIX_SERVICE_EXCEPTION;
- }
- hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+ struct sockaddr_in mcListenAddr;
+ mcListenAddr.sin_family = AF_INET;
+ mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+ mcListenAddr.sin_port = htons(mcPort);
+ if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+ perror("bind()");
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN;
+ ev.data.fd = *recvSocket;
+ if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+ perror("epoll_ctl() EPOLL_CTL_ADD");
+ return CELIX_SERVICE_EXCEPTION;
+ }
+ hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
+
+ celixThreadMutex_unlock(&ts->ts_lock);
- celixThreadMutex_unlock(&ts->ts_lock);
+ }
return status;
}
@@ -272,19 +278,23 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
celix_status_t status = CELIX_SUCCESS;
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
- celixThreadMutex_lock(&ts->ts_lock);
+ if (hashMap_containsKey(ts->socketMap, pubURL)){
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
- int *s = hashMap_remove(ts->socketMap, pubURL);
- if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
- perror("epoll_ctl() EPOLL_CTL_DEL");
- status = CELIX_SERVICE_EXCEPTION;
- }
- free(s);
+ celixThreadMutex_lock(&ts->ts_lock);
- celixThreadMutex_unlock(&ts->ts_lock);
+ int *s = hashMap_remove(ts->socketMap, pubURL);
+ if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+ printf("in if error()\n");
+ perror("epoll_ctl() EPOLL_CTL_DEL");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+ free(s);
+
+ celixThreadMutex_unlock(&ts->ts_lock);
+ }
return status;
}