You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/29 14:53:20 UTC
[celix] 01/02: Revert "Removes adding of default scope in pubsub
discovery."
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git
commit d3d908a4f0b737d8f6f818af4f38fa5a7e960c6e
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 16:43:11 2020 +0200
Revert "Removes adding of default scope in pubsub discovery."
This reverts commit e10d9835c8fba6c4086406d36841431e9810fdd6.
---
.../examples/pubsub/publisher/CMakeLists.txt | 1 -
.../examples/pubsub/subscriber/CMakeLists.txt | 2 +-
.../src/pubsub_tcp_topic_receiver.c | 9 +++----
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 +----
.../src/pubsub_zmq_topic_receiver.c | 6 +----
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 6 +----
.../pubsub_discovery/src/pubsub_discovery_impl.c | 29 ++++++++++------------
.../pubsub/pubsub_spi/include/pubsub_constants.h | 5 ++++
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 ++
.../src/pubsub_topology_manager.c | 18 +++++++-------
10 files changed, 37 insertions(+), 47 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
index 3b9ea50..c68d533 100644
--- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -25,7 +25,6 @@ add_celix_bundle(celix_pubsub_poi_publisher
target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
-target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE)
celix_bundle_files(celix_pubsub_poi_publisher
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 8bee482..94acdb2 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber
target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
-target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE)
+
celix_bundle_files(celix_pubsub_poi_subscriber
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 8ecb7df..eb6afbd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -641,11 +641,10 @@ static void *psa_tcp_recvThread(void *data) {
pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- if (receiver->scope != NULL) {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
- } else {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
- }
+ snprintf(result->scope,
+ PUBSUB_AMDIN_METRICS_NAME_MAX,
+ "%s",
+ receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 809ab7f..47dc888 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -449,11 +449,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- if (sender->scope != NULL) {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
- } else {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
- }
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
celixThreadMutex_lock(&sender->boundedServices.mutex);
size_t count = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 2ae58af..088474a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -711,11 +711,7 @@ static void* psa_zmq_recvThread(void * data) {
pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- if (receiver->scope != NULL) {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
- } else {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
- }
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 85c488b..413f1b3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -464,11 +464,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- if (sender->scope != NULL) {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
- } else {
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
- }
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
celixThreadMutex_lock(&sender->boundedServices.mutex);
size_t count = 0;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 5e1b702..c6d1aa9 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -398,11 +398,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
entry->isSet = false;
entry->properties = celix_properties_copy(endpoint);
- if (scope == NULL) {
- asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid);
- } else {
- asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid);
- }
+ asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : scope, topic, uuid);
const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
celixThreadMutex_lock(&disc->announcedEndpointsMutex);
@@ -413,7 +409,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
celixThreadCondition_broadcast(&disc->waitCond);
celixThreadMutex_unlock(&disc->runningMutex);
} else if (valid) {
- L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
+ L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
}
if (!valid) {
@@ -468,8 +464,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
if (disc->verbose) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
- const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
+ const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+ const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n",
uuid, type, admin, ser, prot);
}
@@ -499,8 +495,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
if (disc->verbose) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)");
- const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)");
+ const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+ const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n",
uuid, type, admin, ser, prot);
}
@@ -576,6 +572,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
+ //TODO add support for query (scope / topic)
fprintf(os, "\n");
fprintf(os, "Discovery configuration:\n");
@@ -592,11 +589,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
while (hashMapIterator_hasNext(&iter)) {
celix_properties_t *ep = hashMapIterator_nextValue(&iter);
const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
- const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+ const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
- const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+ const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
fprintf(os, "Endpoint %s:\n", uuid);
fprintf(os, " |- type = %s\n", type);
@@ -615,11 +612,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
while (hashMapIterator_hasNext(&iter)) {
pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!");
- const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+ const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
- const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+ const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
int age = (int)(now.tv_sec - entry->createTime.tv_sec);
fprintf(os, "Endpoint %s:\n", uuid);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 671b874..00ee6b4 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -39,4 +39,9 @@
*/
#define PUBSUB_ENDPOINT_LOCAL_VISIBILITY "local"
+/**
+ * Default scope, if not scope is specified endpoints are published using this scope
+ */
+#define PUBSUB_DEFAULT_ENDPOINT_SCOPE "default"
+
#endif /* PUBSUB_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 540ecda..2aa052d 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -65,6 +65,8 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
if (scope != NULL) {
celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+ } else {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
}
if (topic != NULL) {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 434de1e..2b063ef 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
- const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
+ const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)");
const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
- const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+ const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
fprintf(os, " |- container name = %s\n", cn);
fprintf(os, " |- fw uuid = %s\n", fwuuid);
@@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
}
const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
- const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
- fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
+ fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
@@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
}
const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
- const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
- fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope);
+ fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);