You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/12 11:15:31 UTC
celix git commit: CELIX-454: Adds pubsub_updmc_tests
Repository: celix
Updated Branches:
refs/heads/feature/CELIX-454-pubsub-disc ac47373d2 -> 1df73a01f
CELIX-454: Adds pubsub_updmc_tests
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1df73a01
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1df73a01
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1df73a01
Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: 1df73a01f8d5ad5ac06e5cd991d0147782fb1e97
Parents: ac47373
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Nov 12 12:15:07 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Nov 12 12:15:07 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_psa_udpmc_constants.h | 29 ++-
.../src/pubsub_udpmc_admin.c | 50 ++--
.../src/pubsub_udpmc_admin.h | 12 +-
.../src/pubsub_udpmc_topic_receiver.c | 231 +++++++++++++++----
.../src/pubsub_udpmc_topic_receiver.h | 8 +-
.../src/pubsub_udpmc_topic_sender.c | 24 +-
.../src/pubsub_udpmc_topic_sender.h | 5 +-
.../src/pubsub_psa_zmq_constants.h | 42 +++-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.c | 10 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.h | 35 +--
.../src/pubsub_discovery_impl.c | 12 +-
bundles/pubsub/test/CMakeLists.txt | 20 +-
bundles/pubsub/test/meta_data/ping.properties | 2 +
13 files changed, 355 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
index 2a02da8..95779b7 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -31,9 +31,34 @@
#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY "PSA_UDPMC_QOS_CONTROL_SCORE"
#define PSA_UDPMC_DEFAULT_SCORE_KEY "PSA_UDPMC_DEFAULT_SCORE"
-#define PSA_UDPMC_DEFAULT_VERBOSE false
-#define PSA_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE"
+#define PUBSUB_UDPMC_ADMIN_TYPE "udp_mc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address"
+#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port"
+
+#define PUBSUB_UDPMC_IP_KEY "PSA_IP"
+#define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE"
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY "PSA_MC_PREFIX"
+#define PUBSUB_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE"
+
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100"
+#define PUBSUB_UDPMC_VERBOSE_DEFAULT true
+
+/**
+ * If set true on the endpoint, the udpmc TopicSender bind port is statically configured.
+ */
+#define PUBSUB_UDPMC_STATIC_CONFIGURED "updmc.static.configured"
+
+/**
+ * Can be set in the topic properties to fix a static mc port for topic senders
+ */
+#define PUBSUB_UDPMC_STATIC_BIND_PORT "udpmc.static.bind.port"
+
+/**
+ * The static socket addresses (in address:port form) which a subscriber should try to connect to.
+ * The socket addresses are space separated.
+ */
+#define PUBSUB_UDPMC_STATIC_CONNECT_SOCKET_ADDRESSES "udpmc.static.connect.socket_addresses"
#endif /* PUBSUB_PSA_UDPMC_CONSTANTS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 73fab20..d7add8b 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -109,9 +109,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
}
- const char *mc_prefix = celix_bundleContext_getProperty(ctx,
- PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY,
- PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
+ const char *mc_prefix = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY, PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
L_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
@@ -234,7 +232,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
hashMap_destroy(psa->topicReceivers.map, true, false);
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
- hashMap_destroy(psa->discoveredEndpoints.map, true, false);
+ hashMap_destroy(psa->discoveredEndpoints.map, false, false);
celixThreadMutex_destroy(&psa->serializers.mutex);
hashMap_destroy(psa->serializers.map, false, false);
@@ -296,13 +294,12 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
if (sender == NULL) {
psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress);
+ sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress, topicProps);
}
if (sender != NULL) {
const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
//if available also set container name
@@ -310,6 +307,8 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
if (cn != NULL) {
celix_properties_set(newEndpoint, "container_name", cn);
}
+ const char *configuredPort = celix_properties_get(topicProps, PUBSUB_UDPMC_STATIC_BIND_PORT, NULL);
+ celix_properties_setBool(newEndpoint, PUBSUB_UDPMC_STATIC_CONFIGURED, configuredPort != NULL);
hashMap_put(psa->topicSenders.map, key, sender);
} else {
free(key);
@@ -370,7 +369,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
if (receiver == NULL) {
psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId, serEntry->svc);
+ receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, psa->log, scope, topic, psa->ifIpAddress, topicProps, serializerSvcId, serEntry->svc);
}
if (receiver != NULL) {
const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
@@ -443,18 +442,23 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+ const char *sockAddress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
- if (type == NULL || sockAdress == NULL || sockPort < 0) {
- fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type");
+ bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+ if (publisher && (sockAddress == NULL || sockPort < 0)) {
+ L_WARN("[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type. Properties:");
+ const char *key = NULL;
+ CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+ L_WARN("[PSA UPDMC] |- %s=%s\n", key, celix_properties_get(endpoint, key, NULL));
+ }
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL && type != NULL &&
+ if (eScope != NULL && eTopic != NULL && publisher &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0 &&
- strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- pubsub_udpmcTopicReceiver_connectTo(receiver, sockAdress, sockPort);
+ strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ pubsub_udpmcTopicReceiver_connectTo(receiver, sockAddress, sockPort);
}
}
@@ -550,9 +554,11 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
const char *scope = pubsub_udpmcTopicSender_scope(sender);
const char *topic = pubsub_udpmcTopicSender_topic(sender);
const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
+ long sockPort = pubsub_udpmcTopicSender_socketPort(sender);
+ const char *postAddr = pubsub_udpmcTopicSender_isStatic(sender) ? " (static port)" : "";
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
- fprintf(out, " |- socket address = %s\n", sockAddr);
+ fprintf(out, " |- socket address = %s:%li%s\n", sockAddr, sockPort, postAddr);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
@@ -569,15 +575,23 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+ celix_array_list_t *connections = celix_arrayList_create();
+ pubsub_udpmcTopicReceiver_listConnections(receiver, connections);
+
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- connections (%i):\n", celix_arrayList_size(connections));
+ for (int i = 0 ; i < celix_arrayList_size(connections); ++i) {
+ char *conn = celix_arrayList_get(connections, i);
+ fprintf(out, " |- address = %s\n", conn);
+ free(conn);
+ }
+ celix_arrayList_destroy(connections);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
- //TODO topic receivers/senders connection count
-
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 17b8957..469fa9c 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -22,18 +22,8 @@
#include "celix_api.h"
#include "log_helper.h"
+#include "pubsub_psa_udpmc_constants.h"
-#define PUBSUB_UDPMC_ADMIN_TYPE "udp_mc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address"
-#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port"
-
-#define PUBSUB_UDPMC_IP_KEY "PSA_IP"
-#define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE"
-#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY "PSA_MC_PREFIX"
-#define PUBSUB_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE"
-
-#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100"
-#define PUBSUB_UDPMC_VERBOSE_DEFAULT true
typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index e1e5a42..5e2ed57 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -26,6 +26,7 @@
#include <assert.h>
#include <pubsub_endpoint.h>
#include <arpa/inet.h>
+#include <log_helper.h>
#include "pubsub_udpmc_topic_receiver.h"
#include "pubsub_psa_udpmc_constants.h"
#include "large_udp.h"
@@ -36,8 +37,18 @@
#define UDP_BUFFER_SIZE 65535
#define MAX_UDP_SESSIONS 16
+#define L_DEBUG(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
struct pubsub_udpmc_topic_receiver {
celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
long serializerSvcId;
pubsub_serializer_service_t *serializer;
char *scope;
@@ -55,12 +66,14 @@ struct pubsub_udpmc_topic_receiver {
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = socketAddress, value = psa_udpmc_requested_connection_entry_t*
+ bool allConnected; //true if all requested connections are connected
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_udpmc_subscriber_entry_t
+ bool allInitialized;
} subscribers;
};
@@ -68,14 +81,18 @@ typedef struct psa_udpmc_requested_connection_entry {
char *key;
char *socketAddress;
long socketPort;
- bool connected;
int recvSocket;
+
+ bool connected;
+ bool statically; //true if the connection is statically configured through the topic properties.
} psa_udpmc_requested_connection_entry_t;
typedef struct psa_udpmc_subscriber_entry {
int usageCount;
hash_map_t *msgTypes; //map from serializer svc
pubsub_subscriber_t *svc;
+
+ bool initialized; //true if the init function is called through the receive thread
} psa_udpmc_subscriber_entry_t;
@@ -89,16 +106,20 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
static void* psa_udpmc_recvThread(void * data);
-
+static void psa_udpmc_connectToAllRequestedConnections(pubsub_udpmc_topic_receiver_t *receiver);
+static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *receiver);
pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
const char *scope,
const char *topic,
const char *ifIP,
+ const celix_properties_t *topicProperties,
long serializerSvcId,
pubsub_serializer_service_t *serializer) {
pubsub_udpmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
+ receiver->logHelper = logHelper;
receiver->serializerSvcId = serializerSvcId;
receiver->serializer = serializer;
receiver->scope = strndup(scope, 1024 * 1024);
@@ -114,7 +135,9 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ receiver->subscribers.allInitialized = false;
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ receiver->requestedConnections.allConnected = false;
//track subscribers
{
@@ -132,6 +155,44 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
+ const char *staticConnects = celix_properties_get(topicProperties, PUBSUB_UDPMC_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
+ if (staticConnects != NULL) {
+ char *copy = strndup(staticConnects, 1024*1024);
+ char* addr;
+ char* save = copy;
+
+ while ((addr = strtok_r(save, " ", &save))) {
+ char *colon = strchr(addr, ':');
+ if (colon == NULL) {
+ continue;
+ }
+
+ char *sockAddr = NULL;
+ asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr);
+
+ long sockPort = atol((colon + 1));
+
+ char *key = NULL;
+ asprintf(&key, "%s:%li", sockAddr, sockPort);
+
+
+ if (sockPort > 0) {
+ psa_udpmc_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->key = key;
+ entry->socketAddress = sockAddr;
+ entry->socketPort = sockPort;
+ entry->connected = false;
+ entry->statically = true;
+ hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+ } else {
+ L_WARN("[PSA_UDPMC_TR] Invalid static socket address %s", addr);
+ free(key);
+ free(sockAddr);
+ }
+ }
+ free(copy);
+ }
+
celixThread_create(&receiver->recvThread.thread, NULL, psa_udpmc_recvThread, receiver);
return receiver;
@@ -205,9 +266,8 @@ void pubsub_udpmcTopicReceiver_connectTo(
long socketPort) {
printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
- int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
- char *key = calloc((size_t)len+1, sizeof(char));
- snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+ char *key = NULL;
+ asprintf(&key, "%s:%li", socketAddress, socketPort);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_udpmc_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
@@ -217,46 +277,15 @@ void pubsub_udpmcTopicReceiver_connectTo(
entry->socketAddress = strndup(socketAddress, 1024 * 1024);
entry->socketPort = socketPort;
entry->connected = false;
+ entry->statically = false;
hashMap_put(receiver->requestedConnections.map, (void*)entry->key, entry);
+ receiver->requestedConnections.allConnected = false;
} else {
free(key);
}
- if (!entry->connected) {
- int rc = 0;
- entry->recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if (entry->recvSocket >= 0) {
- int reuse = 1;
- rc = setsockopt(entry->recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
- }
- if (entry->recvSocket >= 0 && rc >= 0) {
- struct ip_mreq mc_addr;
- mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
- mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
- printf("Adding MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
- rc = setsockopt(entry->recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
- }
- if (entry->recvSocket >= 0 && rc >= 0) {
- struct sockaddr_in mcListenAddr;
- mcListenAddr.sin_family = AF_INET;
- mcListenAddr.sin_addr.s_addr = INADDR_ANY;
- mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
- rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
- }
- if (entry->recvSocket >= 0 && rc >= 0) {
- struct epoll_event ev;
- memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
- ev.data.fd = entry->recvSocket;
- rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
- }
-
- if (entry->recvSocket < 0 || rc < 0) {
- fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno));
- } else {
- entry->connected = true;
- }
- }
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ psa_udpmc_connectToAllRequestedConnections(receiver);
}
void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
@@ -304,6 +333,8 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
entry->svc = svc;
+ entry->initialized = false;
+ receiver->subscribers.allInitialized = false;
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
@@ -347,7 +378,22 @@ static void* psa_udpmc_recvThread(void * data) {
bool running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ bool allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ bool allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
while (running) {
+ if (!allConnected) {
+ psa_udpmc_connectToAllRequestedConnections(receiver);
+ }
+ if (!allInitialized) {
+ psa_udpmc_initializeAllSubscribers(receiver);
+ }
+
int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
int i;
for(i = 0; i < nfds; i++ ) {
@@ -366,9 +412,18 @@ static void* psa_udpmc_recvThread(void * data) {
free(udpMsg);
}
}
+
celixThreadMutex_lock(&receiver->recvThread.mutex);
running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
return NULL;
@@ -420,3 +475,99 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
+
+void pubsub_udpmcTopicReceiver_listConnections(pubsub_udpmc_topic_receiver_t *receiver, celix_array_list_t *connections) {
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ char *url = NULL;
+ const char *post = entry->statically ? " (static)" : "";
+ asprintf(&url, "%s:%li%s", entry->socketAddress, entry->socketPort, post);
+ celix_arrayList_add(connections, url);
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static bool psa_udpmc_connectToEntry(pubsub_udpmc_topic_receiver_t *receiver, psa_udpmc_requested_connection_entry_t *entry) {
+ bool connected = true;
+ int rc = 0;
+ entry->recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+ if (entry->recvSocket >= 0) {
+ int reuse = 1;
+ rc = setsockopt(entry->recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
+ }
+ if (entry->recvSocket >= 0 && rc >= 0) {
+ struct ip_mreq mc_addr;
+ mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
+ mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
+ L_INFO("[PSA_UDPMC_TR] Add MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
+ rc = setsockopt(entry->recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
+ }
+ if (entry->recvSocket >= 0 && rc >= 0) {
+ struct sockaddr_in mcListenAddr;
+ mcListenAddr.sin_family = AF_INET;
+ mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+ mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
+ rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
+ }
+ if (entry->recvSocket >= 0 && rc >= 0) {
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN;
+ ev.data.fd = entry->recvSocket;
+ rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
+ }
+
+ if (entry->recvSocket < 0 || rc < 0) {
+ L_WARN("[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, entry->socketAddress, entry->socketPort, strerror(errno));
+ connected = false;
+ }
+ return connected;
+}
+
+
+static void psa_udpmc_connectToAllRequestedConnections(pubsub_udpmc_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ if (!receiver->requestedConnections.allConnected) {
+ bool allConnected = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->connected){
+ if (psa_udpmc_connectToEntry(receiver, entry)) {
+ entry->connected = true;
+ } else {
+ allConnected = false;
+ }
+ }
+ }
+ receiver->requestedConnections.allConnected = allConnected;
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ if (!receiver->subscribers.allInitialized) {
+ bool allInitialized = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->initialized) {
+ int rc = 0;
+ if (entry->svc != NULL && entry->svc->init != NULL) {
+ rc = entry->svc->init(entry->svc->handle);
+ }
+ if (rc == 0) {
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
+ }
+ }
+ receiver->subscribers.allInitialized = allInitialized;
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
index 2610489..7eab09a 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -21,13 +21,16 @@
#include "celix_bundle_context.h"
#include "pubsub_serializer.h"
+#include "log_helper.h"
typedef struct pubsub_udpmc_topic_receiver pubsub_udpmc_topic_receiver_t;
-pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *ifIP,
+ const char *ifIP,
+ const celix_properties_t *topicProperties,
long serializerSvcId,
pubsub_serializer_service_t *serializer);
void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
@@ -35,6 +38,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver);
const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver);
const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_udpmc_topic_receiver_t *receiver);
+void pubsub_udpmcTopicReceiver_listConnections(pubsub_udpmc_topic_receiver_t *receiver, celix_array_list_t *connections);
long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver);
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 55bf96b..1b7c1db 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -46,6 +46,7 @@ struct pubsub_udpmc_topic_sender {
char *topic;
char *socketAddress;
long socketPort;
+ bool staticallyConfigured;
int sendSocket;
struct sockaddr_in destAddr;
@@ -89,7 +90,8 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *serializer,
int sendSocket,
- const char *bindIP) {
+ const char *bindIP,
+ const celix_properties_t *topicProperties) {
pubsub_udpmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->serializerSvcId = serializerSvcId;
@@ -100,9 +102,17 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+ unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT);
+ long configuredPort = celix_properties_getAsLong(topicProperties, PUBSUB_UDPMC_STATIC_BIND_PORT, -1L);
+ if (configuredPort > 0) {
+ port = (unsigned int)configuredPort;
+ sender->staticallyConfigured = true;
+ } else {
+ sender->staticallyConfigured = false;
+ }
+
//setting up socket for UDPMC TopicSender
{
- unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT);
sender->sendSocket = sendSocket;
sender->destAddr.sin_family = AF_INET;
sender->destAddr.sin_addr.s_addr = inet_addr(bindIP);
@@ -265,15 +275,15 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
if (msgSer->msgVersion != NULL){
version_getMajor(msgSer->msgVersion, &major);
version_getMinor(msgSer->msgVersion, &minor);
- msg_hdr->major = major;
- msg_hdr->minor = minor;
+ msg_hdr->major = (unsigned char)major;
+ msg_hdr->minor = (unsigned char)minor;
}
pubsub_msg_t *msg = calloc(1, sizeof(pubsub_msg_t));
msg->header = msg_hdr;
msg->payload = (char *) serializedOutput;
- msg->payloadSize = serializedOutputLen;
+ msg->payloadSize = (unsigned int)serializedOutputLen;
if (psa_udpmc_sendMsg(entry, msg) == false) {
@@ -335,3 +345,7 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender) {
return sender->serializerSvcId;
}
+
+bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender) {
+ return sender->staticallyConfigured;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 07c1301..e4ccbfb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -31,13 +31,16 @@ pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *serializer,
int sendSocket,
- const char *bindIP);
+ const char *bindIP,
+ const celix_properties_t *topicProperties);
void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender);
long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender);
+bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender);
+
long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender);
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index 3cc699a..2bd4438 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -23,9 +23,6 @@
#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
-
-#define PSA_ZMQ_PUBSUB_ADMIN_TYPE "zmq"
-
#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
@@ -40,11 +37,44 @@
#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
-#define PSA_ZMQ_DEFAULT_VERBOSE false
-#define PSA_ZMQ_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
+
+#define PUBSUB_ZMQ_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
+#define PUBSUB_ZMQ_VERBOSE_DEFAULT true
+
+#define PUBSUB_ZMQ_PSA_IP_KEY "PSA_IP"
+#define PUBSUB_ZMQ_PSA_ITF_KEY "PSA_INTERFACE"
+#define PUBSUB_ZMQ_NR_THREADS_KEY "PSA_ZMQ_NR_THREADS"
+
+#define PUBSUB_ZMQ_DEFAULT_IP "127.0.0.1"
+
+#define PUBSUB_ZMQ_ADMIN_TYPE "zmq"
+
+/**
+ * The ZMQ url key for the topic sender endpoints
+ */
+#define PUBSUB_ZMQ_URL_KEY "zmq.url"
+
-#define PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY "pubsub.zmq.url"
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery.
+ * This key is intentionally distinct from PUBSUB_ZMQ_STATIC_BIND_URL, so the url
+ * announced through discovery can differ from the url the sender binds to.
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL "zmq.static.discover.url"
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED "zmq.static.configured"
+
+/**
+ * The static urls which a subscriber should try to connect to.
+ * The urls are space separated.
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS "zmq.static.connect.urls"
#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index c5bfb99..8c9e163 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -377,10 +377,18 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
//if configured use a static discover url
const char *staticDiscUrl = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL);
if (staticDiscUrl != NULL) {
- celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+ celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
}
celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+ //if url starts with ipc:// constrain discovery to host visibility, else use system visibility
+ const char *u = celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, "");
+ if (strncmp("ipc://", u, strlen("ipc://")) == 0) {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBLITY);
+ } else {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY);
+ }
+
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index a249b5a..802bc38 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -22,40 +22,7 @@
#include "celix_api.h"
#include "log_helper.h"
-
-#define PUBSUB_ZMQ_ADMIN_TYPE "zmq"
-#define PUBSUB_ZMQ_URL_KEY "zmq.url"
-
-#define PUBSUB_ZMQ_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
-#define PUBSUB_ZMQ_VERBOSE_DEFAULT true
-
-#define PUBSUB_ZMQ_PSA_IP_KEY "PSA_IP"
-#define PUBSUB_ZMQ_PSA_ITF_KEY "PSA_INTERFACE"
-#define PUBSUB_ZMQ_NR_THREADS_KEY "PSA_ZMQ_NR_THREADS"
-
-#define PUBSUB_ZMQ_DEFAULT_IP "127.0.0.1"
-
-/**
- * Can be set in the topic properties to fix a static bind url
- */
-#define PUBSUB_ZMQ_STATIC_BIND_URL "zmq.static.bind.url"
-
-/**
- * Can be set in the topic properties to fix a static url used for discovery
- */
-#define PUBSUB_ZMQ_STATIC_DISCOVER_URL "zmq.static.bind.url"
-
-/**
- * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
- */
-#define PUBSUB_ZMQ_STATIC_CONFIGURED "zmq.static.configured"
-
-/**
- * The static url which a subscriber should try to connect to.
- * The urls are space separated
- */
-#define PUBSUB_ZMQ_STATIC_CONNECT_URLS "zmq.static.connect.urls"
-
+#include "pubsub_psa_zmq_constants.h"
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 673bdfd..a86178c 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -452,10 +452,16 @@ celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_propert
static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint) {
const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ assert(uuid != NULL);
celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
- hashMap_put(disc->discoveredEndpoints, (void*)uuid, endpoint);
+ if (exists) {
+ //if exists -> keep old and free properties
+ celix_properties_destroy(endpoint);
+ } else {
+ hashMap_put(disc->discoveredEndpoints, (void*)uuid, endpoint);
+ }
celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
if (!exists) {
@@ -512,7 +518,7 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
}
celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char* etcdValue) {
- properties_t *props = properties_create();
+ properties_t *props = celix_properties_create();
// etcdValue contains the json formatted string
json_error_t error;
@@ -528,7 +534,7 @@ celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, con
while (iter) {
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
- properties_set(props, key, json_string_value(value));
+ celix_properties_set(props, key, json_string_value(value));
iter = json_object_iter_next(jsonRoot, iter);
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 941f245..2d7130e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -52,6 +52,23 @@ celix_bundle_files(pubsub_tst
)
+# Test container for the UDP multicast pubsub admin. It is launched from the
+# custom test runner source and deployed into the current binary directory.
+add_celix_container(pubsub_udpmc_tests
+ GEN_BUNDLES_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_udp_multicast
+ pubsub_sut
+ pubsub_tst
+)
+# The launcher is built from test_runner.cc, so it needs the CppUTest libraries and headers.
+# NOTE(review): this block is not wrapped in a build-option guard like the zmq tests below;
+# confirm Celix::pubsub_admin_udp_multicast is always available when this directory is configured.
+target_link_libraries(pubsub_udpmc_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_udpmc_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+# Register the container executable with CTest; it runs from the directory given by the target's CONTAINER_LOC property.
+add_test(NAME run_pubsub_udpmc_tests COMMAND pubsub_udpmc_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_udpmc_tests,CONTAINER_LOC>)
+# Coverage setup for this test target; the last argument is the coverage output location.
+SETUP_TARGET_FOR_COVERAGE(pubsub_udpmc_tests pubsub_udpmc_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_udpmc)
if (BUILD_PUBSUB_PSA_ZMQ)
add_celix_container(pubsub_zmq_tests
@@ -71,5 +88,4 @@ if (BUILD_PUBSUB_PSA_ZMQ)
target_include_directories(pubsub_zmq_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
add_test(NAME run_pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_tests pubsub_zmq_tests ${CMAKE_BINARY_DIR}/coverage/pubsub/pubsub_zmq)
-endif ()
-
+endif ()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/1df73a01/bundles/pubsub/test/meta_data/ping.properties
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index 09284fc..4cb9981 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -16,4 +16,6 @@
# under the License.
zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
+udpmc.static.bind.port=50678
+udpmc.static.connect.socket_addresses=224.100.0.1:50678