You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/30 14:05:46 UTC

[celix] branch master updated: Bugfix/zmq wrong sender connections (#264)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/master by this push:
     new a11b4c9  Bugfix/zmq wrong sender connections (#264)
a11b4c9 is described below

commit a11b4c9433a3a69245f374a8a4fb3a2935491bd4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Jun 30 16:05:34 2020 +0200

    Bugfix/zmq wrong sender connections (#264)
    
    * Fixes an issue in endpoint endpoint discovery.
    
    * Changes handling of scopeless and default scoped topic sendes/receivers.
    
    In the udpated setup a scopeless (scope==NULL) and default scoped (scope="default") will have their own topic sender and receiver, but will connect to each other through discovery. This is done so that requesting a scopeless publisher will result in a service without a scope property and requesting a default scoped publisher will result in a publisher with a "default" property.
---
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 16 ++--
 .../src/pubsub_tcp_topic_receiver.c                |  3 +
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   | 15 ++--
 .../src/pubsub_udpmc_topic_receiver.c              |  5 +-
 .../src/pubsub_websocket_admin.c                   | 15 ++--
 .../src/pubsub_websocket_topic_receiver.c          |  3 +
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 35 ++++++---
 .../src/pubsub_zmq_topic_receiver.c                |  7 +-
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |  7 +-
 bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt     | 25 ++++++
 .../gtest/src/PubSubEndpointUtilsTestSuite.cc      | 47 +++++++++++
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h    | 16 +++-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  3 +-
 .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c  | 11 +++
 bundles/pubsub/test/test/test_runner.cc            | 90 +++++++++++-----------
 libs/utils/gtest/src/LogUtilsTestSuite.cc          |  2 -
 16 files changed, 211 insertions(+), 89 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 67d4333..e7ad284 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -549,7 +549,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
@@ -610,15 +610,13 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p
 
     if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_tcpTopicReceiver_topic(receiver), pubsub_tcpTopicReceiver_scope(receiver))) {
+                pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
-        free(key);
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index abc44f4..4fa4586 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -448,6 +448,9 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
             //not the same scope. ignore
             return;
         }
+    } else {
+        //receiver scope is not NULL, but subScope is NULL -> ignore
+        return;
     }
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index d8fdac1..18657a1 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -410,7 +410,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
@@ -472,13 +472,12 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
 
     if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpmcTopicReceiver_topic(receiver), pubsub_udpmcTopicReceiver_scope(receiver))) {
+                pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index c8ad961..5f82019 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -338,11 +338,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
         if (subScope != NULL) {
             return;
         }
-    } else {
+    } else if (subScope != NULL) {
         if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
             //not the same scope. ignore
             return;
         }
+    } else {
+        //receiver scope is not NULL, but subScope is NULL -> ignore
+        return;
     }
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index a017c01..ba33252 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -393,7 +393,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
-            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0 && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
@@ -460,13 +460,12 @@ celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const c
 
     if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_websocketTopicReceiver_topic(receiver), pubsub_websocketTopicReceiver_scope(receiver))) {
+                pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index ec35af3..7d8cd00 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -410,6 +410,9 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
             //not the same scope. ignore
             return;
         }
+    } else {
+        //receiver scope is not NULL, but subScope is NULL -> ignore
+        return;
     }
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 83eaed1..b16321a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -601,7 +601,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
-            if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
+            if (pubsub_zmqAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
                 pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
             }
         }
@@ -660,15 +660,13 @@ celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_p
 
     if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-        pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
-        if (receiver != NULL) {
-            pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_zmqTopicReceiver_topic(receiver), pubsub_zmqTopicReceiver_scope(receiver))) {
+                pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
         }
-        free(key);
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     }
 
@@ -725,10 +723,27 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi
     return status;
 }
 
-bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *out, FILE *errStream __attribute__((unused))) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
+
+    char *line = celix_utils_strdup(commandLine);
+    char *token = line;
+    strtok_r(line, " ", &token); //first token is command name
+    strtok_r(NULL, " ", &token); //second token is sub command
+
+    if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    if (celix_utils_stringEquals(token, "nr_of_senders")) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    }
+
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
     celixThreadMutex_lock(&psa->serializers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index b19adb0..11f2b8f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -419,8 +419,8 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
-    if (receiver->scope == NULL){
-        if (subScope != NULL){
+    if (receiver->scope == NULL) {
+        if (subScope != NULL) {
             return;
         }
     } else if (subScope != NULL) {
@@ -428,6 +428,9 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
             //not the same scope. ignore
             return;
         }
+    } else {
+        //receiver scope is not NULL, but subScope is NULL -> ignore
+        return;
     }
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index 2f4f3e9..8d1c340 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -33,4 +33,9 @@ target_link_libraries(pubsub_spi PUBLIC Celix::pubsub_utils )
 add_library(Celix::pubsub_spi ALIAS pubsub_spi)
 
 install(TARGETS pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
-install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)
\ No newline at end of file
+install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)
+
+
+if (ENABLE_TESTING)
+    add_subdirectory(gtest)
+endif(ENABLE_TESTING)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt
new file mode 100644
index 0000000..5cc80be
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_executable(test_pubsub_spi
+		src/PubSubEndpointUtilsTestSuite.cc
+)
+target_link_libraries(test_pubsub_spi PRIVATE Celix::pubsub_spi GTest::gtest GTest::gtest_main)
+target_compile_options(test_pubsub_spi PRIVATE -std=c++14) #Note test code is allowed to be C++14
+
+#Seems to be an issue with coverage setup, for now disabled
+#setup_target_for_coverage(test_pubsub_spi SCAN_DIR ..)
diff --git a/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc
new file mode 100644
index 0000000..802e8f0
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc
@@ -0,0 +1,47 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "gtest/gtest.h"
+
+#include "pubsub_endpoint.h"
+
+TEST(PubSubEndpointUtilsTestSuite, pubsubEndpoint_matchWithTopicAndScope) {
+    celix_properties_t* endpoint = celix_properties_create();
+    celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "topic");
+
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr));
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); //Note "default" is the same as NULL scope
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope"));
+
+    celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "scope");
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default"));
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope"));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scopeaa"));
+
+    celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "default");
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); //Note NULL is the same as "default" scope
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr));
+    EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default"));
+    EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope"));
+
+    celix_properties_destroy(endpoint);
+}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 1cd966c..2e1cfe9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -65,7 +65,12 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti
 bool
 pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
 
-
+/**
+ * Create a key based on scope an topic.
+ * Scope can be NULL.
+ * Note that NULL, "topic" and "default", "topic" will result in different keys
+ * @return a newly created key. caller is responsible for freeing the string array.
+ */
 char *pubsubEndpoint_createScopeTopicKey(const char *scope, const char *topic);
 
 /**
@@ -174,7 +179,14 @@ bool pubsubEndpoint_match(
         long *outSerializerSvcId,
         long *outProtocolSvcId);
 
-
+/**
+ * Match an endpoint with a topic & scope.
+ * @param endpoint The endpoints (mandatory)
+ * @param topic The topic (mandatory)
+ * @param scope The scope (can be NULL)
+ * @return true if the endpoint is for the provide topic and scope);
+ */
+bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope);
 
 
 #ifdef __cplusplus
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index a98044f..c1a715e 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -204,7 +204,8 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
     if (scope != NULL) {
         asprintf(&result, "%s:%s", scope, topic);
     } else {
-        asprintf(&result, "default:%s", topic);
+        //NOTE scope == NULL, equal to scope="default"
+        asprintf(&result, "%s", topic);
     }
     return result;
 }
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
index 659ad5d..041f0f5 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c
@@ -22,6 +22,7 @@
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
 #include <pubsub_protocol.h>
+#include <celix_api.h>
 
 #include "service_reference.h"
 
@@ -322,3 +323,13 @@ bool pubsubEndpoint_match(
 
     return match;
 }
+
+bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope) {
+    const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
+    const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    if (scope == NULL) {
+        scope = PUBSUB_DEFAULT_ENDPOINT_SCOPE;
+    }
+
+    return celix_utils_stringEquals(topic, endpointTopic) && celix_utils_stringEquals(scope, endpointScope);
+}
diff --git a/bundles/pubsub/test/test/test_runner.cc b/bundles/pubsub/test/test/test_runner.cc
index 9ed12da..3ef3faa 100644
--- a/bundles/pubsub/test/test/test_runner.cc
+++ b/bundles/pubsub/test/test/test_runner.cc
@@ -31,58 +31,58 @@ int main(int argc, char **argv) {
 }
 
 TEST_GROUP(PUBSUB_INT_GROUP) {
-    celix_framework_t *fw = NULL;
-    celix_bundle_context_t *ctx = NULL;
-    void setup() override {
-        celixLauncher_launch("config.properties", &fw);
-        ctx = celix_framework_getFrameworkContext(fw);
-    }
+        celix_framework_t * fw = NULL;
+        celix_bundle_context_t *ctx = NULL;
+        void setup() override {
+            celixLauncher_launch("config.properties", &fw);
+            ctx = celix_framework_getFrameworkContext(fw);
+        }
 
-    void teardown() override {
-        celixLauncher_stop(fw);
-        celixLauncher_waitForShutdown(fw);
-        celixLauncher_destroy(fw);
-        ctx = NULL;
-        fw = NULL;
-    }
+        void teardown() override {
+            celixLauncher_stop(fw);
+            celixLauncher_waitForShutdown(fw);
+            celixLauncher_destroy(fw);
+            ctx = NULL;
+            fw = NULL;
+        }
 };
 
 TEST_GROUP(PUBSUB_INT_ENV_GROUP) {
-    celix_framework_t *fw = NULL;
-    celix_bundle_context_t *ctx = NULL;
-    void setup() override {
-        setenv("PSA_TCP_STATIC_BIND_URL_FOR_ping", "tcp://localhost:9001", 1);
-        setenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping", "tcp://localhost:9001", 1);
-        setenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping", "9001", 1);
-        setenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping", "224.100.0.1:9001", 1);
-        setenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping", "127.0.0.1:9001", 1);
-        setenv("CELIX_HTTP_ADMIN_LISTENING_PORTS", "9001", 1);
-        setenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1);
-        setenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1);
+        celix_framework_t * fw = NULL;
+        celix_bundle_context_t *ctx = NULL;
+        void setup() override {
+            setenv("PSA_TCP_STATIC_BIND_URL_FOR_ping", "tcp://localhost:9001", 1);
+            setenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping", "tcp://localhost:9001", 1);
+            setenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping", "9001", 1);
+            setenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping", "224.100.0.1:9001", 1);
+            setenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping", "127.0.0.1:9001", 1);
+            setenv("CELIX_HTTP_ADMIN_LISTENING_PORTS", "9001", 1);
+            setenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1);
+            setenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1);
 
-        celixLauncher_launch("config.properties", &fw);
-        ctx = celix_framework_getFrameworkContext(fw);
-    }
+            celixLauncher_launch("config.properties", &fw);
+            ctx = celix_framework_getFrameworkContext(fw);
+        }
 
-    void teardown() override {
-        celixLauncher_stop(fw);
-        celixLauncher_waitForShutdown(fw);
-        celixLauncher_destroy(fw);
-        ctx = NULL;
-        fw = NULL;
-        unsetenv("PSA_TCP_STATIC_BIND_URL_FOR_ping");
-        unsetenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping");
-        unsetenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping");
-        unsetenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping");
-        unsetenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping");
-        unsetenv("CELIX_HTTP_ADMIN_LISTENING_PORTS");
-        unsetenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping");
-        unsetenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping");
-    }
+        void teardown() override {
+            celixLauncher_stop(fw);
+            celixLauncher_waitForShutdown(fw);
+            celixLauncher_destroy(fw);
+            ctx = NULL;
+            fw = NULL;
+            unsetenv("PSA_TCP_STATIC_BIND_URL_FOR_ping");
+            unsetenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping");
+            unsetenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping");
+            unsetenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping");
+            unsetenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping");
+            unsetenv("CELIX_HTTP_ADMIN_LISTENING_PORTS");
+            unsetenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping");
+            unsetenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping");
+        }
 };
 
 void receiveTest(celix_bundle_context_t *ctx) {
-    constexpr int TRIES = 25;
+    constexpr int TRIES = 40;
     constexpr int TIMEOUT = 250000;
     constexpr int MSG_COUNT = 100;
 
@@ -91,8 +91,8 @@ void receiveTest(celix_bundle_context_t *ctx) {
     for (int i = 0; i < TRIES; ++i) {
         count = 0;
         celix_bundleContext_useService(ctx, CELIX_RECEIVE_COUNT_SERVICE_NAME, &count, [](void *handle, void *svc) {
-            auto* count_ptr = static_cast<int*>(handle);
-            auto* count = static_cast<celix_receive_count_service_t*>(svc);
+            auto *count_ptr = static_cast<int *>(handle);
+            auto *count = static_cast<celix_receive_count_service_t *>(svc);
             *count_ptr = count->receiveCount(count->handle);
         });
         printf("Current msg count is %i, waiting for at least %i\n", count, MSG_COUNT);
diff --git a/libs/utils/gtest/src/LogUtilsTestSuite.cc b/libs/utils/gtest/src/LogUtilsTestSuite.cc
index 0123ee6..f0062b0 100644
--- a/libs/utils/gtest/src/LogUtilsTestSuite.cc
+++ b/libs/utils/gtest/src/LogUtilsTestSuite.cc
@@ -23,8 +23,6 @@
 
 class LogUtilsTestSuite : public ::testing::Test {};
 
-
-
 TEST_F(LogUtilsTestSuite, LogLevelToString) {
     EXPECT_STREQ("trace", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_TRACE));
     EXPECT_STREQ("debug", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_DEBUG));