You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/29 18:57:26 UTC
[celix] branch bugfix/zmq_wrong_sender_connections updated: Changes
handling of scopeless and default scoped topic senders/receivers.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/bugfix/zmq_wrong_sender_connections by this push:
new 1c36d3f Changes handling of scopeless and default scoped topic senders/receivers.
1c36d3f is described below
commit 1c36d3f052e2bcc304ebff34ce31ba949a0f424e
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 29 20:57:02 2020 +0200
Changes handling of scopeless and default scoped topic senders/receivers.
In the updated setup, a scopeless (scope==NULL) topic and a default scoped (scope="default") topic will each have their own topic sender and receiver, but will connect to each other through discovery. This is done so that requesting a scopeless publisher will result in a service without a scope property, and requesting a default scoped publisher will result in a publisher with a "default" scope property.
---
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 14 ++++-----
.../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 13 ++++-----
.../src/pubsub_websocket_admin.c | 13 ++++-----
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 33 ++++++++++++++++------
.../src/pubsub_zmq_topic_receiver.c | 4 +--
.../pubsub/pubsub_spi/include/pubsub_endpoint.h | 7 ++++-
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 +-
7 files changed, 51 insertions(+), 35 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index c13442f..e7ad284 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -610,15 +610,13 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p
if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
- if (receiver != NULL) {
- pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_tcpTopicReceiver_topic(receiver), pubsub_tcpTopicReceiver_scope(receiver))) {
+ pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
}
- free(key);
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index e152b94..18657a1 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -472,13 +472,12 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
- if (receiver != NULL) {
- pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpmcTopicReceiver_topic(receiver), pubsub_udpmcTopicReceiver_scope(receiver))) {
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 924ddef..ba33252 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -460,13 +460,12 @@ celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const c
if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
- if (receiver != NULL) {
- pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_websocketTopicReceiver_topic(receiver), pubsub_websocketTopicReceiver_scope(receiver))) {
+ pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 2c1c7b5..296705d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -658,15 +658,13 @@ celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_p
if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
- if (receiver != NULL) {
- pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_zmqTopicReceiver_topic(receiver), pubsub_zmqTopicReceiver_scope(receiver))) {
+ pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
}
- free(key);
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
@@ -723,10 +721,27 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi
return status;
}
-bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *out, FILE *errStream __attribute__((unused))) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
+
+ char *line = celix_utils_strdup(commandLine);
+ char *token = line;
+ strtok_r(line, " ", &token); //first token is command name
+ strtok_r(NULL, " ", &token); //second token is sub command
+
+ if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ if (celix_utils_stringEquals(token, "nr_of_senders")) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
+
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
celixThreadMutex_lock(&psa->serializers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 088474a..10fc49e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -415,8 +415,8 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
- if (receiver->scope == NULL){
- if (subScope != NULL){
+ if (receiver->scope == NULL) {
+ if (subScope != NULL) {
return;
}
} else if (subScope != NULL) {
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index e1f8f33..2e1cfe9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -65,7 +65,12 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti
bool
pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType);
-
+/**
+ * Create a key based on scope and topic.
+ * Scope can be NULL.
+ * Note that (NULL, "topic") and ("default", "topic") will result in different keys.
+ * @return a newly created key. The caller is responsible for freeing the returned string.
+ */
char *pubsubEndpoint_createScopeTopicKey(const char *scope, const char *topic);
/**
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 7ef620d..c1a715e 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -205,7 +205,7 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
asprintf(&result, "%s:%s", scope, topic);
} else {
//NOTE scope == NULL, equal to scope="default"
- asprintf(&result, "%s:%s", PUBSUB_DEFAULT_ENDPOINT_SCOPE, topic);
+ asprintf(&result, "%s", topic);
}
return result;
}