You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/29 18:57:26 UTC

[celix] branch bugfix/zmq_wrong_sender_connections updated: Changes handling of scopeless and default scoped topic sendes/receivers.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/bugfix/zmq_wrong_sender_connections by this push:
     new 1c36d3f  Changes handling of scopeless and default scoped topic sendes/receivers.
1c36d3f is described below

commit 1c36d3f052e2bcc304ebff34ce31ba949a0f424e
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 20:57:02 2020 +0200

    Changes handling of scopeless and default scoped topic sendes/receivers.
    
    In the udpate setup a scopeless (scope==NULL) and default scoped (scope="default") will have their own topic sender and receiver, but will connect to each other through discovery. This is done so that requesting a scopeless publisher will result in a service without a scope property and requesting a default scoped publisher will result in a publisher with a "default" property.
---
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 14 ++++-----
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   | 13 ++++-----
 .../src/pubsub_websocket_admin.c                   | 13 ++++-----
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 33 ++++++++++++++++------
 .../src/pubsub_zmq_topic_receiver.c                |  4 +--
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h    |  7 ++++-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  2 +-
 7 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index c13442f..e7ad284 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -610,15 +610,13 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p
 
     if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_tcpTopicReceiver_topic(receiver), pubsub_tcpTopicReceiver_scope(receiver))) {
+                pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
-        free(key);
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index e152b94..18657a1 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -472,13 +472,12 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
 
     if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpmcTopicReceiver_topic(receiver), pubsub_udpmcTopicReceiver_scope(receiver))) {
+                pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 924ddef..ba33252 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -460,13 +460,12 @@ celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const c
 
     if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_websocketTopicReceiver_topic(receiver), pubsub_websocketTopicReceiver_scope(receiver))) {
+                pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 2c1c7b5..296705d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -658,15 +658,13 @@ celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_p
 
     if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_zmqTopicReceiver_topic(receiver), pubsub_zmqTopicReceiver_scope(receiver))) {
+                pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
-        free(key);
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
 
@@ -723,10 +721,27 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi
     return status;
 }
 
-bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *out, FILE *errStream __attribute__((unused))) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
+
+    char *line = celix_utils_strdup(commandLine);
+    char *token = line;
+    strtok_r(line, " ", &token); //first token is command name
+    strtok_r(NULL, " ", &token); //second token is sub command
+
+    if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    if (celix_utils_stringEquals(token, "nr_of_senders")) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    }
+
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
     celixThreadMutex_lock(&psa->serializers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 088474a..10fc49e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -415,8 +415,8 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
-    if (receiver->scope == NULL){
-        if (subScope != NULL){
+    if (receiver->scope == NULL) {
+        if (subScope != NULL) {
             return;
         }
     } else if (subScope != NULL) {
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index e1f8f33..2e1cfe9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -65,7 +65,12 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti
 bool
 pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
 
-
+/**
+ * Create a key based on scope an topic.
+ * Scope can be NULL.
+ * Note that NULL, "topic" and "default", "topic" will result in different keys
+ * @return a newly created key. caller is responsible for freeing the string array.
+ */
 char *pubsubEndpoint_createScopeTopicKey(const char *scope, const char *topic);
 
 /**
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 7ef620d..c1a715e 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -205,7 +205,7 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
         asprintf(&result, "%s:%s", scope, topic);
     } else {
         //NOTE scope == NULL, equal to scope="default"
-        asprintf(&result, "%s:%s", PUBSUB_DEFAULT_ENDPOINT_SCOPE, topic);
+        asprintf(&result, "%s", topic);
     }
     return result;
 }