You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/08/19 20:20:10 UTC
[celix] branch develop updated: Added check for serializer type when connecting endpoints
This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new b39a2e5 Added check for serializer type when connecting endpoints
new 47b0cd8 Merge pull request #42 from rlenferink/feature/serializer-fix
b39a2e5 is described below
commit b39a2e50f74eac4ef6061bc84b0bd6398309dc1a
Author: Michiel Bouwhuis <mi...@gmail.com>
AuthorDate: Mon Aug 19 12:02:40 2019 +0200
Added check for serializer type when connecting endpoints
---
.../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 82 ++++++++++++----------
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 52 +++++++-------
2 files changed, 72 insertions(+), 62 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 990594f..5c2df31 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -87,6 +87,11 @@ static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static bool pubsub_udpmcAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+}
+
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
@@ -405,7 +410,9 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
}
@@ -443,18 +450,10 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
- const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
-
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *sockAddress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
- bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
-
- if (publisher && (sockAddress == NULL || sockPort < 0)) {
+ if (sockAddress == NULL || sockPort < 0) {
L_WARN("[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type. Properties:");
const char *key = NULL;
CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
@@ -462,9 +461,24 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
}
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL && publisher &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+ const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+ const char *serializer = NULL;
+ long serializerSvcId = pubsub_udpmcTopicReceiver_serializerSvcId(receiver);
+ psa_udpmc_serializer_entry_t *serializerEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serializerEntry != NULL) {
+ serializer = serializerEntry->serType;
+ }
+
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+
+ if (scope != NULL && topic != NULL && serializer != NULL
+ && eScope != NULL && eTopic != NULL && eSerializer != NULL
+ && strncmp(eScope, scope, 1024*1024) == 0
+ && strncmp(eTopic, topic, 1024*1024) == 0
+ && strncmp(eSerializer, serializer, 1024*1024) == 0) {
pubsub_udpmcTopicReceiver_connectTo(receiver, sockAddress, sockPort);
}
}
@@ -474,13 +488,16 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_udpmc_admin_t *psa = handle;
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+
+ if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
celix_properties_t *cpy = celix_properties_copy(endpoint);
@@ -497,12 +514,7 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
- const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
-
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *sockAdress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
@@ -510,12 +522,7 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
L_WARN("[PSA UPDMC] Error disconnecting from endpoint without udpmc socket address/port or endpoint type.");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL && type != NULL &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0 &&
- strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort);
- }
+ pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort);
}
return status;
@@ -523,13 +530,16 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_udpmc_admin_t *psa = handle;
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+
+ if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 8335287..de7e068 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -92,6 +92,10 @@ static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+}
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
pubsub_zmq_admin_t *psa = calloc(1, sizeof(*psa));
@@ -491,8 +495,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
}
}
@@ -532,11 +535,6 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
- const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
-
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *url = celix_properties_get(endpoint, PUBSUB_ZMQ_URL_KEY, NULL);
if (url == NULL) {
@@ -545,9 +543,24 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
L_WARN("[PSA ZMQ] Error got endpoint without a zmq url (admin: %s, type: %s)", admin , type);
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
+ const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
+ const char *serializer = NULL;
+ long serializerSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
+ psa_zmq_serializer_entry_t *serializerEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serializerEntry != NULL) {
+ serializer = serializerEntry->serType;
+ }
+
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+
+ if (scope != NULL && topic != NULL && serializer != NULL
+ && eScope != NULL && eTopic != NULL && eSerializer != NULL
+ && strncmp(eScope, scope, 1024*1024) == 0
+ && strncmp(eTopic, topic, 1024*1024) == 0
+ && strncmp(eSerializer, serializer, 1024*1024) == 0) {
pubsub_zmqTopicReceiver_connectTo(receiver, url);
}
}
@@ -558,9 +571,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -585,22 +596,13 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
- const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
-
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *url = celix_properties_get(endpoint, PUBSUB_ZMQ_URL_KEY, NULL);
if (url == NULL) {
L_WARN("[PSA ZMQ] Error got endpoint without zmq url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
- pubsub_zmqTopicReceiver_disconnectFrom(receiver, url);
- }
+ pubsub_zmqTopicReceiver_disconnectFrom(receiver, url);
}
return status;
@@ -609,9 +611,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {