You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:26 UTC
[09/34] celix git commit: CELIX-454: More PubSub. The UDPMC is now
somewhat working again, but still needs some testing.
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 85c67e9..1ebd74a 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -36,8 +36,9 @@
#include "pubsub_listeners.h"
#include "pubsub_topology_manager.h"
#include "pubsub_admin.h"
+#include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
-#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS 5L
+#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS 30L
static void* pstm_psaHandlingThread(void *data);
@@ -161,22 +162,53 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
celix_arrayList_destroy(endpointsList);
}
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
-}
+ //NOTE psa shutdown will teardown topic receivers / topic senders
+ //de-setup all topic receivers/senders for the removed psa.
+ //the next psaHandling run will try to find new psa.
-static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
- pubsub_admin_service_t *psa = svc;
- psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->selectedPsaSvcId == svcId) {
+ entry->setup = false;
+ entry->selectedSerializerSvcId = -1L;
+ entry->selectedPsaSvcId = -1L;
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ entry->endpoint = NULL;
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex); //release the senders lock taken before the loop (was wrongly unlocking topicReceivers.mutex)
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->selectedPsaSvcId == svcId) {
+ entry->setup = false;
+ entry->selectedSerializerSvcId = -1L;
+ entry->selectedPsaSvcId = -1L;
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ entry->endpoint = NULL;
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+
+
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
}
void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
pubsub_topology_manager_t *manager = handle;
+
//NOTE new local subscriber service register
//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
//2) update the usage count. if not found
- //3) Create new entry, find matching psa and serializer and broadcast cond, so that the psaHandling thread will
- // call the psa to setup the topic receiver and announce the endpoint.
+ //3) signal psaHandling thread to setup topic receiver
const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -196,75 +228,28 @@ void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute
if (entry != NULL) {
entry->usageCount += 1;
free(scopeAndTopicKey);
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
- if (entry == NULL) {
- //new TopicReceiver needed -> matching for psa/serializer
+ } else {
entry = calloc(1, sizeof(*entry));
entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+ entry->scope = strndup(scope, 1024 * 1024);
+ entry->topic = strndup(topic, 1024 * 1024);
+ entry->usageCount = 1;
entry->selectedPsaSvcId = -1L;
entry->selectedSerializerSvcId = -1L;
- entry->usageCount = 1;
-
- double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serializerSvcId = -1L;
- long selectedPsasvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
- long svcId = (long) hashMapEntry_getKey(mapEntry);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
- double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serSvcId = -1L;
-
- psa->matchSubscriber(psa->handle, bndId, props, &score, &serSvcId);
- if (score > highestScore) {
- highestScore = score;
- serializerSvcId = serSvcId;
- selectedPsasvcId = svcId;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
- if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry->selectedPsaSvcId = selectedPsasvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
-
- celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry,
- pstm_setupTopicReceiverCallback);
+ entry->setup = false;
+ entry->bndId = bndId;
+ entry->subscriberProperties = celix_properties_copy(props);
+ hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
- if (entry->endpoint != NULL) {
- entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
-
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
- //store topic receiver.
- //TODO race condition if multiple scope/topic combinations are request -> broader the lock?
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
- } else {
- free(entry->scopeAndTopicKey);
- free(entry);
- //ignore -> psa unregistered in meantime
- }
- }
+ //signal psa handling thread
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
}
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
}
void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
pubsub_topology_manager_t *manager = handle;
+
//NOTE local subscriber service unregister
//1) Find topic receiver and decrease count
@@ -334,20 +319,13 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle,
celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
}
-
-static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
- pubsub_admin_service_t *psa = svc;
- psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
-}
-
void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
pubsub_topology_manager_t *manager = handle;
//NOTE new local subscriber service register
//1) First trying to see if a TopicReceiver already exists for this subscriber, if found
//2) update the usage count. if not found
- //3) Try to find a matching psa and create a new TopicReceiver.
+ //3) signal psaHandling thread to find a psa and setup TopicSender
//TODO FIXME
if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
@@ -356,96 +334,45 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
}
- char *topic = NULL;
+ char *topicFromFilter = NULL;
char *scopeFromFilter = NULL;
- pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
- const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+ pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
+ char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+ char *topic = topicFromFilter;
char *scopeAndTopicKey = NULL;
if (topic == NULL) {
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
- "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC);
- } else {
- scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
- if (entry != NULL) {
- entry->usageCount += 1;
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
-
- if (entry == NULL) {
- //new topic receiver needed, requesting match with current psa
- double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serializerSvcId = -1L;
- long selectedPsasvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
- long svcId = (long)hashMapEntry_getKey(entry);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(entry);
- double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serSvcId = -1L;
- psa->matchPublisher(psa->handle, info->bundleId, info->filter, &score, &serSvcId);
- if (score > highestScore) {
- highestScore = score;
- serializerSvcId = serSvcId;
- selectedPsasvcId = svcId;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
- if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry = calloc(1, sizeof(*entry));
- entry->scopeAndTopicKey = scopeAndTopicKey;
- entry->usageCount = 1;
- entry->selectedPsaSvcId = selectedPsasvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
- entry->topic = topic; //NOTE tmp
- entry->scope = scope; //NOTE tmp
-
- celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
- if (entry->endpoint != NULL) {
- //note psa->setupTopicSender has created the endpoint.
- entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- entry->topic = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- } else {
- free(entry->scopeAndTopicKey);
- free(entry);
- entry = NULL;
- //ignore -> psa unregistered in meantime
- }
-
- if (entry != NULL) {
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
- //store topic sender.
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
+ PUBSUB_SUBSCRIBER_TOPIC);
+ free(scope); //scope is heap-owned here (strndup'd "default" or scopeFromFilter) and no entry took ownership; topic is NULL so there is nothing else to free
+ return;
+ }
- const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
- "[PSTM] setting up new TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
- entry->scope, entry->topic, adminType, serType);
- }
- }
- } else {
- free(scopeAndTopicKey);
- }
+ scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount += 1;
+ free(scope);
+ free(topic);
+ free(scopeAndTopicKey);
+ } else {
+ entry = calloc(1, sizeof(*entry));
+ entry->usageCount = 1;
+ entry->selectedSerializerSvcId = -1L;
+ entry->selectedPsaSvcId = -1L;
+ entry->scope = scope; //taking ownership
+ entry->topic = topic; //taking ownership
+ entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
+ entry->setup = false;
+ entry->publisherFilter = celix_filter_create(info->filter->filterStr);
+ entry->bndId = info->bundleId;
+ hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+
+ //new entry -> wakeup psaHandling thread
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
}
- free(scopeFromFilter);
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
}
void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
@@ -483,12 +410,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
free(scopeFromFilter);
}
-static void pstm_addEndpointCallback(void *handle, void *svc) {
- celix_properties_t *endpoint = handle;
- pubsub_admin_service_t *psa = svc;
- psa->addEndpoint(psa->handle, endpoint);
-}
-
celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_t *manager = handle;
@@ -515,51 +436,20 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
if (entry != NULL) {
//already existing endpoint -> increase usage
entry->usageCount += 1;
- }
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-
- if (entry == NULL) {
-
+ } else {
//new endpoint -> new entry
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
entry->endpoint = celix_properties_copy(endpoint);
entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- entry->selectedPsaSvcId = -1L;
-
- double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long psaSvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
- long svcId = (long) hashMapEntry_getKey(mapEntry);
- double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- psa->matchEndpoint(psa->handle, endpoint, &score);
- if (score > highestScore) {
- highestScore = score;
- psaSvcId = svcId;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+ entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
+ hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
- if (psaSvcId >= 0) {
- //psa called outside of mutex, this means the it can happen that addEndpointCallback is not called.
- //for now this is expected behaviour;
- //You need to start the pubsub admin stuff before the bundles using pubsub.
- celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- (void *) endpoint, pstm_addEndpointCallback);
- } else {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
- }
+ //waking up psa handling thread to select psa
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
- entry->selectedPsaSvcId = psaSvcId;
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
}
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
return status;
}
@@ -635,28 +525,34 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
if (entry != NULL && entry->usageCount <= 0) {
hashMapIterator_remove(&iter);
if (manager->verbose) {
- const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
- "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
- entry->scope, entry->topic, adminType, serType);
- }
-
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->removeEndpoint(listener->handle, entry->endpoint);
+ "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
}
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- entry, pstm_teardownTopicSenderCallback);
+ if (entry->endpoint != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME,
+ entry, pstm_teardownTopicSenderCallback);
+ }
//cleanup entry
free(entry->scopeAndTopicKey);
- celix_properties_destroy(entry->endpoint);
+ free(entry->scope);
+ free(entry->topic);
+ if (entry->publisherFilter != NULL) {
+ celix_filter_destroy(entry->publisherFilter);
+ }
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
free(entry);
}
}
@@ -685,23 +581,203 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
entry->scope, entry->topic, adminType, serType);
}
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_teardownTopicReceiverCallback);
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->removeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ if (entry->endpoint != NULL) {
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME, entry,
+ pstm_teardownTopicReceiverCallback);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+ manager->announceEndpointListeners.list, i);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
//cleanup entry
free(entry->scopeAndTopicKey);
- celix_properties_destroy(entry->endpoint);
+ free(entry->scope);
+ free(entry->topic);
+ if (entry->subscriberProperties != NULL) {
+ celix_properties_destroy(entry->subscriberProperties);
+ }
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
free(entry);
}
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
}
+static void pstm_addEndpointCallback(void *handle, void *svc) {
+ //useService callback: handle carries the endpoint properties, svc is the pubsub admin service to inform
+ pubsub_admin_service_t *admin = svc;
+ celix_properties_t *discoveredEndpoint = handle;
+ admin->addEndpoint(admin->handle, discoveredEndpoint);
+}
+
+static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) { //for every discovered endpoint without a selected psa: find a matching psa and hand the endpoint to it
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); //held for the whole scan, including the psa calls below
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->selectedPsaSvcId < 0) { //negative svc id means no psa selected yet
+ long psaSvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex); //nested inside discoveredEndpoints.mutex
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ long svcId = (long) hashMapEntry_getKey(mapEntry); //map key is the psa service id
+ bool match = false;
+ psa->matchEndpoint(psa->handle, entry->endpoint, &match);
+ if (match) {
+ psaSvcId = svcId;
+ break; //first matching psa wins; endpoints are not scored
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (psaSvcId >= 0) {
+ celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ (void *)entry->endpoint, pstm_addEndpointCallback); //NOTE(review): the return value is ignored, so the svc id is stored below even if the callback was not invoked
+ } else {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
+ }
+
+ entry->selectedPsaSvcId = psaSvcId; //stays -1 when nothing matched, so the next run retries this endpoint
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+}
+
+
+static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
+ //useService callback: asks the selected psa to set up a topic sender; the psa reports the resulting endpoint through senderEntry->endpoint
+ pubsub_admin_service_t *admin = svc;
+ pstm_topic_receiver_or_sender_entry_t *senderEntry = handle;
+ admin->setupTopicSender(admin->handle, senderEntry->scope, senderEntry->topic, senderEntry->selectedSerializerSvcId, &senderEntry->endpoint);
+}
+
+static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) { //for every topic sender entry that is not set up yet: pick the best scoring psa, let it create the sender and announce the endpoint
+ celixThreadMutex_lock(&manager->topicSenders.mutex); //held for the whole scan, including the psa and listener calls below
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && !entry->setup) {
+ //topic sender not set up yet, requesting a match from the currently registered psa's
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsaSvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex); //nested inside topicSenders.mutex
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ long svcId = (long)hashMapEntry_getKey(mapEntry); //map key is the psa service id
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+ psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &score, &serSvcId);
+ if (score > highestScore) { //strictly greater: on equal scores the psa seen first is kept
+ highestScore = score;
+ serializerSvcId = serSvcId;
+ selectedPsaSvcId = svcId;
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry->selectedPsaSvcId = selectedPsaSvcId; //NOTE(review): the selection is kept even when the setup below fails
+ entry->selectedSerializerSvcId = serializerSvcId;
+ bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+ if (called && entry->endpoint != NULL) { //only set up when the callback ran and the psa produced an endpoint
+ entry->setup = true;
+
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+ }
+
+ if (!entry->setup) { //no matching psa or setup failed; the entry stays pending and is retried on the next run
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+}
+
+static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
+ //useService callback: asks the selected psa to set up a topic receiver; the psa reports the resulting endpoint through receiverEntry->endpoint
+ pubsub_admin_service_t *admin = svc;
+ pstm_topic_receiver_or_sender_entry_t *receiverEntry = handle;
+ admin->setupTopicReciever(admin->handle, receiverEntry->scope, receiverEntry->topic, receiverEntry->selectedSerializerSvcId, &receiverEntry->endpoint);
+}
+
+static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) { //for every topic receiver entry that is not set up yet: pick the best scoring psa, let it create the receiver and announce the endpoint
+ celixThreadMutex_lock(&manager->topicReceivers.mutex); //held for the whole scan, including the psa and listener calls below
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->setup) { //NOTE(review): unlike pstm_setupTopicSenders there is no entry != NULL guard here
+
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsaSvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex); //nested inside topicReceivers.mutex
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ long svcId = (long) hashMapEntry_getKey(mapEntry); //map key is the psa service id
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+
+ psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &score, &serSvcId);
+ if (score > highestScore) { //strictly greater: on equal scores the psa seen first is kept
+ highestScore = score;
+ serializerSvcId = serSvcId;
+ selectedPsaSvcId = svcId;
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry->selectedPsaSvcId = selectedPsaSvcId; //NOTE(review): the selection is kept even when the setup below fails
+ entry->selectedSerializerSvcId = serializerSvcId;
+
+ bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ entry,
+ pstm_setupTopicReceiverCallback);
+
+ if (called && entry->endpoint != NULL) { //only set up when the callback ran and the psa produced an endpoint
+ entry->setup = true;
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+ manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+ }
+
+
+ if (!entry->setup) { //no matching psa or setup failed; the entry stays pending and is retried on the next run
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+}
+
static void* pstm_psaHandlingThread(void *data) {
pubsub_topology_manager_t *manager = data;
@@ -713,8 +789,13 @@ static void* pstm_psaHandlingThread(void *data) {
pstm_teardownTopicSenders(manager);
pstm_teardownTopicReceivers(manager);
+ pstm_setupTopicSenders(manager);
+ pstm_setupTopicReceivers(manager);
+
+ pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
+
celixThreadMutex_lock(&manager->psaHandling.mutex);
- celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L);
+ celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS, 0L);
running = manager->psaHandling.running;
celixThreadMutex_unlock(&manager->psaHandling.mutex);
}
@@ -733,18 +814,24 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+ const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
+ const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
+ const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
- fprintf(os, " |- scope = %s\n", scope);
- fprintf(os, " |- topic = %s\n", topic);
- fprintf(os, " |- admin type = %s\n", adminType);
- fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- container name = %s\n", cn);
+ fprintf(os, " |- fw uuid = %s\n", fwuuid);
+ fprintf(os, " |- type = %s\n", type);
+ fprintf(os, " |- scope = %s\n", scope);
+ fprintf(os, " |- topic = %s\n", topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
- fprintf(os, " |- usage count = %i\n", discovered->usageCount);
+ fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
+ fprintf(os, " |- usage count = %i\n", discovered->usageCount);
}
}
celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
@@ -756,9 +843,13 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->setup) {
+ continue;
+ }
+ const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID);
+ fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
fprintf(os, " |- scope = %s\n", entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
@@ -777,9 +868,13 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
iter = hashMapIterator_construct(manager->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->setup) {
+ continue;
+ }
+ const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID);
+ fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
fprintf(os, " |- scope = %s\n", entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
@@ -793,5 +888,7 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandL
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
fprintf(os,"\n");
+ fprintf(os, "TODO pending topic senders/receivers\n");
+
return CELIX_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 105b797..3cca612 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -83,7 +83,7 @@ typedef struct pubsub_topology_manager {
typedef struct pstm_discovered_endpoint_entry {
const char *uuid; //PUBSUB_ENDPOINT_UUID value looked up in endpoint; also used as the discoveredEndpoints map key (presumably borrowed from the properties - verify)
- long selectedPsaSvcId;
+ long selectedPsaSvcId; // -1L, indicates no selected psa
int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
celix_properties_t *endpoint; //copy of the discovered endpoint properties made when the entry is created
} pstm_discovered_endpoint_entry_t;
@@ -91,12 +91,19 @@ typedef struct pstm_discovered_endpoint_entry {
typedef struct pstm_topic_receiver_or_sender_entry {
char *scopeAndTopicKey; //key of the combined value of the scope and topic
celix_properties_t *endpoint; //filled in by the psa setup callbacks; NULL while no psa has set up the sender/receiver
- const char *topic;
- const char *scope;
- const char *endpointUUID;
+ char *topic; //heap copy owned by the entry, freed on teardown
+ char *scope; //heap copy owned by the entry, freed on teardown
int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
long selectedPsaSvcId;
long selectedSerializerSvcId;
+ long bndId; //id of the bundle owning the publisher request / subscriber service; passed to the psa match functions
+ bool setup; //true once a psa has created the sender/receiver and produced an endpoint; reset when that psa is removed
+
+ //for sender entry
+ celix_filter_t *publisherFilter; //filter built from the publisher tracker's filter string; passed to matchPublisher
+
+ //for receiver entry
+ celix_properties_t *subscriberProperties; //copy of the subscriber service properties; passed to matchSubscriber
} pstm_topic_receiver_or_sender_entry_t;
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h
index 76af9a5..6ecc445 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -275,7 +275,7 @@ long celix_bundleContext_trackServices(
/**
* Service Tracker Options used to fine tune which services to track and the callback to be used for the tracked services.
*/
-typedef struct celix_service_tracker_options {
+typedef struct celix_service_tracking_options {
/**
* The service filter options, used to setup the filter for the service to track.
*/