You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:22 UTC
[05/34] celix git commit: CELIX-454: More PubSub refactoring. Started
creating a new skeleton for psa udpmc based on a updated pubsub spi.
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 834e3a8..85c67e9 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -16,24 +16,19 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pubsub_topology_manager.c
- *
- * \date Sep 29, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
+
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <celix_api.h>
+#include <pubsub_utils.h>
+#include <assert.h>
#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h"
#include "constants.h"
-#include "listener_hook_service.h"
#include "utils.h"
#include "log_service.h"
#include "log_helper.h"
@@ -42,697 +37,761 @@
#include "pubsub_topology_manager.h"
#include "pubsub_admin.h"
-static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
- for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) {
- const char* key = (const char*)hashMapIterator_nextKey(&iter);
- fprintf(outStream, " Topic=%s\n", key);
- array_list_pt ep_list = hashMap_get(endpoints, key);
- for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
- pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
- fprintf(outStream, " Endpoint %d\n", i);
- fprintf(outStream, " Endpoint properties\n");
- const char *propKey;
- if(ep->properties) {
- PROPERTIES_FOR_EACH(ep->properties, propKey) {
- fprintf(outStream, " %s => %s\n", propKey, celix_properties_get(ep->properties, propKey, NULL));
- }
- }
- }
- }
+#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS 5L
-}
+static void* pstm_psaHandlingThread(void *data);
-static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
- pubsub_topology_manager_t *manager = (pubsub_topology_manager_t*) handle;
- if (manager->publications && !hashMap_isEmpty(manager->publications)) {
- fprintf(outStream, "Publications:\n");
- print_endpoint_info(manager->publications, outStream);
- }
- if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) {
- fprintf(outStream, "Subscriptions:\n");
- print_endpoint_info(manager->subscriptions, outStream);
- }
- return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager) {
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) {
celix_status_t status = CELIX_SUCCESS;
- *manager = calloc(1, sizeof(**manager));
- if (!*manager) {
+ pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
+ if (manager == NULL) {
+ *out = NULL;
return CELIX_ENOMEM;
+ } else {
+ *out = manager;
}
- (*manager)->context = context;
+ manager->context = context;
celix_thread_mutexattr_t psaAttr;
celixThreadMutexAttr_create(&psaAttr);
celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
- status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+ status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
celixThreadMutexAttr_destroy(&psaAttr);
- status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
- status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
- status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+ status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
- arrayList_create(&(*manager)->psaList);
+ status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
- (*manager)->discoveryList = hashMap_create(NULL, NULL, NULL, NULL);
- (*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, NULL);
+ manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ manager->announceEndpointListeners.list = celix_arrayList_create();
+ manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
+ manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- (*manager)->loghelper = logHelper;
- (*manager)->shellCmdService.handle = *manager;
- (*manager)->shellCmdService.executeCommand = shellCommand;
+ manager->loghelper = logHelper;
+ manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
- (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
- const char *verboseStr = NULL;
- bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr);
- if (verboseStr != NULL) {
- (*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
- }
+ manager->psaHandling.running = true;
+ celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
+ celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
- properties_pt shellProps = properties_create();
- properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
- properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
- properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub");
- bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
return status;
}
celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
celix_status_t status = CELIX_SUCCESS;
- celixThreadMutex_lock(&manager->discoveryListLock);
- hashMap_destroy(manager->discoveryList, false, false);
- celixThreadMutex_unlock(&manager->discoveryListLock);
- celixThreadMutex_destroy(&manager->discoveryListLock);
-
- celixThreadMutex_lock(&manager->psaListLock);
- arrayList_destroy(manager->psaList);
- celixThreadMutex_unlock(&manager->psaListLock);
- celixThreadMutex_destroy(&manager->psaListLock);
-
- celixThreadMutex_lock(&manager->publicationsLock);
- hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
- while(hashMapIterator_hasNext(pubit)){
- array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
- unsigned int i;
- for(i=0;i<arrayList_size(l);i++){
- pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
- }
- arrayList_destroy(l);
- }
- hashMapIterator_destroy(pubit);
- hashMap_destroy(manager->publications, true, false);
- celixThreadMutex_unlock(&manager->publicationsLock);
- celixThreadMutex_destroy(&manager->publicationsLock);
-
- celixThreadMutex_lock(&manager->subscriptionsLock);
- hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
- while(hashMapIterator_hasNext(subit)){
- array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
- unsigned int i;
- for(i=0;i<arrayList_size(l);i++){
- pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
- }
- arrayList_destroy(l);
- }
- hashMapIterator_destroy(subit);
- hashMap_destroy(manager->subscriptions, true, false);
- celixThreadMutex_unlock(&manager->subscriptionsLock);
- celixThreadMutex_destroy(&manager->subscriptionsLock);
- serviceRegistration_unregister(manager->shellCmdReg);
- free(manager);
+ celixThreadMutex_lock(&manager->psaHandling.mutex);
+ manager->psaHandling.running = false;
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
+ celixThreadMutex_unlock(&manager->psaHandling.mutex);
+ celixThread_join(manager->psaHandling.thread, NULL);
- return status;
-}
-void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
- pubsub_topology_manager_t *manager = handle;
- unsigned int i;
+ celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+ hashMap_destroy(manager->announcedEndpoints.map, false, false);
+ celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+ celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hashMap_destroy(manager->pubsubadmins.map, false, false);
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+ celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
- celixThreadMutex_lock(&manager->psaListLock);
- arrayList_add(manager->psaList, psa);
- celixThreadMutex_unlock(&manager->psaListLock);
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ hashMap_destroy(manager->discoveredEndpoints.map, true, false);
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
- // Add already detected subscriptions to new PSA
- celixThreadMutex_lock(&manager->subscriptionsLock);
- hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
-
- //TODO FIXME no matching used, should only add unmatched subscribers ?
- //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
- while (hashMapIterator_hasNext(subscriptionsIterator)) {
- array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
- for(i=0;i<arrayList_size(sub_ep_list);i++){
- psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
- }
- }
+ free(manager);
- hashMapIterator_destroy(subscriptionsIterator);
+ return status;
+}
- celixThreadMutex_unlock(&manager->subscriptionsLock);
+void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = handle;
+ pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
- // Add already detected publications to new PSA
- celixThreadMutex_lock(&manager->publicationsLock);
- hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
- //TODO FIXME no matching used, should only add unmatched publications ?
- //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
- while (hashMapIterator_hasNext(publicationsIterator)) {
- array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
- for(i=0;i<arrayList_size(pub_ep_list);i++){
- psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
- }
+ if (svcId >= 0) {
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa);
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
}
- hashMapIterator_destroy(publicationsIterator);
-
- celixThreadMutex_unlock(&manager->publicationsLock);
+ /* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
+ * so no retroactively adding subscribers
+ *
+ * TODO future extension?
+ */
}
-void pubsub_topologyManager_psaRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
- celix_status_t status = CELIX_SUCCESS;
+void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
pubsub_topology_manager_t *manager = handle;
+ //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
-
- /* Deactivate all publications */
- celixThreadMutex_lock(&manager->publicationsLock);
-
- hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
- while(hashMapIterator_hasNext(pubit)){
- hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
- char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
- // Extract scope/topic name from key
- char scope[MAX_SCOPE_LEN];
- char topic[MAX_TOPIC_LEN];
- sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
- array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
-
- status = psa->closeAllPublications(psa->admin,scope,topic);
-
- if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- pubsub_announce_endpoint_listener_t *disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- const char* fwUUID = NULL;
- bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- unsigned int i;
- for(i=0;i<arrayList_size(pubEP_list);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
- disc->removeEndpoint(disc->handle,pubEP->properties);
- }
- }
- bundleContext_ungetService(manager->context, disc_sr, NULL);
+ /* de-announce all publications */
+ celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+ celix_array_list_t *endpointsList = hashMap_remove(manager->announcedEndpoints.map, (void*)svcId);
+ celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+
+ if (endpointsList != NULL) {
+ for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) {
+ celix_properties_t *endpoint = celix_arrayList_get(endpointsList, i);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->removeEndpoint(listener->handle, endpoint);
}
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_properties_destroy(endpoint);
}
+ celix_arrayList_destroy(endpointsList);
}
- hashMapIterator_destroy(pubit);
-
- celixThreadMutex_unlock(&manager->publicationsLock);
-
- /* Deactivate all subscriptions */
- celixThreadMutex_lock(&manager->subscriptionsLock);
- hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
- while(hashMapIterator_hasNext(subit)){
- // TODO do some error checking
- char* scope_topic = (char*)hashMapIterator_nextKey(subit);
- char scope[MAX_TOPIC_LEN];
- char topic[MAX_TOPIC_LEN];
- memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
- memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
- sscanf(scope_topic, "%[^:]:%s", scope, topic );
- status += psa->closeAllSubscriptions(psa->admin,scope, topic);
- }
- hashMapIterator_destroy(subit);
- celixThreadMutex_unlock(&manager->subscriptionsLock);
- celixThreadMutex_lock(&manager->psaListLock);
- arrayList_removeElement(manager->psaList, psa);
- celixThreadMutex_unlock(&manager->psaListLock);
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
+}
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
+static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
+ pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
}
-void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
pubsub_topology_manager_t *manager = handle;
- //subscriber_service_pt subscriber = (subscriber_service_pt)service;
+ //NOTE new local subscriber service register
+ //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+ //2) update the usage count. if not found
+ //3) Create new entry, find matching psa and serializer and broadcast cond, so that the psaHandling thread will
+ // call the psa to setup the topic receiver and announce the endpoint.
+
+ const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ if (topic == NULL) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+ "[PSTM] Warning found subscriber service without mandatory %s property.",
+ PUBSUB_SUBSCRIBER_TOPIC);
+ return;
+ }
- pubsub_endpoint_pt sub = NULL;
- if(pubsubEndpoint_createFromSvc(manager->context, bnd, props,false, &sub) == CELIX_SUCCESS) {
- celixThreadMutex_lock(&manager->subscriptionsLock);
- char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ long bndId = celix_bundle_getId(bnd);
+ char *scopeAndTopicKey = NULL;
+ scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
- array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
- if(sub_list_by_topic==NULL){
- arrayList_create(&sub_list_by_topic);
- hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
- }
- free(sub_key);
- arrayList_add(sub_list_by_topic,sub);
-
- celixThreadMutex_unlock(&manager->subscriptionsLock);
-
- unsigned int j;
- double score = 0;
- double best_score = 0;
- pubsub_admin_service_pt best_psa = NULL;
- celixThreadMutex_lock(&manager->psaListLock);
- for(j=0;j<arrayList_size(manager->psaList);j++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
- psa->matchEndpoint(psa->admin,sub,&score);
- if (score > best_score) { /* We have a new winner! */
- best_score = score;
- best_psa = psa;
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount += 1;
+ free(scopeAndTopicKey);
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+ if (entry == NULL) {
+ //new TopicReceiver needed -> matching for psa/serializer
+ entry = calloc(1, sizeof(*entry));
+ entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+ entry->selectedPsaSvcId = -1L;
+ entry->selectedSerializerSvcId = -1L;
+ entry->usageCount = 1;
+
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsasvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
+ long svcId = (long) hashMapEntry_getKey(mapEntry);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+
+ psa->matchSubscriber(psa->handle, bndId, props, &score, &serSvcId);
+ if (score > highestScore) {
+ highestScore = score;
+ serializerSvcId = serSvcId;
+ selectedPsasvcId = svcId;
}
}
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
- if (best_psa != NULL && best_score>0) {
- best_psa->addSubscription(best_psa->admin,sub);
- }
-
- // Inform discoveries for interest in the topic
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- pubsub_announce_endpoint_listener_t *disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->announceEndpoint(disc->handle, sub->properties);
- bundleContext_ungetService(manager->context, disc_sr, NULL);
- }
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
-
- celixThreadMutex_unlock(&manager->psaListLock);
- }
-}
-
-
-void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_topology_manager_t *manager = handle;
-
- pubsub_endpoint_pt subcmp = NULL;
- if (pubsubEndpoint_createFromSvc(manager->context, bnd, props, false, &subcmp) == CELIX_SUCCESS){
-
- unsigned int j,k;
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry->selectedPsaSvcId = selectedPsasvcId;
+ entry->selectedSerializerSvcId = serializerSvcId;
- // Inform discoveries that we not interested in the topic any more
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- pubsub_announce_endpoint_listener_t *disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->removeEndpoint(disc->handle, subcmp->properties);
- bundleContext_ungetService(manager->context, disc_sr, NULL);
- }
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
-
- celixThreadMutex_lock(&manager->subscriptionsLock);
- celixThreadMutex_lock(&manager->psaListLock);
-
- char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
- array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
- free(sub_key);
- if(sub_list_by_topic!=NULL){
- for(j=0;j<arrayList_size(sub_list_by_topic);j++){
- pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
- if(pubsubEndpoint_equals(sub,subcmp)){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->removeSubscription(psa->admin,sub);
- }
+ celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry,
+ pstm_setupTopicReceiverCallback);
- }
- arrayList_remove(sub_list_by_topic,j);
+ if (entry->endpoint != NULL) {
+ entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- /* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
- if(arrayList_size(sub_list_by_topic)==0){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
- }
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
}
-
- pubsubEndpoint_destroy(sub);
-
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ //store topic receiver.
+ //TODO race condition if multiple scope/topic combinations are request -> broader the lock?
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ } else {
+ free(entry->scopeAndTopicKey);
+ free(entry);
+ //ignore -> psa unregistered in meantime
}
}
-
- celixThreadMutex_unlock(&manager->psaListLock);
- celixThreadMutex_unlock(&manager->subscriptionsLock);
-
- pubsubEndpoint_destroy(subcmp);
-
}
}
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props) {
- pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
- pubsub_announce_endpoint_listener_t *disc = svc;
+void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_topology_manager_t *manager = handle;
+ //NOTE local subscriber service unregister
+ //1) Find topic receiver and decrease count
- const char* fwUUID = NULL;
+ const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
- printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
+ if (topic == NULL) {
return;
}
- celixThreadMutex_lock(&manager->publicationsLock);
+ char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_remove(manager->topicReceivers.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount -= 0;
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ free(scopeAndTopicKey);
- celixThreadMutex_lock(&manager->discoveryListLock);
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- hashMap_put(manager->discoveryList, (void*)svcId, NULL);
- celixThreadMutex_unlock(&manager->discoveryListLock);
-
- hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
- while(hashMapIterator_hasNext(iter)){
- array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
- for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if( (strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0)) {
- disc->announceEndpoint(disc->handle,pubEP->properties);
- }
+ //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
+}
+
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
+ pubsub_announce_endpoint_listener_t *listener = svc;
+
+ //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
+ //2) Add listener to manager->announceEndpointListeners
+
+ celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+ for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+ celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+ listener->announceEndpoint(listener->handle, ep);
}
}
- hashMapIterator_destroy(iter);
-
- celixThreadMutex_unlock(&manager->publicationsLock);
+ celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
- celixThreadMutex_lock(&manager->subscriptionsLock);
- iter = hashMapIterator_create(manager->subscriptions);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_add(manager->announceEndpointListeners.list, listener);
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+}
- while(hashMapIterator_hasNext(iter)) {
- array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
- unsigned int i;
- for(i=0;i<arrayList_size(l);i++){
- pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
- disc->announceEndpoint(disc->handle, subEp->properties);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
+ pubsub_announce_endpoint_listener_t *listener = svc;
+
+ //1) Remove listener from manager->announceEndpointListeners
+ //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
+ for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+ celix_properties_t *ep = celix_arrayList_get(endpoints, i);
+ listener->removeEndpoint(listener->handle, ep);
}
}
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->subscriptionsLock);
+ celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
}
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props) {
- pubsub_topology_manager_t *manager = handle;
+static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
+ pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+}
- celixThreadMutex_lock(&manager->discoveryListLock);
+void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
+ pubsub_topology_manager_t *manager = handle;
+ //NOTE new local subscriber service register
+ //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+ //2) update the usage count. if not found
+ //3) Try to find a matching psa and create a new TopicReceiver.
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- if (hashMap_remove(manager->discoveryList, (void*)svcId)) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
+ //TODO FIXME
+ if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+ return;
}
- celixThreadMutex_unlock(&manager->discoveryListLock);
-}
-
-static void tm_callAnnounce(void *handle, void *svc) {
- pubsub_endpoint_t *pub = handle;
- pubsub_announce_endpoint_listener_t *listener = svc;
- listener->announceEndpoint(listener->handle, pub->properties);
-}
-
-void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
- pubsub_topology_manager_t *manager = handle;
- pubsub_endpoint_pt pub = NULL;
- celix_status_t status = pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub);
- if (status == CELIX_SUCCESS) {
+ char *topic = NULL;
+ char *scopeFromFilter = NULL;
+ pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+ const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
- celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
- if(pub_list_by_topic==NULL){
- arrayList_create(&pub_list_by_topic);
- hashMap_put(manager->publications,pub_key,pub_list_by_topic);
- } else {
- free(pub_key);
- }
- arrayList_add(pub_list_by_topic,pub);
-
- celixThreadMutex_unlock(&manager->publicationsLock);
-
- unsigned int j;
- double score = 0;
- double best_score = 0;
- pubsub_admin_service_pt best_psa = NULL;
- celixThreadMutex_lock(&manager->psaListLock);
-
- int size = celix_arrayList_size(manager->psaList);
- for (j=0; j<size; j++) {
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
- psa->matchEndpoint(psa->admin,pub,&score);
- if(score>best_score){ /* We have a new winner! */
- best_score = score;
- best_psa = psa;
- }
+ char *scopeAndTopicKey = NULL;
+ if (topic == NULL) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+ "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC);
+ } else {
+ scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount += 1;
}
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ if (entry == NULL) {
+ //new topic receiver needed, requesting match with current psa
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsasvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
+ long svcId = (long)hashMapEntry_getKey(entry);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(entry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+ psa->matchPublisher(psa->handle, info->bundleId, info->filter, &score, &serSvcId);
+ if (score > highestScore) {
+ highestScore = score;
+ serializerSvcId = serSvcId;
+ selectedPsasvcId = svcId;
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry = calloc(1, sizeof(*entry));
+ entry->scopeAndTopicKey = scopeAndTopicKey;
+ entry->usageCount = 1;
+ entry->selectedPsaSvcId = selectedPsasvcId;
+ entry->selectedSerializerSvcId = serializerSvcId;
+ entry->topic = topic; //NOTE tmp
+ entry->scope = scope; //NOTE tmp
+
+ celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+ if (entry->endpoint != NULL) {
+ //note psa->setupTopicSender has created the endpoint.
+ entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ entry->topic = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ } else {
+ free(entry->scopeAndTopicKey);
+ free(entry);
+ entry = NULL;
+ //ignore -> psa unregistered in meantime
+ }
- if (best_psa != NULL && best_score > 0) {
- celix_status_t status = best_psa->addPublication(best_psa->admin,pub);
- if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveryList);
- while(hashMapIterator_hasNext(&iter)) {
- long svcId = (long)hashMapIterator_nextKey(&iter);
- celix_bundleContext_useServiceWithId(manager->context, svcId, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, pub, tm_callAnnounce);
+ if (entry != NULL) {
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ //store topic sender.
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "[PSTM] setting up new TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+ entry->scope, entry->topic, adminType, serType);
}
- celixThreadMutex_unlock(&manager->discoveryListLock);
}
+ } else {
+ free(scopeAndTopicKey);
}
-
- celixThreadMutex_unlock(&manager->psaListLock);
-
}
+ free(scopeFromFilter);
}
-
void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
pubsub_topology_manager_t *manager = handle;
- pubsub_endpoint_pt pubcmp = NULL;
- if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
- unsigned int j,k;
- celixThreadMutex_lock(&manager->psaListLock);
- celixThreadMutex_lock(&manager->publicationsLock);
-
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
- if(pub_list_by_topic!=NULL){
- for(j=0;j<arrayList_size(pub_list_by_topic);j++){
- pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
- if(pubsubEndpoint_equals(pub,pubcmp)){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- celix_status_t status = psa->removePublication(psa->admin,pub);
- if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- pubsub_announce_endpoint_listener_t *disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->removeEndpoint(disc->handle,pub->properties);
- bundleContext_ungetService(manager->context, disc_sr, NULL);
- }
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
- }
- else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
- status = CELIX_SUCCESS;
- }
- }
- //}
- arrayList_remove(pub_list_by_topic,j);
-
- /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
- if(arrayList_size(pub_list_by_topic)==0){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllPublications(psa->admin, (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
- }
- }
-
- pubsubEndpoint_destroy(pub);
- }
+ //NOTE local subscriber service unregister
+ //1) Find topic sender and decrease count
- }
- }
+ //TODO FIXME
+ if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+ return;
+ }
- celixThreadMutex_unlock(&manager->publicationsLock);
- celixThreadMutex_unlock(&manager->psaListLock);
+ char *topic = NULL;
+ char *scopeFromFilter = NULL;
+ pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+ const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
- free(pub_key);
+ if (topic == NULL) {
+ free(scopeFromFilter);
+ return;
+ }
- pubsubEndpoint_destroy(pubcmp);
+ char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount -= 1;
}
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ free(scopeAndTopicKey);
+ free(scopeFromFilter);
+}
+
+static void pstm_addEndpointCallback(void *handle, void *svc) {
+ celix_properties_t *endpoint = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->addEndpoint(psa->handle, endpoint);
}
-static celix_status_t pubsub_topologyManager_addDiscoveredPublisher(void *handle, const celix_properties_t *pubProperties){
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_t *manager = handle;
- const char *topic = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- const char *scope = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *fwUid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
- const char *uuid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_UUID, NULL);
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
+ // 1) See if endpoint is already discovered, if so increase usage count.
+ // 1) If not, find matching psa using the matchEndpoint
+ // 2) if found call addEndpoint of the matching psa
- if (manager->verbose) {
- printf("PSTM: New publisher discovered for scope/topic %s/%s [fwUUID=%s, epUUID=%s]\n",
- scope, topic, fwUid, uuid);
- }
-
-
- celixThreadMutex_lock(&manager->psaListLock);
- celixThreadMutex_lock(&manager->publicationsLock);
+ if (manager->verbose) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+ uuid);
+ }
- char *pub_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
- if(pub_list_by_topic==NULL){
- arrayList_create(&pub_list_by_topic);
- hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+ if (entry != NULL) {
+ //already existing endpoint -> increase usage
+ entry->usageCount += 1;
}
- free(pub_key);
-
- /* Shouldn't be any other duplicate, since it's filtered out by the discovery */
- pubsub_endpoint_pt p = NULL;
- pubsubEndpoint_createFromProperties(pubProperties, &p);
- arrayList_add(pub_list_by_topic , p);
-
- unsigned int j;
- double score = 0;
- double best_score = 0;
- pubsub_admin_service_pt best_psa = NULL;
-
- for(j=0;j<arrayList_size(manager->psaList);j++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
- psa->matchEndpoint(psa->admin , p, &score);
- if (score>best_score) { /* We have a new winner! */
- best_score = score;
- best_psa = psa;
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+ if (entry == NULL) {
+
+ //new endpoint -> new entry
+ entry = calloc(1, sizeof(*entry));
+ entry->usageCount = 1;
+ entry->endpoint = celix_properties_copy(endpoint);
+ entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ entry->selectedPsaSvcId = -1L;
+
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long psaSvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ long svcId = (long) hashMapEntry_getKey(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ psa->matchEndpoint(psa->handle, endpoint, &score);
+ if (score > highestScore) {
+ highestScore = score;
+ psaSvcId = svcId;
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (psaSvcId >= 0) {
+ //psa called outside of mutex, this means the it can happen that addEndpointCallback is not called.
+ //for now this is expected behaviour;
+ //You need to start the pubsub admin stuff before the bundles using pubsub.
+ celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ (void *) endpoint, pstm_addEndpointCallback);
+ } else {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
}
- }
- if(best_psa != NULL && best_score>0) {
- //TODO FIXME this the same call as used by publisher of service trackers. This is confusing.
- //remote discovered publication can be handle different.
- best_psa->addPublication(best_psa->admin,p);
- }
- else{
- status = CELIX_ILLEGAL_STATE;
+ entry->selectedPsaSvcId = psaSvcId;
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
}
- celixThreadMutex_unlock(&manager->publicationsLock);
- celixThreadMutex_unlock(&manager->psaListLock);
+ return status;
+}
- return status;
+static void pstm_removeEndpointCallback(void *handle, void *svc) {
+ celix_properties_t *endpoint = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->removeEndpoint(psa->handle, endpoint);
}
-static celix_status_t pubsub_topologyManager_removeDiscoveredPublisher(void *handle, const celix_properties_t *props) {
+celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_topology_manager_t *manager = handle;
- if (manager->verbose) {
- printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
- celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
- celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
- celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
- celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL));
- }
+ // 1) See if endpoint is already discovered, if so decrease usage count.
+ // 1) If usage count becomes 0, find matching psa using the matchEndpoint
+ // 2) if found call disconnectEndpoint of the matching psa
- celixThreadMutex_lock(&manager->psaListLock);
- celixThreadMutex_lock(&manager->publicationsLock);
- unsigned int i;
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
- char *pub_key = pubsubEndpoint_createScopeTopicKey(celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
- if(pub_list_by_topic==NULL){
- printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),celix_properties_get(props, "pubsub.url", NULL));
+ if (manager->verbose) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+ uuid);
}
- else{
-
- pubsub_endpoint_pt p = NULL;
- bool found = false;
- for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
- p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
- found = pubsubEndpoint_equalsWithProperties(p,props);
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+ if (entry != NULL) {
+ //already existing endpoint -> decrease usage
+ entry->usageCount-= 1;
+ if (entry->usageCount <= 0) {
+ hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
+ } else {
+ entry = NULL; //still used (usage count > 0) -> do nothing
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+ if (entry != NULL) {
+ //note entry is removed from manager->discoveredEndpoints, also inform used psa
+ if (entry->selectedPsaSvcId >= 0) {
+ //note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ (void *) endpoint, pstm_removeEndpointCallback);
+ } else {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
}
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
- if(found && p !=NULL){
- for(i=0;i<arrayList_size(manager->psaList);i++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
- /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
- psa->removePublication(psa->admin,p);
- }
+ return CELIX_SUCCESS;
+}
- arrayList_removeElement(pub_list_by_topic,p);
- /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
- if(arrayList_size(pub_list_by_topic)==0){
+static void pstm_teardownTopicSenderCallback(void *handle, void *svc) {
+ pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
+}
- for(i=0;i<arrayList_size(manager->psaList);i++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
- psa->closeAllPublications(psa->admin, (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
- }
- }
+static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+
+ if (entry != NULL && entry->usageCount <= 0) {
+ hashMapIterator_remove(&iter);
+ if (manager->verbose) {
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+ entry->scope, entry->topic, adminType, serType);
+ }
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ entry, pstm_teardownTopicSenderCallback);
+
+
+ //cleanup entry
+ free(entry->scopeAndTopicKey);
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+}
- pubsubEndpoint_destroy(p);
- }
+static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) {
+ pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
+}
+
+static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->usageCount <= 0) {
+ hashMapIterator_remove(&iter);
+
+ if (manager->verbose) {
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+ entry->scope, entry->topic, adminType, serType);
+ }
+
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_teardownTopicReceiverCallback);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+ //cleanup entry
+ free(entry->scopeAndTopicKey);
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+}
+static void* pstm_psaHandlingThread(void *data) {
+ pubsub_topology_manager_t *manager = data;
- }
- free(pub_key);
- celixThreadMutex_unlock(&manager->publicationsLock);
- celixThreadMutex_unlock(&manager->psaListLock);
+ celixThreadMutex_lock(&manager->psaHandling.mutex);
+ bool running = manager->psaHandling.running;
+ celixThreadMutex_unlock(&manager->psaHandling.mutex);
+ while (running) {
+ pstm_teardownTopicSenders(manager);
+ pstm_teardownTopicReceivers(manager);
- return CELIX_SUCCESS;
+ celixThreadMutex_lock(&manager->psaHandling.mutex);
+ celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L);
+ running = manager->psaHandling.running;
+ celixThreadMutex_unlock(&manager->psaHandling.mutex);
+ }
+ return NULL;
}
-celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
- const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- return pubsub_topologyManager_addDiscoveredPublisher(handle, properties);
- } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
- //nop //TODO add subscription to pubsub admins
- } else {
- fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
- }
- return CELIX_SUCCESS;
-}
-celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
- const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- pubsub_topologyManager_removeDiscoveredPublisher(handle, properties);
- } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
- //nop //TODO remove subscription from pubsub admins
- } else {
- fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = handle;
+ //TODO add support for searching based on scope and topic
+
+ fprintf(os, "\n");
+
+ fprintf(os, "Discovered Endpoints: \n");
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+ const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+ const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+ const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
+ fprintf(os, " |- scope = %s\n", scope);
+ fprintf(os, " |- topic = %s\n", topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
+ fprintf(os, " |- usage count = %i\n", discovered->usageCount);
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ fprintf(os,"\n");
+
+
+ fprintf(os, "Active Topic Senders:\n");
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID);
+ fprintf(os, " |- scope = %s\n", entry->scope);
+ fprintf(os, " |- topic = %s\n", entry->topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
}
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ fprintf(os,"\n");
+
+ fprintf(os, "Active Topic Receivers:\n");
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID);
+ fprintf(os, " |- scope = %s\n", entry->scope);
+ fprintf(os, " |- topic = %s\n", entry->topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ fprintf(os,"\n");
+
return CELIX_SUCCESS;
}
-
-
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index dda84a0..105b797 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -36,31 +36,68 @@
#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
-struct pubsub_topology_manager {
+typedef struct pubsub_topology_manager {
bundle_context_pt context;
- celix_thread_mutex_t psaListLock;
- array_list_pt psaList;
-
- celix_thread_mutex_t discoveryListLock;
- hash_map_pt discoveryList; //<svcId,NULL>
-
- celix_thread_mutex_t publicationsLock;
- hash_map_pt publications; //<topic(string),list<pubsub_ep>>
-
- celix_thread_mutex_t subscriptionsLock;
- hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
-
- command_service_t shellCmdService;
- service_registration_pt shellCmdReg;
-
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = pubsub_admin_t*
+ } pubsubadmins;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = psa svc id, value = list<celix_properties_t /*endpoint*/>
+ } announcedEndpoints;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
+ } discoveredEndpoints;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ celix_array_list_t *list; //<pubsub_announce_endpoint_listener_t*>
+ } announceEndpointListeners;
+
+ struct {
+ celix_thread_t thread;
+ celix_thread_mutex_t mutex; //protect running and condition
+ celix_thread_cond_t cond;
+ bool running;
+ } psaHandling;
log_helper_pt loghelper;
bool verbose;
-};
-
-typedef struct pubsub_topology_manager pubsub_topology_manager_t;
+} pubsub_topology_manager_t;
+
+typedef struct pstm_discovered_endpoint_entry {
+ const char *uuid;
+ long selectedPsaSvcId;
+ int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
+ celix_properties_t *endpoint;
+} pstm_discovered_endpoint_entry_t;
+
+typedef struct pstm_topic_receiver_or_sender_entry {
+ char *scopeAndTopicKey; //key of the combined value of the scope and topic
+ celix_properties_t *endpoint;
+ const char *topic;
+ const char *scope;
+ const char *endpointUUID;
+ int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
+ long selectedPsaSvcId;
+ long selectedSerializerSvcId;
+} pstm_topic_receiver_or_sender_entry_t;
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
@@ -69,8 +106,8 @@ celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *ma
void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props);
void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props);
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props);
void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
@@ -81,4 +118,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream);
+
#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_api.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_api.h b/libs/framework/include/celix_api.h
index 5129a65..b941a78 100644
--- a/libs/framework/include/celix_api.h
+++ b/libs/framework/include/celix_api.h
@@ -21,32 +21,26 @@
#define CELIX_CELIX_API_H_
#include "properties.h"
-
#include "array_list.h"
-#include "celix_array_list.h"
-
#include "constants.h"
+#include "bundle.h"
+#include "bundle_context.h"
+#include "framework.h"
+#include "celix_properties.h"
+#include "celix_array_list.h"
+//#include "celix_constants.h"
#include "celix_utils_api.h"
-
-#include "bundle.h"
#include "celix_bundle.h"
-
-#include "bundle_context.h"
#include "celix_bundle_context.h"
-#include "service_registration.h"
-#include "service_factory.h"
-#include "service_reference.h"
-#include "service_tracker.h"
-#include "service_tracker_customizer.h"
-#include "listener_hook_service.h"
-
-#include "framework.h"
+#include "celix_framework.h"
#include "celix_framework_factory.h"
#include "celix_launcher.h"
#include "dm_dependency_manager.h"
#include "dm_service_dependency.h"
+#include "celix_bundle_activator.h"
+
#endif //CELIX_CELIX_API_H_
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h
index 5bb6faa..76af9a5 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -357,6 +357,7 @@ typedef struct celix_service_tracker_options {
.filter.versionRange = NULL, \
.filter.filter = NULL, \
.filter.serviceLanguage = NULL, \
+ .filter.ignoreServiceLanguage = false, \
.callbackHandle = NULL, \
.set = NULL, \
.add = NULL, \
@@ -797,6 +798,16 @@ const char* celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const c
long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const char *key, long defaultValue);
/**
+ * Gets the config property as converts it to double. If the property is not a valid double, the defaultValue will be returned.
+ * The rest of the behaviour is the same as celix_bundleContext_getProperty.
+
+ * @param key The key of the property to receive.
+ * @param defaultVal The default value to use if the property is not found.
+ * @return The property value for the provided key or the provided defaultValue is the key is not found.
+ */
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue);
+
+/**
* Gets the config property as converts it to bool. If the property is not a valid bool, the defaultValue will be returned.
* The rest of the behaviour is the same as celix_bundleContext_getProperty.
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/bundle_context.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c
index b8b1552..1e86f2a 100644
--- a/libs/framework/src/bundle_context.c
+++ b/libs/framework/src/bundle_context.c
@@ -1038,6 +1038,20 @@ long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const ch
return result;
}
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue) {
+ double result = defaultValue;
+ const char *val = celix_bundleContext_getProperty(ctx, key, NULL);
+ if (val != NULL) {
+ char *enptr = NULL;
+ errno = 0;
+ double r = strtod(val, &enptr);
+ if (enptr != val && errno == 0) {
+ result = r;
+ }
+ }
+ return result;
+}
+
bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const char *key, bool defaultValue) {
bool result = defaultValue;
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/framework.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 4059e70..b0d505d 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -2822,7 +2822,7 @@ void celix_framework_useBundle(framework_t *fw, bool onlyActive, long bundleId,
bundle_t *bnd = framework_getBundleById(fw, bundleId);
if (bnd != NULL) {
celix_bundle_state_e bndState = celix_bundle_getState(bnd);
- if (onlyActive && bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE) {
+ if (onlyActive && (bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE || bndState == OSGI_FRAMEWORK_BUNDLE_STARTING)) {
use(callbackHandle, bnd);
} else if (!onlyActive) {
use(callbackHandle, bnd);
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_properties.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h
index e325222..f6e871b 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -60,12 +60,16 @@ void celix_properties_unset(celix_properties_t *properties, const char *key);
celix_properties_t* celix_properties_copy(const celix_properties_t *properties);
long celix_properties_getAsLong(const celix_properties_t *props, const char *key, long defaultValue);
-
void celix_properties_setLong(celix_properties_t *props, const char *key, long value);
bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue);
void celix_properties_setBool(celix_properties_t *props, const char *key, bool val);
+
+void celix_properties_setDouble(celix_properties_t *props, const char *key, double val);
+double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue);
+
+
#define CELIX_PROPERTIES_FOR_EACH(props, key) \
for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_threads.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_threads.h b/libs/utils/include/celix_threads.h
index a9a3049..bf6e1f6 100644
--- a/libs/utils/include/celix_threads.h
+++ b/libs/utils/include/celix_threads.h
@@ -54,6 +54,11 @@ static const celix_thread_t celix_thread_default = {0, 0};
celix_status_t
celixThread_create(celix_thread_t *new_thread, celix_thread_attr_t *attr, celix_thread_start_t func, void *data);
+/**
+ * If supported by the platform sets the name of the thread.
+ */
+void celixThread_setName(celix_thread_t *thread, const char *threadName);
+
void celixThread_exit(void *exitStatus);
celix_status_t celixThread_detach(celix_thread_t thread);
@@ -123,7 +128,7 @@ celix_status_t celixThreadCondition_destroy(celix_thread_cond_t *condition);
celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex);
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond);
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/celix_threads.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c
index d8a8091..7bfc37d 100644
--- a/libs/utils/src/celix_threads.c
+++ b/libs/utils/src/celix_threads.c
@@ -24,6 +24,7 @@
* \copyright Apache License, Version 2.0
*/
#include <stdlib.h>
+#include <sys/time.h>
#include "signal.h"
#include "celix_threads.h"
@@ -41,6 +42,16 @@ celix_status_t celixThread_create(celix_thread_t *new_thread, celix_thread_attr_
return status;
}
+#ifdef _GNU_SOURCE
+void celixThread_setName(celix_thread_t *thread, const char *threadName) {
+ pthread_setname_np(thread->thread, threadName);
+}
+#else
+void celixThread_setName(celix_thread_t *thread __attribute__((unused)), const char *threadName __attribute__((unused))); {
+ //nop
+}
+#endif
+
// Returns void, since pthread_exit does exit the thread and never returns.
void celixThread_exit(void *exitStatus) {
pthread_exit(exitStatus);
@@ -143,10 +154,11 @@ celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread
return pthread_cond_wait(cond, mutex);
}
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
struct timespec time;
- time.tv_sec = seconds;
- time.tv_nsec = nanoseconds;
+ clock_gettime(CLOCK_REALTIME, &time);
+ time.tv_sec += seconds;
+ time.tv_nsec += nanoseconds;
return pthread_cond_timedwait(cond, mutex, &time);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/properties.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index 480a056..191dfd8 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -394,13 +394,37 @@ long celix_properties_getAsLong(const celix_properties_t *props, const char *key
}
void celix_properties_setLong(celix_properties_t *props, const char *key, long value) {
- char buf[32]; //should be enough to store long long int
- int writen = snprintf(buf, 32, "%li", value);
- if (writen <= 31) {
- celix_properties_set(props, key, buf);
- } else {
- fprintf(stderr,"buf to small for value '%li'\n", value);
+ char buf[32]; //should be enough to store long long int
+ int writen = snprintf(buf, 32, "%li", value);
+ if (writen <= 31) {
+ celix_properties_set(props, key, buf);
+ } else {
+ fprintf(stderr,"buf to small for value '%li'\n", value);
+ }
+}
+
+double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue) {
+ double result = defaultValue;
+ const char *val = celix_properties_get(props, key, NULL);
+ if (val != NULL) {
+ char *enptr = NULL;
+ errno = 0;
+ double r = strtod(val, &enptr);
+ if (enptr != val && errno == 0) {
+ result = r;
+ }
}
+ return result;
+}
+
+void celix_properties_setDouble(celix_properties_t *props, const char *key, double val) {
+ char buf[32]; //should be enough to store long long int
+ int writen = snprintf(buf, 32, "%f", val);
+ if (writen <= 31) {
+ celix_properties_set(props, key, buf);
+ } else {
+ fprintf(stderr,"buf to small for value '%f'\n", val);
+ }
}
bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue) {