You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/29 14:53:20 UTC

[celix] 01/02: Revert "Removes adding of default scope in pubsub discovery."

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git

commit d3d908a4f0b737d8f6f818af4f38fa5a7e960c6e
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 16:43:11 2020 +0200

    Revert "Removes adding of default scope in pubsub discovery."
    
    This reverts commit e10d9835c8fba6c4086406d36841431e9810fdd6.
---
 .../examples/pubsub/publisher/CMakeLists.txt       |  1 -
 .../examples/pubsub/subscriber/CMakeLists.txt      |  2 +-
 .../src/pubsub_tcp_topic_receiver.c                |  9 +++----
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  6 +----
 .../src/pubsub_zmq_topic_receiver.c                |  6 +----
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |  6 +----
 .../pubsub_discovery/src/pubsub_discovery_impl.c   | 29 ++++++++++------------
 .../pubsub/pubsub_spi/include/pubsub_constants.h   |  5 ++++
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  2 ++
 .../src/pubsub_topology_manager.c                  | 18 +++++++-------
 10 files changed, 37 insertions(+), 47 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
index 3b9ea50..c68d533 100644
--- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -25,7 +25,6 @@ add_celix_bundle(celix_pubsub_poi_publisher
 
 target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
-target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE)
 
 celix_bundle_files(celix_pubsub_poi_publisher
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 8bee482..94acdb2 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber
 
 target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
-target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE)
+
 
 celix_bundle_files(celix_pubsub_poi_subscriber
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 8ecb7df..eb6afbd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -641,11 +641,10 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    if (receiver->scope != NULL) {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
-    } else {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
-    }
+    snprintf(result->scope,
+             PUBSUB_AMDIN_METRICS_NAME_MAX,
+             "%s",
+             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 809ab7f..47dc888 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -449,11 +449,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
 
 pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    if (sender->scope != NULL) {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
-    } else {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
-    }
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 2ae58af..088474a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -711,11 +711,7 @@ static void* psa_zmq_recvThread(void * data) {
 
 pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    if (receiver->scope != NULL) {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
-    } else {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
-    }
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 85c488b..413f1b3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -464,11 +464,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
 
 pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    if (sender->scope != NULL) {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
-    } else {
-        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
-    }
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 5e1b702..c6d1aa9 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -398,11 +398,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
         entry->properties = celix_properties_copy(endpoint);
-        if (scope == NULL) {
-            asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid);
-        } else {
-            asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid);
-        }
+        asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : scope, topic, uuid);
 
         const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
@@ -413,7 +409,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         celixThreadCondition_broadcast(&disc->waitCond);
         celixThreadMutex_unlock(&disc->runningMutex);
     } else if (valid) {
-        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
+        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
     }
 
     if (!valid) {
@@ -468,8 +464,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
         if (disc->verbose) {
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
             const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
-            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
+            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
             L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n",
                    uuid, type, admin, ser, prot);
         }
@@ -499,8 +495,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
     if (disc->verbose) {
         const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
         const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
-        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
+        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
         L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n",
                uuid, type, admin, ser, prot);
     }
@@ -576,6 +572,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
 
     struct timespec now;
     clock_gettime(CLOCK_MONOTONIC, &now);
+    //TODO add support for query (scope / topic)
 
     fprintf(os, "\n");
     fprintf(os, "Discovery configuration:\n");
@@ -592,11 +589,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
     while (hashMapIterator_hasNext(&iter)) {
         celix_properties_t *ep = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
-        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
         const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
-        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
         fprintf(os, "Endpoint %s:\n", uuid);
         fprintf(os, "   |- type          = %s\n", type);
@@ -615,11 +612,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!");
-        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
         const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
-        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
         int age = (int)(now.tv_sec - entry->createTime.tv_sec);
         fprintf(os, "Endpoint %s:\n", uuid);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 671b874..00ee6b4 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -39,4 +39,9 @@
  */
 #define PUBSUB_ENDPOINT_LOCAL_VISIBILITY     "local"
 
+/**
+ * Default scope, if not scope is specified endpoints are published using this scope
+ */
+#define PUBSUB_DEFAULT_ENDPOINT_SCOPE        "default"
+
 #endif /* PUBSUB_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 540ecda..2aa052d 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -65,6 +65,8 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
 
     if (scope != NULL) {
         celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+    } else {
+        celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
     }
 
     if (topic != NULL) {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 434de1e..2b063ef 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
         const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
         const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
-        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)");
         const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
-        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
         fprintf(os, "   |- container name = %s\n", cn);
         fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
@@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
@@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);