You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/03/31 09:20:39 UTC
[celix] branch feature/scope_usage created (now a2dd431)
This is an automated email from the ASF dual-hosted git repository.
abroekhuis pushed a change to branch feature/scope_usage
in repository https://gitbox.apache.org/repos/asf/celix.git.
at a2dd431 Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used. Added exclude filter to example for scope, to make sure that the right publisher is found. If a publisher without scope, and a publisher with a scope, use the same topic, this could result in getting the wrong publisher service.
This branch includes the following new commits:
new a2dd431 Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used. Added exclude filter to example for scope, to make sure that the right publisher is found. If a publisher without scope, and a publisher with a scope, use the same topic, this could result in getting the wrong publisher service.
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[celix] 01/01: Updated ZMQ implementation to use NULL if scope is
not set. Only when needed,
"default" is used. Added exclude filter to example for scope,
to make sure that the right publisher is found. If a publisher without scope,
and a publisher with a scope, use the same topic,
this could result in getting the wrong publisher service.
Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
abroekhuis pushed a commit to branch feature/scope_usage
in repository https://gitbox.apache.org/repos/asf/celix.git
commit a2dd43189114a0f97db78c6a973876ad55a8e086
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Tue Mar 31 11:20:18 2020 +0200
Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used.
Added an exclude filter for scope to the example, to make sure that the right publisher is found. If a publisher without a scope and a publisher with a scope use the same topic, this could otherwise result in getting the wrong publisher service.
---
.../publisher/private/src/ps_pub_activator.c | 2 +-
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 12 ++++---
.../src/pubsub_zmq_topic_receiver.c | 20 ++++++++----
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 10 ++++--
.../pubsub/pubsub_api/include/pubsub/subscriber.h | 2 +-
.../pubsub_discovery/src/pubsub_discovery_impl.c | 2 +-
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 17 ++++++----
bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 24 +++-----------
.../src/pubsub_topology_manager.c | 38 +++++++++++++++-------
9 files changed, 73 insertions(+), 54 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index d545cbc..c87e8e0 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -62,7 +62,7 @@ static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx
snprintf(filter, 128, "(%s=%s)(%s=%s)", PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE, scope);
free(scope);
#else
- snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
+ snprintf(filter, 128, "(&(%s=%s)(!(%s=*)))", (char*) PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE);
#endif
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.callbackHandle = act->client;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 1130b03..3864d2e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -667,13 +667,17 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL);
- if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL
- && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
- && strncmp(eScope, scope, 1024*1024) == 0
+ if (topic != NULL && serializer != NULL && protocol != NULL
+ && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
&& strncmp(eTopic, topic, 1024*1024) == 0
&& strncmp(eSerializer, serializer, 1024*1024) == 0
&& strncmp(eProtocol, protocol, 1024*1024) == 0) {
- pubsub_zmqTopicReceiver_connectTo(receiver, url);
+ // Scope is not required
+ if (scope == NULL && eScope == NULL) {
+ pubsub_zmqTopicReceiver_connectTo(receiver, url);
+ } else if (scope != NULL && eScope != NULL && strncmp(scope, eScope, 1024*1024) == 0) {
+ pubsub_zmqTopicReceiver_connectTo(receiver, url);
+ }
}
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 979d373..e832aff 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -146,7 +146,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->serializer = serializer;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
- receiver->scope = strndup(scope, 1024 * 1024);
+ receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
@@ -260,7 +260,9 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
}
if (receiver->zmqSock == NULL) {
- free(receiver->scope);
+ if (receiver->scope != NULL) {
+ free(receiver->scope);
+ }
free(receiver->topic);
free(receiver);
receiver = NULL;
@@ -401,10 +403,16 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
- const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
- //not the same scope. ignore
- return;
+ const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
+ if (receiver->scope == NULL){
+ if (subScope != NULL){
+ return;
+ }
+ } else {
+ if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+ //not the same scope. ignore
+ return;
+ }
}
celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index d1fb841..35f959b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -252,7 +252,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
}
if (sender->url != NULL) {
- sender->scope = strndup(scope, 1024 * 1024);
+ sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
@@ -268,7 +268,9 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
- celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+ if (sender->scope != NULL) {
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+ }
celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
opts.factory = &sender->publisher.factory;
@@ -318,7 +320,9 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
celixThreadMutex_destroy(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->zmq.mutex);
- free(sender->scope);
+ if (sender->scope != NULL) {
+ free(sender->scope);
+ }
free(sender->topic);
free(sender->url);
free(sender);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 0ec4c58..f2afa26 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -39,7 +39,7 @@
#define PUBSUB_SUBSCRIBER_SCOPE "scope"
#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config"
-#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"
+//#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"
struct pubsub_subscriber_struct {
void *handle;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 283db2f..c6a661b 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -414,7 +414,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
entry->isSet = false;
entry->properties = celix_properties_copy(endpoint);
- asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid);
+ asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? "default" : scope, topic, uuid);
const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
celixThreadMutex_lock(&disc->announcedEndpointsMutex);
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index f9e74b2..2a4eb5e 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -122,7 +122,7 @@ celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx
celix_properties_t *ep = celix_properties_create();
const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+ const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, NULL);
const char* topic = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_TOPIC, NULL);
struct retrieve_topic_properties_data data;
@@ -157,7 +157,7 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
char* topic = NULL;
char* scopeFromFilter = NULL;
pubsub_getPubSubInfoFromFilter(filter, &topic, &scopeFromFilter);
- const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+ const char *scope = scopeFromFilter;
struct retrieve_topic_properties_data data;
data.props = NULL;
@@ -176,7 +176,9 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
}
free(topic);
- free(scopeFromFilter);
+ if (scope != NULL) {
+ free(scopeFromFilter);
+ }
return ep;
}
@@ -194,7 +196,11 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti
char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
char *result = NULL;
- asprintf(&result, "%s:%s", scope, topic);
+ if (scope != NULL) {
+ asprintf(&result, "%s:%s", scope, topic);
+ } else {
+ asprintf(&result, "default:%s", topic);
+ }
return result;
}
@@ -220,7 +226,6 @@ bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminTy
checkProp(props, PUBSUB_ENDPOINT_SERIALIZER);
}
bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME);
- bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
- return p1 && p2 && p3 && p4 && p5 && p6 && p7;
+ return p1 && p2 && p3 && p4 && p5 && p6;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
index 18005b6..078ce57 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -43,25 +43,9 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
const char *scope = NULL;
const char *objectClass = NULL;
celix_filter_t *filter = celix_filter_create(filterstr);
- if (filter != NULL) {
- if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=example))
- array_list_t *attributes = filter->children;
- unsigned int i;
- unsigned int size = arrayList_size(attributes);
- for (i = 0; i < size; ++i) {
- filter_t *attr = arrayList_get(attributes, i);
- if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) {
- if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
- objectClass = attr->value;
- } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
- topic = attr->value;
- } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
- scope = attr->value;
- }
- }
- }
- }
- }
+ scope = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_SCOPE);
+ topic = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_TOPIC);
+ objectClass = (char *) celix_filter_findAttribute(filter, OSGI_FRAMEWORK_OBJECTCLASS);
if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
//NOTE topic must be present, scope can be present in the filter.
@@ -92,7 +76,7 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx) {
array_list_pt bundles = NULL;
bundleContext_getBundles(ctx, &bundles);
- int nrOfBundles = arrayList_size(bundles);
+ uint32_t nrOfBundles = arrayList_size(bundles);
long bundle_id = -1;
char* result = NULL;
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 5ec5140..4d163d0 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -126,7 +126,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
free(entry->scopeAndTopicKey);
- free(entry->scope);
+ if (entry->scope != NULL) {
+ free(entry->scope);
+ }
free(entry->topic);
if (entry->topicProperties != NULL) {
celix_properties_destroy(entry->topicProperties);
@@ -148,7 +150,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
free(entry->scopeAndTopicKey);
- free(entry->scope);
+ if (entry->scope != NULL) {
+ free(entry->scope);
+ }
free(entry->topic);
if (entry->topicProperties != NULL) {
celix_properties_destroy(entry->topicProperties);
@@ -314,7 +318,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
//3) signal psaHandling thread to setup topic receiver
const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (topic == NULL) {
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
"[PSTM] Warning found subscriber service without mandatory '%s' property.",
@@ -334,7 +338,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
} else {
entry = calloc(1, sizeof(*entry));
entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
- entry->scope = strndup(scope, 1024 * 1024);
+ entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
entry->topic = strndup(topic, 1024 * 1024);
entry->usageCount = 1;
entry->selectedPsaSvcId = -1L;
@@ -362,7 +366,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut
//1) Find topic receiver and decrease count
const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (topic == NULL) {
return;
@@ -437,7 +441,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
char *topicFromFilter = NULL;
char *scopeFromFilter = NULL;
pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
- char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+ char *scope = scopeFromFilter;
char *topic = topicFromFilter;
char *scopeAndTopicKey = NULL;
@@ -453,7 +457,9 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
if (entry != NULL) {
entry->usageCount += 1;
- free(scope);
+ if (scope != NULL) {
+ free(scope);
+ }
free(topic);
free(scopeAndTopicKey);
} else {
@@ -491,10 +497,12 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
char *topic = NULL;
char *scopeFromFilter = NULL;
pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
- const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+ const char *scope = scopeFromFilter;
if (topic == NULL) {
- free(scopeFromFilter);
+ if (scopeFromFilter != NULL) {
+ free(scopeFromFilter);
+ }
return;
}
@@ -509,7 +517,9 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
free(scopeAndTopicKey);
free(topic);
- free(scopeFromFilter);
+ if (scopeFromFilter != NULL) {
+ free(scopeFromFilter);
+ }
}
celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
@@ -656,7 +666,9 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
//no usage -> remove
hashMapIterator_remove(&iter);
free(entry->scopeAndTopicKey);
- free(entry->scope);
+ if (entry->scope != NULL) {
+ free(entry->scope);
+ }
free(entry->topic);
if (entry->topicProperties != NULL) {
celix_properties_destroy(entry->topicProperties);
@@ -721,7 +733,9 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
hashMapIterator_remove(&iter);
//cleanup entry
free(entry->scopeAndTopicKey);
- free(entry->scope);
+ if (entry->scope != NULL) {
+ free(entry->scope);
+ }
free(entry->topic);
if (entry->topicProperties != NULL) {
celix_properties_destroy(entry->topicProperties);