You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/03/31 09:20:40 UTC

[celix] 01/01: Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used. Added exclude filter to example for scope, to make sure that the right publisher is found. If a publisher without scope, and a publisher with a scope, use the same topic, this could result in getting the wrong publisher service.

This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a commit to branch feature/scope_usage
in repository https://gitbox.apache.org/repos/asf/celix.git

commit a2dd43189114a0f97db78c6a973876ad55a8e086
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Tue Mar 31 11:20:18 2020 +0200

    Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used.
    Added exclude filter to example for scope, to make sure that the right publisher is found. If a publisher without scope, and a publisher with a scope, use the same topic, this could result in getting the wrong publisher service.
---
 .../publisher/private/src/ps_pub_activator.c       |  2 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 12 ++++---
 .../src/pubsub_zmq_topic_receiver.c                | 20 ++++++++----
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 10 ++++--
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |  2 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   |  2 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    | 17 ++++++----
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c       | 24 +++-----------
 .../src/pubsub_topology_manager.c                  | 38 +++++++++++++++-------
 9 files changed, 73 insertions(+), 54 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index d545cbc..c87e8e0 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -62,7 +62,7 @@ static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx
         snprintf(filter, 128, "(%s=%s)(%s=%s)", PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE, scope);
         free(scope);
 #else
-        snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
+        snprintf(filter, 128, "(&(%s=%s)(!(%s=*)))", (char*) PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE);
 #endif
         celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
         opts.callbackHandle = act->client;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index 1130b03..3864d2e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -667,13 +667,17 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
         const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
         const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL);
 
-        if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL
-                        && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
-                        && strncmp(eScope, scope, 1024*1024) == 0
+        if (topic != NULL && serializer != NULL && protocol != NULL
+                        && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
                         && strncmp(eTopic, topic, 1024*1024) == 0
                         && strncmp(eSerializer, serializer, 1024*1024) == 0
                         && strncmp(eProtocol, protocol, 1024*1024) == 0) {
-            pubsub_zmqTopicReceiver_connectTo(receiver, url);
+            // Scope is not required
+            if (scope == NULL && eScope == NULL) {
+                pubsub_zmqTopicReceiver_connectTo(receiver, url);
+            } else if (scope != NULL && eScope != NULL && strncmp(scope, eScope, 1024*1024) == 0) {
+                pubsub_zmqTopicReceiver_connectTo(receiver, url);
+            }
         }
     }
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 979d373..e832aff 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -146,7 +146,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->serializer = serializer;
     receiver->protocolSvcId = protocolSvcId;
     receiver->protocol = protocol;
-    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
@@ -260,7 +260,9 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     }
 
     if (receiver->zmqSock == NULL) {
-        free(receiver->scope);
+        if (receiver->scope != NULL) {
+            free(receiver->scope);
+        }
         free(receiver->topic);
         free(receiver);
         receiver = NULL;
@@ -401,10 +403,16 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
-    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
-        //not the same scope. ignore
-        return;
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
+    if (receiver->scope == NULL){
+        if (subScope != NULL){
+            return;
+        }
+    } else {
+        if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+            //not the same scope. ignore
+            return;
+        }
     }
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index d1fb841..35f959b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -252,7 +252,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     }
 
     if (sender->url != NULL) {
-        sender->scope = strndup(scope, 1024 * 1024);
+        sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         sender->topic = strndup(topic, 1024 * 1024);
 
         celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
@@ -268,7 +268,9 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
 
         celix_properties_t *props = celix_properties_create();
         celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
-        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        if (sender->scope != NULL) {
+            celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        }
 
         celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
         opts.factory = &sender->publisher.factory;
@@ -318,7 +320,9 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->zmq.mutex);
 
-        free(sender->scope);
+        if (sender->scope != NULL) {
+            free(sender->scope);
+        }
         free(sender->topic);
         free(sender->url);
         free(sender);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 0ec4c58..f2afa26 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -39,7 +39,7 @@
 #define PUBSUB_SUBSCRIBER_SCOPE                "scope"
 #define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
 
-#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
+//#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
 
 struct pubsub_subscriber_struct {
     void *handle;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 283db2f..c6a661b 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -414,7 +414,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
         entry->properties = celix_properties_copy(endpoint);
-        asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid);
+        asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? "default" : scope, topic, uuid);
 
         const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL);
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index f9e74b2..2a4eb5e 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -122,7 +122,7 @@ celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx
     celix_properties_t *ep = celix_properties_create();
 
     const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-    const char* scope = celix_properties_get(svcProps,  PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+    const char* scope = celix_properties_get(svcProps,  PUBSUB_SUBSCRIBER_SCOPE, NULL);
     const char* topic = celix_properties_get(svcProps,  PUBSUB_SUBSCRIBER_TOPIC, NULL);
 
     struct retrieve_topic_properties_data data;
@@ -157,7 +157,7 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
     char* topic = NULL;
     char* scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(filter, &topic, &scopeFromFilter);
-    const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+    const char *scope = scopeFromFilter;
 
     struct retrieve_topic_properties_data data;
     data.props = NULL;
@@ -176,7 +176,9 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
     }
 
     free(topic);
-    free(scopeFromFilter);
+    if (scope != NULL) {
+        free(scopeFromFilter);
+    }
 
     return ep;
 }
@@ -194,7 +196,11 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti
 
 char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
     char *result = NULL;
-    asprintf(&result, "%s:%s", scope, topic);
+    if (scope != NULL) {
+        asprintf(&result, "%s:%s", scope, topic);
+    } else {
+        asprintf(&result, "default:%s", topic);
+    }
     return result;
 }
 
@@ -220,7 +226,6 @@ bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminTy
         checkProp(props, PUBSUB_ENDPOINT_SERIALIZER);
     }
     bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME);
-    bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
 
-    return p1 && p2 && p3 && p4 && p5 && p6 && p7;
+    return p1 && p2 && p3 && p4 && p5 && p6;
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
index 18005b6..078ce57 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -43,25 +43,9 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
     const char *scope = NULL;
     const char *objectClass = NULL;
     celix_filter_t *filter = celix_filter_create(filterstr);
-    if (filter != NULL) {
-        if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=example))
-            array_list_t *attributes = filter->children;
-            unsigned int i;
-            unsigned int size = arrayList_size(attributes);
-            for (i = 0; i < size; ++i) {
-                filter_t *attr = arrayList_get(attributes, i);
-                if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) {
-                    if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
-                        objectClass = attr->value;
-                    } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
-                        topic = attr->value;
-                    } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
-                        scope = attr->value;
-                    }
-                }
-            }
-        }
-    }
+    scope = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_SCOPE);
+    topic = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_TOPIC);
+    objectClass = (char *) celix_filter_findAttribute(filter, OSGI_FRAMEWORK_OBJECTCLASS);
 
     if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
         //NOTE topic must be present, scope can be present in the filter.
@@ -92,7 +76,7 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
 char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx) {
     array_list_pt bundles = NULL;
     bundleContext_getBundles(ctx, &bundles);
-    int nrOfBundles = arrayList_size(bundles);
+    uint32_t nrOfBundles = arrayList_size(bundles);
     long bundle_id = -1;
     char* result = NULL;
 
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 5ec5140..4d163d0 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -126,7 +126,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL) {
             free(entry->scopeAndTopicKey);
-            free(entry->scope);
+            if (entry->scope != NULL) {
+                free(entry->scope);
+            }
             free(entry->topic);
             if (entry->topicProperties != NULL) {
                 celix_properties_destroy(entry->topicProperties);
@@ -148,7 +150,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL) {
             free(entry->scopeAndTopicKey);
-            free(entry->scope);
+            if (entry->scope != NULL) {
+                free(entry->scope);
+            }
             free(entry->topic);
             if (entry->topicProperties != NULL) {
                 celix_properties_destroy(entry->topicProperties);
@@ -314,7 +318,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
     //3) signal psaHandling thread to setup topic receiver
 
     const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
-    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (topic == NULL) {
         logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
                       "[PSTM] Warning found subscriber service without mandatory '%s' property.",
@@ -334,7 +338,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
     } else {
         entry = calloc(1, sizeof(*entry));
         entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
-        entry->scope = strndup(scope, 1024 * 1024);
+        entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         entry->topic = strndup(topic, 1024 * 1024);
         entry->usageCount = 1;
         entry->selectedPsaSvcId = -1L;
@@ -362,7 +366,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut
     //1) Find topic receiver and decrease count
 
     const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
-    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
 
     if (topic == NULL) {
         return;
@@ -437,7 +441,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
     char *topicFromFilter = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
-    char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+    char *scope = scopeFromFilter;
     char *topic = topicFromFilter;
 
     char *scopeAndTopicKey = NULL;
@@ -453,7 +457,9 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
     pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
     if (entry != NULL) {
         entry->usageCount += 1;
-        free(scope);
+        if (scope != NULL) {
+            free(scope);
+        }
         free(topic);
         free(scopeAndTopicKey);
     } else {
@@ -491,10 +497,12 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
     char *topic = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
-    const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+    const char *scope = scopeFromFilter;
 
     if (topic == NULL) {
-        free(scopeFromFilter);
+        if (scopeFromFilter != NULL) {
+            free(scopeFromFilter);
+        }
         return;
     }
 
@@ -509,7 +517,9 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
 
     free(scopeAndTopicKey);
     free(topic);
-    free(scopeFromFilter);
+    if (scopeFromFilter != NULL) {
+        free(scopeFromFilter);
+    }
 }
 
 celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
@@ -656,7 +666,9 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
                 //no usage -> remove
                 hashMapIterator_remove(&iter);
                 free(entry->scopeAndTopicKey);
-                free(entry->scope);
+                if (entry->scope != NULL) {
+                    free(entry->scope);
+                }
                 free(entry->topic);
                 if (entry->topicProperties != NULL) {
                     celix_properties_destroy(entry->topicProperties);
@@ -721,7 +733,9 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
                 hashMapIterator_remove(&iter);
                 //cleanup entry
                 free(entry->scopeAndTopicKey);
-                free(entry->scope);
+                if (entry->scope != NULL) {
+                    free(entry->scope);
+                }
                 free(entry->topic);
                 if (entry->topicProperties != NULL) {
                     celix_properties_destroy(entry->topicProperties);