You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/29 14:26:58 UTC

[celix] branch bugfix/zmq_wrong_sender_connections created (now e10d983)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at e10d983  Removes adding of default scope in pubsub discovery.

This branch includes the following new commits:

     new 44d6a55  Fixes an issue in endpoint discovery.
     new e10d983  Removes adding of default scope in pubsub discovery.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 02/02: Removes adding of default scope in pubsub discovery.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git

commit e10d9835c8fba6c4086406d36841431e9810fdd6
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 16:26:30 2020 +0200

    Removes adding of default scope in pubsub discovery.
    
    If no scope is set for a publisher/subscriber this needs to be reflected in the
    endpoints so that correct matching can occur.
---
 .../examples/pubsub/publisher/CMakeLists.txt       |  1 +
 .../examples/pubsub/subscriber/CMakeLists.txt      |  2 +-
 .../src/pubsub_tcp_topic_receiver.c                |  9 ++++---
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  6 ++++-
 .../src/pubsub_zmq_topic_receiver.c                |  6 ++++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |  6 ++++-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   | 29 ++++++++++++----------
 .../pubsub/pubsub_spi/include/pubsub_constants.h   |  5 ----
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  2 --
 .../src/pubsub_topology_manager.c                  | 18 +++++++-------
 10 files changed, 47 insertions(+), 37 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
index c68d533..3b9ea50 100644
--- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -25,6 +25,7 @@ add_celix_bundle(celix_pubsub_poi_publisher
 
 target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
+target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE)
 
 celix_bundle_files(celix_pubsub_poi_publisher
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 94acdb2..8bee482 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber
 
 target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
-
+target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE)
 
 celix_bundle_files(celix_pubsub_poi_subscriber
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index eb6afbd..8ecb7df 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -641,10 +641,11 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    if (receiver->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 47dc888..809ab7f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -449,7 +449,11 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
 
 pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
+    if (sender->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 088474a..2ae58af 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -711,7 +711,11 @@ static void* psa_zmq_recvThread(void * data) {
 
 pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    if (receiver->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 413f1b3..85c488b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -464,7 +464,11 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
 
 pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
+    if (sender->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index c6d1aa9..5e1b702 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -398,7 +398,11 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
         entry->properties = celix_properties_copy(endpoint);
-        asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : scope, topic, uuid);
+        if (scope == NULL) {
+            asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid);
+        } else {
+            asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid);
+        }
 
         const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
@@ -409,7 +413,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         celixThreadCondition_broadcast(&disc->waitCond);
         celixThreadMutex_unlock(&disc->runningMutex);
     } else if (valid) {
-        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
+        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
     }
 
     if (!valid) {
@@ -464,8 +468,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
         if (disc->verbose) {
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
             const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
+            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
             L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n",
                    uuid, type, admin, ser, prot);
         }
@@ -495,8 +499,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
     if (disc->verbose) {
         const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
         const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
+        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
         L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n",
                uuid, type, admin, ser, prot);
     }
@@ -572,7 +576,6 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
 
     struct timespec now;
     clock_gettime(CLOCK_MONOTONIC, &now);
-    //TODO add support for query (scope / topic)
 
     fprintf(os, "\n");
     fprintf(os, "Discovery configuration:\n");
@@ -589,11 +592,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
     while (hashMapIterator_hasNext(&iter)) {
         celix_properties_t *ep = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
-        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
         const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
         fprintf(os, "Endpoint %s:\n", uuid);
         fprintf(os, "   |- type          = %s\n", type);
@@ -612,11 +615,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!");
-        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
         const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
         int age = (int)(now.tv_sec - entry->createTime.tv_sec);
         fprintf(os, "Endpoint %s:\n", uuid);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 00ee6b4..671b874 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -39,9 +39,4 @@
  */
 #define PUBSUB_ENDPOINT_LOCAL_VISIBILITY     "local"
 
-/**
- * Default scope, if not scope is specified endpoints are published using this scope
- */
-#define PUBSUB_DEFAULT_ENDPOINT_SCOPE        "default"
-
 #endif /* PUBSUB_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 2aa052d..540ecda 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -65,8 +65,6 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
 
     if (scope != NULL) {
         celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
-    } else {
-        celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
     }
 
     if (topic != NULL) {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 2b063ef..434de1e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
         const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
         const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
-        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)");
+        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
         const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
         fprintf(os, "   |- container name = %s\n", cn);
         fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
@@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
@@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);


[celix] 01/02: Fixes an issue in endpoint discovery.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 44d6a5515c004de4c221a40c426099dfb780dfcf
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 15:39:07 2020 +0200

    Fixes an issue in endpoint discovery.
    
    The topic and scope of already discovered endpoints were not checked before connecting to them.
---
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c |  2 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |  2 +-
 .../src/pubsub_websocket_admin.c                   |  2 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |  2 +-
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |  7 +++-
 bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt     | 23 +++++++++++
 .../gtest/src/PubSubEndpointUtilsTestSuite.cc      | 46 ++++++++++++++++++++++
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h    |  9 ++++-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  2 +-
 .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c  | 10 +++++
 libs/utils/gtest/src/LogUtilsTestSuite.cc          |  4 +-
 11 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 67d4333..c13442f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -549,7 +549,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index d8fdac1..e152b94 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -410,7 +410,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index a017c01..924ddef 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -393,7 +393,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0 && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index bd2de54..2c1c7b5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -599,7 +599,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_zmqAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index 2f4f3e9..8d1c340 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -33,4 +33,9 @@ target_link_libraries(pubsub_spi PUBLIC Celix::pubsub_utils )
 add_library(Celix::pubsub_spi ALIAS pubsub_spi)
 
 install(TARGETS pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
-install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)
\ No newline at end of file
+install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)
+
+
+if (ENABLE_TESTING)
+    add_subdirectory(gtest)
+endif(ENABLE_TESTING)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt
new file mode 100644
index 0000000..faac0ff
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_executable(test_pubsub_spi
+		src/PubSubEndpointUtilsTestSuite.cc
+)
+target_link_libraries(test_pubsub_spi PRIVATE Celix::pubsub_spi GTest::gtest GTest::gtest_main)
+target_compile_options(test_pubsub_spi PRIVATE -std=c++14) #Note test code is allowed to be C++14
+setup_target_for_coverage(test_pubsub_spi SCAN_DIR ..)
diff --git a/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc
new file mode 100644
index 0000000..eecdc74
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc
@@ -0,0 +1,46 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "gtest/gtest.h"
+
+#include "pubsub_endpoint.h"
+
+class PubSubEndpointUtilsTestSuite : public ::testing::Test {
+public:
+};
+
+
+TEST_F(PubSubEndpointUtilsTestSuite, pubsubEndpoint_matchWithTopicAndScope) {
+    celix_properties_t* endpoint = celix_properties_create();
+    celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "topic");
+
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default"));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope"));
+
+    celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "scope");
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default"));
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope"));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scopeaa"));
+
+    celix_properties_destroy(endpoint);
+}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 1cd966c..e1f8f33 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -174,7 +174,14 @@ bool pubsubEndpoint_match(
         long *outSerializerSvcId,
         long *outProtocolSvcId);
 
-
+/**
+ * Match an endpoint with a topic & scope.
+ * @param endpoint The endpoints (mandatory)
+ * @param topic The topic (mandatory)
+ * @param scope The scope (can be NULL)
+ * @return true if the endpoint is for the provided topic and scope.
+ */
+bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope);
 
 
 #ifdef __cplusplus
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index a98044f..2aa052d 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -204,7 +204,7 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
     if (scope != NULL) {
         asprintf(&result, "%s:%s", scope, topic);
     } else {
-        asprintf(&result, "default:%s", topic);
+        asprintf(&result, "scopeless %s", topic);
     }
     return result;
 }
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
index 659ad5d..cc18beb 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
@@ -22,6 +22,7 @@
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
 #include <pubsub_protocol.h>
+#include <celix_api.h>
 
 #include "service_reference.h"
 
@@ -322,3 +323,12 @@ bool pubsubEndpoint_match(
 
     return match;
 }
+
+bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope) {
+    const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    if (celix_utils_stringEquals(topic, endpointTopic) && celix_utils_stringEquals(scope, endpointScope)) {
+        return true;
+    }
+    return false;
+}
\ No newline at end of file
diff --git a/libs/utils/gtest/src/LogUtilsTestSuite.cc b/libs/utils/gtest/src/LogUtilsTestSuite.cc
index 0123ee6..a63bd43 100644
--- a/libs/utils/gtest/src/LogUtilsTestSuite.cc
+++ b/libs/utils/gtest/src/LogUtilsTestSuite.cc
@@ -20,11 +20,11 @@
 #include <gtest/gtest.h>
 
 #include "celix_log_utils.h"
+#include "hash_map.h"
+#include "utils.h"
 
 class LogUtilsTestSuite : public ::testing::Test {};
 
-
-
 TEST_F(LogUtilsTestSuite, LogLevelToString) {
     EXPECT_STREQ("trace", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_TRACE));
     EXPECT_STREQ("debug", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_DEBUG));