You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/08/19 20:20:10 UTC

[celix] branch develop updated: Added check for serializer type when connecting endpoints

This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new b39a2e5  Added check for serializer type when connecting endpoints
     new 47b0cd8  Merge pull request #42 from rlenferink/feature/serializer-fix
b39a2e5 is described below

commit b39a2e50f74eac4ef6061bc84b0bd6398309dc1a
Author: Michiel Bouwhuis <mi...@gmail.com>
AuthorDate: Mon Aug 19 12:02:40 2019 +0200

    Added check for serializer type when connecting endpoints
---
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   | 82 ++++++++++++----------
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 52 +++++++-------
 2 files changed, 72 insertions(+), 62 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 990594f..5c2df31 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -87,6 +87,11 @@ static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 
+static bool pubsub_udpmcAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+}
+
 
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
     pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
@@ -405,7 +410,9 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+                pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
         celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
     }
@@ -443,18 +450,10 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
-    const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
-
-    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *sockAddress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
     long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
 
-    bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
-
-    if (publisher && (sockAddress == NULL || sockPort < 0)) {
+    if (sockAddress == NULL || sockPort < 0) {
         L_WARN("[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type. Properties:");
         const char *key = NULL;
         CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
@@ -462,9 +461,24 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
         }
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL && publisher &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+        const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+        const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+        const char *serializer = NULL;
+        long serializerSvcId = pubsub_udpmcTopicReceiver_serializerSvcId(receiver);
+        psa_udpmc_serializer_entry_t *serializerEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serializerEntry != NULL) {
+            serializer = serializerEntry->serType;
+        }
+
+        const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+        const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+        const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+
+        if (scope != NULL && topic != NULL && serializer != NULL
+                        && eScope != NULL && eTopic != NULL && eSerializer != NULL
+                        && strncmp(eScope, scope, 1024*1024) == 0
+                        && strncmp(eTopic, topic, 1024*1024) == 0
+                        && strncmp(eSerializer, serializer, 1024*1024) == 0) {
             pubsub_udpmcTopicReceiver_connectTo(receiver, sockAddress, sockPort);
         }
     }
@@ -474,13 +488,16 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
 
 celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_udpmc_admin_t *psa = handle;
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
-        pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+
+    if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
 
     celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
@@ -497,12 +514,7 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
-    const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
-
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *sockAdress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
     long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
 
@@ -510,12 +522,7 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
         L_WARN("[PSA UPDMC] Error disconnecting from endpoint without udpmc socket address/port or endpoint type.");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL && type != NULL &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0 &&
-            strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-            pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort);
-        }
+        pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort);
     }
 
     return status;
@@ -523,13 +530,16 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
 
 celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_udpmc_admin_t *psa = handle;
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
-        pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+
+    if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
 
     celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 8335287..de7e068 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -92,6 +92,10 @@ static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 
+static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+}
 
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
     pubsub_zmq_admin_t *psa = calloc(1, sizeof(*psa));
@@ -491,8 +495,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+            if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
                 pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
@@ -532,11 +535,6 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
-    const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
-
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_ZMQ_URL_KEY, NULL);
 
     if (url == NULL) {
@@ -545,9 +543,24 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
         L_WARN("[PSA ZMQ] Error got endpoint without a zmq url (admin: %s, type: %s)", admin , type);
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+        const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
+        const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
+        const char *serializer = NULL;
+        long serializerSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
+        psa_zmq_serializer_entry_t *serializerEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serializerEntry != NULL) {
+            serializer = serializerEntry->serType;
+        }
+
+        const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+        const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+        const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+
+        if (scope != NULL && topic != NULL && serializer != NULL
+                        && eScope != NULL && eTopic != NULL && eSerializer != NULL
+                        && strncmp(eScope, scope, 1024*1024) == 0
+                        && strncmp(eTopic, topic, 1024*1024) == 0
+                        && strncmp(eSerializer, serializer, 1024*1024) == 0) {
             pubsub_zmqTopicReceiver_connectTo(receiver, url);
         }
     }
@@ -558,9 +571,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
 celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
-    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-
-    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+    if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
@@ -585,22 +596,13 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
-    const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
-
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_ZMQ_URL_KEY, NULL);
 
     if (url == NULL) {
         L_WARN("[PSA ZMQ] Error got endpoint without zmq url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eScope != NULL && eTopic != NULL &&
-            strncmp(eScope, scope, 1024 * 1024) == 0 &&
-            strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            pubsub_zmqTopicReceiver_disconnectFrom(receiver, url);
-        }
+        pubsub_zmqTopicReceiver_disconnectFrom(receiver, url);
     }
 
     return status;
@@ -609,9 +611,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
 celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
-    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-
-    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+    if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {