You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/06 11:44:43 UTC
[1/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is
no longer part of the current API.
Repository: celix
Updated Branches:
refs/heads/feature/CELIX-454-pubsub-disc [created] b2548c845
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index bfd038b..aff03c7 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -40,66 +40,66 @@
#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS 30L
-static void* pstm_psaHandlingThread(void *data);
+static void *pstm_psaHandlingThread(void *data);
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) {
- celix_status_t status = CELIX_SUCCESS;
-
- pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
- if (manager == NULL) {
- *out = NULL;
- return CELIX_ENOMEM;
- } else {
- *out = manager;
- }
-
- manager->context = context;
-
- celix_thread_mutexattr_t psaAttr;
- celixThreadMutexAttr_create(&psaAttr);
- celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
- status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
- celixThreadMutexAttr_destroy(&psaAttr);
-
- status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
- status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
- status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
- status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
- status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
-
- status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
-
- manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- manager->announceEndpointListeners.list = celix_arrayList_create();
- manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
- manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- manager->loghelper = logHelper;
- manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-
- manager->psaHandling.running = true;
- celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
- celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
-
- return status;
+ celix_status_t status = CELIX_SUCCESS;
+
+ pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
+ if (manager == NULL) {
+ *out = NULL;
+ return CELIX_ENOMEM;
+ } else {
+ *out = manager;
+ }
+
+ manager->context = context;
+
+ celix_thread_mutexattr_t psaAttr;
+ celixThreadMutexAttr_create(&psaAttr);
+ celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+ status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
+ celixThreadMutexAttr_destroy(&psaAttr);
+
+ status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
+ status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
+
+ status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
+
+ manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ manager->announceEndpointListeners.list = celix_arrayList_create();
+ manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
+ manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+ manager->loghelper = logHelper;
+ manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
+
+ manager->psaHandling.running = true;
+ celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
+ celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
+
+ return status;
}
celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
- celixThreadMutex_lock(&manager->psaHandling.mutex);
- manager->psaHandling.running = false;
- celixThreadCondition_broadcast(&manager->psaHandling.cond);
- celixThreadMutex_unlock(&manager->psaHandling.mutex);
- celixThread_join(manager->psaHandling.thread, NULL);
+ celixThreadMutex_lock(&manager->psaHandling.mutex);
+ manager->psaHandling.running = false;
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
+ celixThreadMutex_unlock(&manager->psaHandling.mutex);
+ celixThread_join(manager->psaHandling.thread, NULL);
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hashMap_destroy(manager->pubsubadmins.map, false, false);
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
- celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hashMap_destroy(manager->pubsubadmins.map, false, false);
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+ celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -108,9 +108,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
free(entry);
}
}
- hashMap_destroy(manager->discoveredEndpoints.map, false, false);
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
- celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
+ hashMap_destroy(manager->discoveredEndpoints.map, false, false);
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
celixThreadMutex_lock(&manager->topicReceivers.mutex);
iter = hashMapIterator_construct(manager->topicReceivers.map);
@@ -120,6 +120,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
free(entry->scopeAndTopicKey);
free(entry->scope);
free(entry->topic);
+ if (entry->topicProperties != NULL) {
+ celix_properties_destroy(entry->topicProperties);
+ }
if (entry->endpoint != NULL) {
celix_properties_destroy(entry->endpoint);
}
@@ -139,6 +142,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
free(entry->scopeAndTopicKey);
free(entry->scope);
free(entry->topic);
+ if (entry->topicProperties != NULL) {
+ celix_properties_destroy(entry->topicProperties);
+ }
if (entry->endpoint != NULL) {
celix_properties_destroy(entry->endpoint);
}
@@ -151,70 +157,92 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
celixThreadMutex_destroy(&manager->topicSenders.mutex);
celixThreadMutex_destroy(&manager->announceEndpointListeners.mutex);
- celix_arrayList_destroy(manager->announceEndpointListeners.list);
+ celix_arrayList_destroy(manager->announceEndpointListeners.list);
- free(manager);
+ free(manager);
- return status;
+ return status;
}
-void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
- pubsub_topology_manager_t *manager = handle;
- pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = handle;
+ pubsub_admin_service_t *psa = (pubsub_admin_service_t *) svc;
+
+
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
+ if (svcId >= 0) {
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hashMap_put(manager->pubsubadmins.map, (void *) svcId, psa);
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+ }
+
+ //NOTE new psa, so no endpoints announce yet
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
+ //new PSA -> every topic sender/receiver entry needs a rematch
+ int needsRematchCount = 0;
- if (svcId >= 0) {
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa);
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
- }
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ entry->needsMatch = true;
+ ++needsRematchCount;
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ entry->needsMatch = true;
+ ++needsRematchCount;
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
- //NOTE new psa, so no endpoints announce yet
+ if (needsRematchCount > 0) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO,
+ "A PSA is added after at least one active publisher/provided. \
+ It is preferred that all PSA are started before publiser/subscriber are started!\n\
+ Current topic/sender count is %i", needsRematchCount);
+ }
- /* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
- * so no retroactively adding subscribers
- *
- * TODO future extension?
- */
}
-void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
- pubsub_topology_manager_t *manager = handle;
- //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
+ pubsub_topology_manager_t *manager = handle;
+ //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- //NOTE psa shutdown will teardown topic receivers / topic senders
- //de-setup all topic receivers/senders for the removed psa.
- //the next psaHandling run will try to find new psa.
+ //NOTE psa shutdown will teardown topic receivers / topic senders
+ //de-setup all topic receivers/senders for the removed psa.
+ //the next psaHandling run will try to find new psa.
celixThreadMutex_lock(&manager->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->selectedPsaSvcId == svcId) {
- /* de-announce all senders */
- if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
- listener->removeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
-
- entry->needsMatch = true;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedPsaSvcId = -1L;
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->selectedPsaSvcId == svcId) {
+ /* de-announce all senders */
+ if (entry->endpoint != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->revokeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+
+ entry->needsMatch = true;
+ entry->selectedSerializerSvcId = -1L;
+ entry->selectedPsaSvcId = -1L;
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
entry->endpoint = NULL;
}
- }
- }
+ }
+ }
celixThreadMutex_unlock(&manager->topicSenders.mutex);
celixThreadMutex_lock(&manager->topicReceivers.mutex);
@@ -222,16 +250,16 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry->selectedPsaSvcId == svcId) {
- /* de-announce all receivers */
- if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
- listener->removeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
+ /* de-announce all receivers */
+ if (entry->endpoint != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->revokeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
entry->needsMatch = true;
entry->selectedSerializerSvcId = -1L;
@@ -245,207 +273,206 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA");
}
-void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_topology_manager_t *manager = handle;
-
- //NOTE new local subscriber service register
- //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
- //2) update the usage count. if not found
- //3) signal psaHandling thread to setup topic receiver
-
- const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (topic == NULL) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
- "[PSTM] Warning found subscriber service without mandatory %s property.",
- PUBSUB_SUBSCRIBER_TOPIC);
- return;
- }
-
- long bndId = celix_bundle_getId(bnd);
- char *scopeAndTopicKey = NULL;
- scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
- if (entry != NULL) {
- entry->usageCount += 1;
- free(scopeAndTopicKey);
- } else {
- entry = calloc(1, sizeof(*entry));
- entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
- entry->scope = strndup(scope, 1024 * 1024);
- entry->topic = strndup(topic, 1024 * 1024);
- entry->usageCount = 1;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
- entry->needsMatch = true;
- entry->bndId = bndId;
- entry->subscriberProperties = celix_properties_copy(props);
- hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
-
- //signal psa handling thread
- celixThreadCondition_broadcast(&manager->psaHandling.cond);
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_topology_manager_t *manager = handle;
+
+ //NOTE new local subscriber service register
+ //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+ //2) update the usage count. if not found
+ //3) signal psaHandling thread to setup topic receiver
+
+ const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ if (topic == NULL) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+ "[PSTM] Warning found subscriber service without mandatory %s property.",
+ PUBSUB_SUBSCRIBER_TOPIC);
+ return;
+ }
+
+ long bndId = celix_bundle_getId(bnd);
+ char *scopeAndTopicKey = NULL;
+ scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount += 1;
+ free(scopeAndTopicKey);
+ } else {
+ entry = calloc(1, sizeof(*entry));
+ entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
+ entry->scope = strndup(scope, 1024 * 1024);
+ entry->topic = strndup(topic, 1024 * 1024);
+ entry->usageCount = 1;
+ entry->selectedPsaSvcId = -1L;
+ entry->selectedSerializerSvcId = -1L;
+ entry->needsMatch = true;
+ entry->bndId = bndId;
+ entry->subscriberProperties = celix_properties_copy(props);
+ hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+
+ //signal psa handling thread
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
}
-void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_topology_manager_t *manager = handle;
+void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_topology_manager_t *manager = handle;
- //NOTE local subscriber service unregister
- //1) Find topic receiver and decrease count
+ //NOTE local subscriber service unregister
+ //1) Find topic receiver and decrease count
- const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (topic == NULL) {
- return;
- }
+ if (topic == NULL) {
+ return;
+ }
- char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
- if (entry != NULL) {
- entry->usageCount -= 0;
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
- free(scopeAndTopicKey);
+ char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount -= 0;
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ free(scopeAndTopicKey);
- //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
+ //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately.
}
-void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
- pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
- pubsub_announce_endpoint_listener_t *listener = svc;
-
- //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
- //2) Add listener to manager->announceEndpointListeners
-
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->endpoint != NULL) {
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
-
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- iter = hashMapIterator_construct(manager->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->endpoint != NULL) {
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
-
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- celix_arrayList_add(manager->announceEndpointListeners.list, listener);
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
+ pubsub_announce_endpoint_listener_t *listener = svc;
+
+ //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
+ //2) Add listener to manager->announceEndpointListeners
+
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->endpoint != NULL) {
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->endpoint != NULL) {
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_add(manager->announceEndpointListeners.list, listener);
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
}
-void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
- pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
- pubsub_announce_endpoint_listener_t *listener = svc;
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
+ pubsub_announce_endpoint_listener_t *listener = svc;
- //1) Remove listener from manager->announceEndpointListeners
- //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+ //1) Remove listener from manager->announceEndpointListeners
+ //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
}
void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
- pubsub_topology_manager_t *manager = handle;
-
- //NOTE new local subscriber service register
- //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
- //2) update the usage count. if not found
- //3) signal psaHandling thread to find a psa and setup TopicSender
-
-
- char *topicFromFilter = NULL;
- char *scopeFromFilter = NULL;
- pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
- char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
- char *topic = topicFromFilter;
-
- char *scopeAndTopicKey = NULL;
- if (topic == NULL) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
- "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
- PUBSUB_SUBSCRIBER_TOPIC);
- return;
- }
-
- scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
- if (entry != NULL) {
- entry->usageCount += 1;
- free(scope);
- free(topic);
- free(scopeAndTopicKey);
- } else {
- entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->selectedSerializerSvcId = -1L;
- entry->selectedPsaSvcId = -1L;
- entry->scope = scope; //taking ownership
- entry->topic = topic; //taking ownership
- entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
- entry->needsMatch = true;
- entry->publisherFilter = celix_filter_create(info->filter->filterStr);
- entry->bndId = info->bundleId;
- hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
-
- //new entry -> wakeup psaHandling thread
- celixThreadCondition_broadcast(&manager->psaHandling.cond);
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ pubsub_topology_manager_t *manager = handle;
+
+ //NOTE new local subscriber service register
+ //1) First trying to see if a TopicReceiver already exists for this subscriber, if found
+ //2) update the usage count. if not found
+ //3) signal psaHandling thread to find a psa and setup TopicSender
+
+
+ char *topicFromFilter = NULL;
+ char *scopeFromFilter = NULL;
+ pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter);
+ char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter;
+ char *topic = topicFromFilter;
+
+ char *scopeAndTopicKey = NULL;
+ if (topic == NULL) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+ "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.",
+ PUBSUB_SUBSCRIBER_TOPIC);
+ return;
+ }
+
+ scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount += 1;
+ free(scope);
+ free(topic);
+ free(scopeAndTopicKey);
+ } else {
+ entry = calloc(1, sizeof(*entry));
+ entry->usageCount = 1;
+ entry->selectedSerializerSvcId = -1L;
+ entry->selectedPsaSvcId = -1L;
+ entry->scope = scope; //taking ownership
+ entry->topic = topic; //taking ownership
+ entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
+ entry->needsMatch = true;
+ entry->publisherFilter = celix_filter_create(info->filter->filterStr);
+ entry->bndId = info->bundleId;
+ hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+
+ //new entry -> wakeup psaHandling thread
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
}
void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
- pubsub_topology_manager_t *manager = handle;
+ pubsub_topology_manager_t *manager = handle;
- //NOTE local subscriber service unregister
- //1) Find topic sender and decrease count
+ //NOTE local subscriber service unregister
+ //1) Find topic sender and decrease count
- char *topic = NULL;
- char *scopeFromFilter = NULL;
- pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
- const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+ char *topic = NULL;
+ char *scopeFromFilter = NULL;
+ pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter);
+ const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
- if (topic == NULL) {
- free(scopeFromFilter);
- return;
- }
+ if (topic == NULL) {
+ free(scopeFromFilter);
+ return;
+ }
- char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
- if (entry != NULL) {
- entry->usageCount -= 1;
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+ if (entry != NULL) {
+ entry->usageCount -= 1;
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
- free(scopeAndTopicKey);
- free(topic);
- free(scopeFromFilter);
+ free(scopeAndTopicKey);
+ free(topic);
+ free(scopeFromFilter);
}
-celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){
- celix_status_t status = CELIX_SUCCESS;
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_t *manager = handle;
const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
@@ -455,92 +482,92 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
// 1) If not, find matching psa using the matchEndpoint
// 2) if found call addEndpoint of the matching psa
- if (manager->verbose) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
- "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
- uuid);
- }
-
-
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
- if (entry != NULL) {
- //already existing endpoint -> increase usage
- entry->usageCount += 1;
- } else {
- //new endpoint -> new entry
- entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->endpoint = celix_properties_copy(endpoint);
- entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
- hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry);
-
- //waking up psa handling thread to select psa
- celixThreadCondition_broadcast(&manager->psaHandling.cond);
-
- }
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ if (manager->verbose) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+ uuid);
+ }
+
+
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+ if (entry != NULL) {
+ //already existing endpoint -> increase usage
+ entry->usageCount += 1;
+ } else {
+ //new endpoint -> new entry
+ entry = calloc(1, sizeof(*entry));
+ entry->usageCount = 1;
+ entry->endpoint = celix_properties_copy(endpoint);
+ entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
+ hashMap_put(manager->discoveredEndpoints.map, (void *) entry->uuid, entry);
+
+ //waking up psa handling thread to select psa
+ celixThreadCondition_broadcast(&manager->psaHandling.cond);
+
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
return status;
}
static void pstm_removeEndpointCallback(void *handle, void *svc) {
- celix_properties_t *endpoint = handle;
- pubsub_admin_service_t *psa = svc;
- psa->removeEndpoint(psa->handle, endpoint);
+ celix_properties_t *endpoint = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->removeDiscoveredEndpoint(psa->handle, endpoint);
}
celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_topology_manager_t *manager = handle;
- // 1) See if endpoint is already discovered, if so decrease usage count.
- // 1) If usage count becomes 0, find matching psa using the matchEndpoint
- // 2) if found call disconnectEndpoint of the matching psa
-
- const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
-
- if (manager->verbose) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
- "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
- celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
- uuid);
- }
-
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
- if (entry != NULL) {
- //already existing endpoint -> decrease usage
- entry->usageCount-= 1;
- if (entry->usageCount <= 0) {
- hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
- } else {
- entry = NULL; //still used (usage count > 0) -> do nothing
- }
- }
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
-
- if (entry != NULL) {
- //note entry is removed from manager->discoveredEndpoints, also inform used psa
- if (entry->selectedPsaSvcId >= 0) {
- //note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- (void *) endpoint, pstm_removeEndpointCallback);
- } else {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
- }
- celix_properties_destroy(entry->endpoint);
- free(entry);
- }
-
-
- return CELIX_SUCCESS;
+ // 1) See if endpoint is already discovered, if so decrease usage count.
+ // 1) If usage count becomes 0, find matching psa using the matchEndpoint
+ // 2) if found call disconnectEndpoint of the matching psa
+
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
+
+ if (manager->verbose) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+ "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+ celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+ uuid);
+ }
+
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid);
+ if (entry != NULL) {
+ //already existing endpoint -> decrease usage
+ entry->usageCount -= 1;
+ if (entry->usageCount <= 0) {
+ hashMap_remove(manager->discoveredEndpoints.map, entry->uuid);
+ } else {
+ entry = NULL; //still used (usage count > 0) -> do nothing
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+ if (entry != NULL) {
+ //note entry is removed from manager->discoveredEndpoints, also inform used psa
+ if (entry->selectedPsaSvcId >= 0) {
+ //note that it is possible that the psa is already gone, in that case the call is also not needed anymore.
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ (void *) endpoint, pstm_removeEndpointCallback);
+ } else {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
+ }
+ celix_properties_destroy(entry->endpoint);
+ free(entry);
+ }
+
+
+ return CELIX_SUCCESS;
}
@@ -559,45 +586,48 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
if (manager->verbose && entry->endpoint != NULL) {
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
- "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
+ "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic);
}
if (entry->endpoint != NULL) {
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->removeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
- PUBSUB_ADMIN_SERVICE_NAME,
- entry, pstm_teardownTopicSenderCallback);
- }
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->revokeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME,
+ entry, pstm_teardownTopicSenderCallback);
+ }
//cleanup entry
if (entry->usageCount <= 0) {
- //no usage -> remove
- hashMapIterator_remove(&iter);
- free(entry->scopeAndTopicKey);
- free(entry->scope);
- free(entry->topic);
- if (entry->publisherFilter != NULL) {
- celix_filter_destroy(entry->publisherFilter);
- }
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- }
- free(entry);
- } else {
- //still usage, setup for new match
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- }
- entry->endpoint = NULL;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
+ //no usage -> remove
+ hashMapIterator_remove(&iter);
+ free(entry->scopeAndTopicKey);
+ free(entry->scope);
+ free(entry->topic);
+ if (entry->topicProperties != NULL) {
+ celix_properties_destroy(entry->topicProperties);
+ }
+ if (entry->publisherFilter != NULL) {
+ celix_filter_destroy(entry->publisherFilter);
+ }
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
+ free(entry);
+ } else {
+ //still usage, setup for new match
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
+ entry->endpoint = NULL;
+ entry->selectedPsaSvcId = -1L;
+ entry->selectedSerializerSvcId = -1L;
}
}
}
@@ -625,215 +655,235 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
}
if (entry->endpoint != NULL) {
- celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
- PUBSUB_ADMIN_SERVICE_NAME, entry,
- pstm_teardownTopicReceiverCallback);
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
- manager->announceEndpointListeners.list, i);
- listener->removeEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
-
- if (entry->usageCount <= 0) {
- //no usage -> remove
- hashMapIterator_remove(&iter);
- //cleanup entry
- free(entry->scopeAndTopicKey);
- free(entry->scope);
- free(entry->topic);
- if (entry->subscriberProperties != NULL) {
- celix_properties_destroy(entry->subscriberProperties);
- }
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- }
- free(entry);
- } else {
- //still usage -> setup for rematch
- if (entry->endpoint != NULL) {
- celix_properties_destroy(entry->endpoint);
- }
- entry->endpoint = NULL;
- entry->selectedPsaSvcId = -1L;
- entry->selectedSerializerSvcId = -1L;
- }
+ celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
+ PUBSUB_ADMIN_SERVICE_NAME, entry,
+ pstm_teardownTopicReceiverCallback);
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+ manager->announceEndpointListeners.list, i);
+ listener->revokeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+
+ if (entry->usageCount <= 0) {
+ //no usage -> remove
+ hashMapIterator_remove(&iter);
+ //cleanup entry
+ free(entry->scopeAndTopicKey);
+ free(entry->scope);
+ free(entry->topic);
+ if (entry->topicProperties != NULL) {
+ celix_properties_destroy(entry->topicProperties);
+ }
+ if (entry->subscriberProperties != NULL) {
+ celix_properties_destroy(entry->subscriberProperties);
+ }
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
+ free(entry);
+ } else {
+ //still usage -> setup for rematch
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
+ entry->endpoint = NULL;
+ entry->selectedPsaSvcId = -1L;
+ entry->selectedSerializerSvcId = -1L;
+ }
}
}
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
}
static void pstm_addEndpointCallback(void *handle, void *svc) {
- celix_properties_t *endpoint = handle;
- pubsub_admin_service_t *psa = svc;
- psa->addEndpoint(psa->handle, endpoint);
+ celix_properties_t *endpoint = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->addDiscoveredEndpoint(psa->handle, endpoint);
}
static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->selectedPsaSvcId < 0) {
- long psaSvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
- long svcId = (long) hashMapEntry_getKey(mapEntry);
- bool match = false;
- psa->matchEndpoint(psa->handle, entry->endpoint, &match);
- if (match) {
- psaSvcId = svcId;
- break;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
- if (psaSvcId >= 0) {
- celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- (void *)entry->endpoint, pstm_addEndpointCallback);
- } else {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
- }
-
- entry->selectedPsaSvcId = psaSvcId;
- }
- }
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->selectedPsaSvcId < 0) {
+ long psaSvcId = -1L;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ long svcId = (long) hashMapEntry_getKey(mapEntry);
+ bool match = false;
+ psa->matchDiscoveredEndpoint(psa->handle, entry->endpoint, &match);
+ if (match) {
+ psaSvcId = svcId;
+ break;
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (psaSvcId >= 0) {
+ celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ (void *) entry->endpoint, pstm_addEndpointCallback);
+ } else {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
+ }
+
+ entry->selectedPsaSvcId = psaSvcId;
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
}
static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
- pstm_topic_receiver_or_sender_entry_t *entry = handle;
- pubsub_admin_service_t *psa = svc;
- psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+ pstm_topic_receiver_or_sender_entry_t *entry = handle;
+ pubsub_admin_service_t *psa = svc;
+ psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
}
static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->needsMatch) {
- //new topic sender needed, requesting match with current psa
- double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serializerSvcId = -1L;
- long selectedPsaSvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
- long svcId = (long)hashMapEntry_getKey(mapEntry);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
- double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serSvcId = -1L;
- psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &score, &serSvcId);
- if (score > highestScore) {
- highestScore = score;
- serializerSvcId = serSvcId;
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->needsMatch) {
+ //new topic sender needed, requesting match with current psa
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsaSvcId = -1L;
+ celix_properties_t *topicPropertiesForHighestMatch = NULL;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ long svcId = (long) hashMapEntry_getKey(mapEntry);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+ celix_properties_t *topicProps = NULL;
+ psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId);
+ if (score > highestScore) {
+ if (topicPropertiesForHighestMatch != NULL) {
+ celix_properties_destroy(topicPropertiesForHighestMatch);
+ }
+ highestScore = score;
+ serializerSvcId = serSvcId;
selectedPsaSvcId = svcId;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
- if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry->selectedPsaSvcId = selectedPsaSvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
- bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
- if (called && entry->endpoint != NULL) {
- entry->needsMatch = false;
-
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
- }
-
- if (entry->needsMatch) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
- }
- }
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ topicPropertiesForHighestMatch = topicProps;
+ } else if (topicProps != NULL) {
+ celix_properties_destroy(topicProps);
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry->selectedPsaSvcId = selectedPsaSvcId;
+ entry->selectedSerializerSvcId = serializerSvcId;
+ entry->topicProperties = topicPropertiesForHighestMatch;
+ bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+ if (called && entry->endpoint != NULL) {
+ entry->needsMatch = false;
+
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+ }
+
+ if (entry->needsMatch) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
}
static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
pstm_topic_receiver_or_sender_entry_t *entry = handle;
pubsub_admin_service_t *psa = svc;
- psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint);
+ psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
}
static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->needsMatch) {
-
- double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serializerSvcId = -1L;
- long selectedPsaSvcId = -1L;
-
- celixThreadMutex_lock(&manager->pubsubadmins.mutex);
- hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
- long svcId = (long) hashMapEntry_getKey(mapEntry);
- pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
- double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- long serSvcId = -1L;
-
- psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &score, &serSvcId);
- if (score > highestScore) {
- highestScore = score;
- serializerSvcId = serSvcId;
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->needsMatch) {
+
+ double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serializerSvcId = -1L;
+ long selectedPsaSvcId = -1L;
+ celix_properties_t *highestMatchTopicProperties = NULL;
+
+ celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2);
+ long svcId = (long) hashMapEntry_getKey(mapEntry);
+ pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
+ double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ long serSvcId = -1L;
+ celix_properties_t *topicProps = NULL;
+
+ psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId);
+ if (score > highestScore) {
+ if (highestMatchTopicProperties != NULL) {
+ celix_properties_destroy(highestMatchTopicProperties);
+ }
+ highestScore = score;
+ serializerSvcId = serSvcId;
selectedPsaSvcId = svcId;
- }
- }
- celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
-
- if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
- entry->selectedPsaSvcId = selectedPsaSvcId;
- entry->selectedSerializerSvcId = serializerSvcId;
-
- bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
- entry,
- pstm_setupTopicReceiverCallback);
-
- if (called && entry->endpoint != NULL) {
- entry->needsMatch = false;
- //announce new endpoint through the network
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
- pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
- manager->announceEndpointListeners.list, i);
- listener->announceEndpoint(listener->handle, entry->endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- }
- }
-
-
- if (entry->needsMatch) {
- logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
- }
- }
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ highestMatchTopicProperties = topicProps;
+ } else if (topicProps != NULL) {
+ celix_properties_destroy(topicProps);
+ }
+ }
+ celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+ if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+ entry->selectedPsaSvcId = selectedPsaSvcId;
+ entry->selectedSerializerSvcId = serializerSvcId;
+
+ bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+ entry,
+ pstm_setupTopicReceiverCallback);
+
+ if (called && entry->endpoint != NULL) {
+ entry->needsMatch = false;
+ //announce new endpoint through the network
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+ pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
+ manager->announceEndpointListeners.list, i);
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+ }
+
+
+ if (entry->needsMatch) {
+ logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
}
-static void* pstm_psaHandlingThread(void *data) {
+static void *pstm_psaHandlingThread(void *data) {
pubsub_topology_manager_t *manager = data;
celixThreadMutex_lock(&manager->psaHandling.mutex);
@@ -841,7 +891,7 @@ static void* pstm_psaHandlingThread(void *data) {
celixThreadMutex_unlock(&manager->psaHandling.mutex);
while (running) {
- //first teardown -> also if rematch is needed
+ //first teardown -> also if rematch is needed
pstm_teardownTopicSenders(manager);
pstm_teardownTopicReceivers(manager);
@@ -849,7 +899,7 @@ static void* pstm_psaHandlingThread(void *data) {
pstm_setupTopicSenders(manager);
pstm_setupTopicReceivers(manager);
- pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
+ pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
celixThreadMutex_lock(&manager->psaHandling.mutex);
celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS, 0L);
@@ -860,92 +910,141 @@ static void* pstm_psaHandlingThread(void *data) {
}
-celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
- pubsub_topology_manager_t *manager = handle;
- //TODO add support for searching based on scope and topic
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = handle;
+ //TODO add support for searching based on scope and topic
- fprintf(os, "\n");
+ fprintf(os, "\n");
- fprintf(os, "Discovered Endpoints: \n");
- celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+ fprintf(os, "Discovered Endpoints: \n");
+ celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!");
const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
- const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
- const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
- const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
+ const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+ const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+ const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
fprintf(os, " |- container name = %s\n", cn);
fprintf(os, " |- fw uuid = %s\n", fwuuid);
fprintf(os, " |- type = %s\n", type);
- fprintf(os, " |- scope = %s\n", scope);
- fprintf(os, " |- topic = %s\n", topic);
- fprintf(os, " |- admin type = %s\n", adminType);
- fprintf(os, " |- serializer = %s\n", serType);
- if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
- fprintf(os, " |- usage count = %i\n", discovered->usageCount);
- }
- }
- celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
- fprintf(os,"\n");
-
-
- fprintf(os, "Active Topic Senders:\n");
- celixThreadMutex_lock(&manager->topicSenders.mutex);
- iter = hashMapIterator_construct(manager->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->endpoint == NULL) {
- continue;
- }
- const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
- const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
- fprintf(os, " |- scope = %s\n", entry->scope);
- fprintf(os, " |- topic = %s\n", entry->topic);
- fprintf(os, " |- admin type = %s\n", adminType);
- fprintf(os, " |- serializer = %s\n", serType);
- if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
- fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
- fprintf(os, " |- usage count = %i\n", entry->usageCount);
- }
- }
- celixThreadMutex_unlock(&manager->topicSenders.mutex);
- fprintf(os,"\n");
-
- fprintf(os, "Active Topic Receivers:\n");
- celixThreadMutex_lock(&manager->topicReceivers.mutex);
- iter = hashMapIterator_construct(manager->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry->endpoint == NULL) {
- continue;
- }
- const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
- const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
- const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
- fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
- fprintf(os, " |- scope = %s\n", entry->scope);
- fprintf(os, " |- topic = %s\n", entry->topic);
- fprintf(os, " |- admin type = %s\n", adminType);
- fprintf(os, " |- serializer = %s\n", serType);
- if (manager->verbose) {
- fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
- fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
- fprintf(os, " |- usage count = %i\n", entry->usageCount);
- }
- }
- celixThreadMutex_unlock(&manager->topicReceivers.mutex);
- fprintf(os,"\n");
-
- fprintf(os, "TODO pending topic senders/receivers\n");
-
- return CELIX_SUCCESS;
+ fprintf(os, " |- scope = %s\n", scope);
+ fprintf(os, " |- topic = %s\n", topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
+ fprintf(os, " |- usage count = %i\n", discovered->usageCount);
+ }
+ }
+ celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+ fprintf(os, "\n");
+
+
+ fprintf(os, "Active Topic Senders:\n");
+ int countPendingSenders = 0;
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->endpoint == NULL) {
+ ++countPendingSenders;
+ continue;
+ }
+ const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
+ fprintf(os, " |- scope = %s\n", entry->scope);
+ fprintf(os, " |- topic = %s\n", entry->topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ fprintf(os, "\n");
+
+ fprintf(os, "Active Topic Receivers:\n");
+ int countPendingReceivers = 0;
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->endpoint == NULL) {
+ ++countPendingReceivers;
+ continue;
+ }
+ const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
+ const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+ const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
+ fprintf(os, " |- scope = %s\n", entry->scope);
+ fprintf(os, " |- topic = %s\n", entry->topic);
+ fprintf(os, " |- admin type = %s\n", adminType);
+ fprintf(os, " |- serializer = %s\n", serType);
+ if (manager->verbose) {
+ fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
+ fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ fprintf(os, "\n");
+
+ if (countPendingSenders > 0) {
+ fprintf(os, "Pending Topic Senders:\n");
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ iter = hashMapIterator_construct(manager->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->endpoint == NULL) {
+ fprintf(os, "|- Pending Topic Sender for scope/topic %s/%s:\n", entry->scope, entry->topic);
+ const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
+ const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
+ const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+ fprintf(os, " |- requested qos = %s\n", requestedQos);
+ fprintf(os, " |- requested config = %s\n", requestedConfig);
+ fprintf(os, " |- requested serializer = %s\n", requestedSer);
+ if (manager->verbose) {
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+ fprintf(os, "\n");
+ }
+
+ if (countPendingReceivers > 0) {
+ fprintf(os, "Pending Topic Receivers:\n");
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry->endpoint == NULL) {
+ fprintf(os, "|- Topic Receiver for scope/topic %s/%s:\n", entry->scope, entry->topic);
+ const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
+ const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
+ const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+ fprintf(os, " |- requested qos = %s\n", requestedQos);
+ fprintf(os, " |- requested config = %s\n", requestedConfig);
+ fprintf(os, " |- requested serializer = %s\n", requestedSer);
+ if (manager->verbose) {
+ fprintf(os, " |- usage count = %i\n", entry->usageCount);
+ }
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+ fprintf(os, "\n");
+ }
+
+
+ return CELIX_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 39277fe..e66831e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -94,6 +94,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
long selectedPsaSvcId;
long selectedSerializerSvcId;
long bndId;
+ celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
//for sender entry
celix_filter_t *publisherFilter;
@@ -104,7 +105,6 @@ typedef struct pstm_topic_receiver_or_sender_entry {
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
-celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *manager);
void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props);
void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/libs/utils/src/properties.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index 1519ecc..3c7ba74 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -357,7 +357,10 @@ celix_properties_t* celix_properties_copy(const celix_properties_t *properties)
}
const char* celix_properties_get(const celix_properties_t *properties, const char *key, const char *defaultValue) {
- const char* value = hashMap_get((hash_map_t*)properties, (void*)key);
+ const char* value = NULL;
+ if (properties != NULL) {
+ value = hashMap_get((hash_map_t*)properties, (void*)key);
+ }
return value == NULL ? defaultValue : value;
}
[3/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is
no longer part of the current API.
Posted by pn...@apache.org.
CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.
Multipart support can come back, but then in seperated service (i.e. pubsub_multipart_subscriber / pubsub_multipart_publisher
Also
- remove multipart examples
- Add optional init function to the subscriber api, making it possible for subscriber to tweak the receiver thread
- Initial support for static zmq bind,discover and connect urls to use for (among others) testing
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b2548c84
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b2548c84
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b2548c84
Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: b2548c84502e389b4e1565b9895e16459e9ef886
Parents: 68f69f8
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Nov 6 12:41:44 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Nov 6 12:41:44 2018 +0100
----------------------------------------------------------------------
bundles/pubsub/examples/CMakeLists.txt | 40 -
.../pubsub/examples/mp_pubsub/CMakeLists.txt | 23 -
.../examples/mp_pubsub/common/include/ew.h | 53 -
.../examples/mp_pubsub/common/include/ide.h | 49 -
.../mp_pubsub/common/include/kinematics.h | 55 -
.../mp_pubsub/msg_descriptors/msg_ew.descriptor | 9 -
.../msg_descriptors/msg_ide.descriptor | 9 -
.../msg_descriptors/msg_kinematics.descriptor | 10 -
.../examples/mp_pubsub/publisher/CMakeLists.txt | 43 -
.../private/include/mp_publisher_private.h | 58 -
.../publisher/private/src/mp_pub_activator.c | 144 --
.../publisher/private/src/mp_publisher.c | 160 --
.../mp_pubsub/subscriber/CMakeLists.txt | 43 -
.../private/include/mp_subscriber_private.h | 50 -
.../subscriber/private/src/mp_sub_activator.c | 108 --
.../subscriber/private/src/mp_subscriber.c | 119 --
.../private/include/pubsub_subscriber_private.h | 2 +-
.../subscriber/private/src/pubsub_subscriber.c | 2 +-
bundles/pubsub/mock/src/publisher_mock.cc | 14 -
.../pubsub_admin_udp_mc/src/psa_activator.c | 6 +-
.../src/pubsub_udpmc_admin.c | 12 +-
.../src/pubsub_udpmc_admin.h | 8 +-
.../src/pubsub_udpmc_topic_receiver.c | 7 +-
.../src/pubsub_udpmc_topic_sender.c | 10 +-
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 6 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.c | 38 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.h | 36 +-
.../src/pubsub_zmq_topic_receiver.c | 121 +-
.../src/pubsub_zmq_topic_receiver.h | 1 +
.../src/pubsub_zmq_topic_sender.c | 49 +-
.../src/pubsub_zmq_topic_sender.h | 1 +
.../pubsub_api/include/pubsub/publisher.h | 21 +-
.../pubsub_api/include/pubsub/subscriber.h | 20 +-
.../pubsub/pubsub_discovery/src/psd_activator.c | 2 +-
.../src/pubsub_discovery_impl.c | 2 +-
.../src/pubsub_discovery_impl.h | 2 +-
.../pubsub/pubsub_spi/include/pubsub_admin.h | 18 +-
.../pubsub_spi/include/pubsub_listeners.h | 7 +-
.../pubsub/pubsub_spi/include/pubsub_utils.h | 2 +
.../pubsub/pubsub_spi/src/pubsub_utils_match.c | 12 +-
.../src/pubsub_topology_manager.c | 1407 ++++++++++--------
.../src/pubsub_topology_manager.h | 2 +-
libs/utils/src/properties.c | 5 +-
43 files changed, 1009 insertions(+), 1777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..2e43e6d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -16,7 +16,6 @@
# under the License.
add_subdirectory(pubsub)
-add_subdirectory(mp_pubsub)
find_program(ETCD_CMD NAMES etcd)
find_program(XTERM_CMD NAMES xterm)
@@ -198,33 +197,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
)
target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
- # ZMQ Multipart
- add_celix_container("pubsub_mp_subscriber_zmq"
- GROUP "pubsub"
- BUNDLES
- Celix::shell
- Celix::shell_tui
- Celix::pubsub_serializer_json
- Celix::pubsub_discovery_etcd
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
- org.apache.celix.pubsub_subscriber.MpSubscriber
- )
- target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
- add_celix_container("pubsub_mp_publisher_zmq"
- GROUP "pubsub"
- BUNDLES
- Celix::shell
- Celix::shell_tui
- Celix::pubsub_serializer_json
- Celix::pubsub_discovery_etcd
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
- org.apache.celix.pubsub_publisher.MpPublisher
- )
- target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
if (ETCD_CMD AND XTERM_CMD)
#Runtime starting two bundles using both zmq and upd mc pubsub
add_celix_runtime(pubsub_rt_zmq_udpmc_combi
@@ -251,18 +223,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
etcd
USE_TERM
)
-
- #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq
- add_celix_runtime(pubsub_rt_multipart_zmq
- NAME zmq_multipart
- GROUP pubsub
- CONTAINERS
- pubsub_mp_subscriber_zmq
- pubsub_mp_publisher_zmq
- COMMANDS
- etcd
- USE_TERM
- )
endif ()
endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
deleted file mode 100644
index c828832..0000000
--- a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(subscriber)
-
-
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h b/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
deleted file mode 100644
index 81ca5f3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ew.h
- *
- * \date Jan 15, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef EW_H_
-#define EW_H_
-
-#define MIN_AREA 50.0F
-#define MAX_AREA 15000.0F
-
-#define MSG_EW_NAME "ew" //Has to match the message name in the msg descriptor!
-
-typedef enum color{
- GREEN,
- BLUE,
- RED,
- BLACK,
- WHITE,
- LAST_COLOR
-} color_e;
-
-const char* color_tostring[] = {"GREEN","BLUE","RED","BLACK","WHITE"};
-
-struct ew_data{
- double area;
- color_e color;
-};
-
-typedef struct ew_data* ew_data_pt;
-
-#endif /* EW_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h b/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
deleted file mode 100644
index 2b9588d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ide.h
- *
- * \date Jan 15, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef IDE_H_
-#define IDE_H_
-
-#define MSG_IDE_NAME "ide" //Has to match the message name in the msg descriptor!
-
-typedef enum shape{
- SQUARE,
- CIRCLE,
- TRIANGLE,
- RECTANGLE,
- HEXAGON,
- LAST_SHAPE
-} shape_e;
-
-const char* shape_tostring[] = {"SQUARE","CIRCLE","TRIANGLE","RECTANGLE","HEXAGON"};
-
-struct ide{
- shape_e shape;
-};
-
-typedef struct ide* ide_pt;
-
-#endif /* IDE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h b/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
deleted file mode 100644
index 5601509..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * kinematics.h
- *
- * \date Nov 12, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef KINEMATICS_H_
-#define KINEMATICS_H_
-
-#define MIN_LAT -90.0F
-#define MAX_LAT 90.0F
-#define MIN_LON -180.0F
-#define MAX_LON 180.0F
-
-#define MIN_OCCUR 1
-#define MAX_OCCUR 5
-
-#define MSG_KINEMATICS_NAME "kinematics" //Has to match the message name in the msg descriptor!
-
-struct position{
- double lat;
- double lon;
-};
-
-typedef struct position position_t;
-
-struct kinematics{
- position_t position;
- int occurrences;
-};
-
-typedef struct kinematics* kinematics_pt;
-
-
-#endif /* KINEMATICS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
deleted file mode 100644
index 7eb8c29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ew
-version=1.0.0
-:annotations
-classname=org.example.Ew
-:types
-:message
-{Di area color}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
deleted file mode 100644
index f26286d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ide
-version=1.0.0
-:annotations
-classname=org.example.Ide
-:types
-:message
-{i shape}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
deleted file mode 100644
index 447b645..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
+++ /dev/null
@@ -1,10 +0,0 @@
-:header
-type=message
-name=kinematics
-version=1.0.0
-:annotations
-classname=org.example.Kinematics
-:types
-position={DD lat long}
-:message
-{lposition;N position occurrences}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
deleted file mode 100644
index 76c9bce..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_publisher.MpPublisher
- SYMBOLIC_NAME "apache_celix_pubsub_mp_publisher"
- VERSION "1.0.0"
- SOURCES
- private/src/mp_pub_activator.c
- private/src/mp_publisher.c
-)
-target_link_libraries(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE private/include)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
- DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
- DESTINATION "META-INF/keys/subscriber"
-)
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h b/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
deleted file mode 100644
index 9c227fd..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef MP_PUBLISHER_PRIVATE_H_
-#define MP_PUBLISHER_PRIVATE_H_
-
-#include <celixbool.h>
-
-#include "pubsub/publisher.h"
-
-struct pubsub_sender {
- array_list_pt trackers;
- const char *ident;
- long bundleId;
-};
-
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
- pubsub_publisher_pt service;
- pubsub_sender_pt publisher;
-} *send_thread_struct_pt;
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId);
-
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
-
-void publisher_destroy(pubsub_sender_pt client);
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
-
-
-#endif /* MP_PUBLISHER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
deleted file mode 100644
index 0b6041d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_pub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <sys/cdefs.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
-
-#include "mp_publisher_private.h"
-
-static const char * PUB_TOPICS[] = {
- "multipart",
- NULL
-};
-
-
-struct publisherActivator {
- pubsub_sender_pt client;
- array_list_pt trackerList;//List<service_tracker_pt>
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- celix_status_t status = CELIX_SUCCESS;
-
- struct publisherActivator * act = malloc(sizeof(*act));
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID == NULL){
- printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
- status = CELIX_INVALID_BUNDLE_CONTEXT;
- }
-
- if (status == CELIX_SUCCESS){
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- arrayList_create(&(act->trackerList));
- act->client = publisher_create(act->trackerList,fwUUID,bundleId);
- *userData = act;
- } else {
- free(act);
- }
-
- return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
- struct publisherActivator * act = (struct publisherActivator *) userData;
-
- int i;
-
- char filter[128];
- for(i=0; PUB_TOPICS[i] != NULL; i++){
- const char* topic = PUB_TOPICS[i];
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- service_tracker_pt tracker = NULL;
- memset(filter,0,128);
-
- snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC,topic);
-
- service_tracker_customizer_pt customizer = NULL;
-
- serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
- serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
- arrayList_add(act->trackerList,tracker);
- }
-
- publisher_start(act->client);
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_open(tracker);
- }
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_close(tracker);
- }
-
- publisher_stop(act->client);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_destroy(tracker);
- }
-
- publisher_destroy(act->client);
- arrayList_destroy(act->trackerList);
-
- free(act);
-
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
deleted file mode 100644
index f47be29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "service_tracker.h"
-#include "celix_threads.h"
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_publisher_private.h"
-
-
-static bool stop=false;
-static celix_thread_t tid;
-
-static double randDouble(double min, double max){
- double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
- return ret;
-}
-
-static unsigned int randInt(unsigned int min, unsigned int max){
- double scaled = ((double)random())/((double)RAND_MAX);
- return (max - min +1)*scaled + min;
-}
-
-static void* send_thread(void* arg){
-
- send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
-
- unsigned int kin_msg_id = 0;
- unsigned int ide_msg_id = 0;
- unsigned int ew_msg_id = 0;
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
- return NULL;
- }
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
- return NULL;
- }
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
- return NULL;
- }
-
- kinematics_pt kin_data = calloc(1,sizeof(*kin_data));
- ide_pt ide_data = calloc(1,sizeof(*ide_data));
- ew_data_pt ew_data = calloc(1,sizeof(*ew_data));
-
- unsigned int counter = 1;
-
- while(stop==false){
- kin_data->position.lat = randDouble(MIN_LAT,MAX_LAT);
- kin_data->position.lon = randDouble(MIN_LON,MAX_LON);
- kin_data->occurrences = randInt(MIN_OCCUR,MAX_OCCUR);
- publish_svc->sendMultipart(publish_svc->handle,kin_msg_id,kin_data, PUBSUB_PUBLISHER_FIRST_MSG);
- printf("Track#%u kin_data: pos=[%f, %f] occurrences=%d\n",counter,kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-
- ide_data->shape = (shape_e)randInt(0,LAST_SHAPE-1);
- publish_svc->sendMultipart(publish_svc->handle,ide_msg_id,ide_data, PUBSUB_PUBLISHER_PART_MSG);
- printf("Track#%u ide_data: shape=%s\n",counter,shape_tostring[(int)ide_data->shape]);
-
- ew_data->area = randDouble(MIN_AREA,MAX_AREA);
- ew_data->color = (color_e)randInt(0,LAST_COLOR-1);
- publish_svc->sendMultipart(publish_svc->handle,ew_msg_id,ew_data, PUBSUB_PUBLISHER_LAST_MSG);
- printf("Track#%u ew_data: area=%f color=%s\n",counter,ew_data->area,color_tostring[(int)ew_data->color]);
-
- printf("\n");
- sleep(2);
- counter++;
- }
-
- free(ew_data);
- free(ide_data);
- free(kin_data);
- free(st_struct);
-
- return NULL;
-
-}
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
- pubsub_sender_pt publisher = malloc(sizeof(*publisher));
-
- publisher->trackers = trackers;
- publisher->ident = ident;
- publisher->bundleId = bundleId;
-
- return publisher;
-}
-
-void publisher_start(pubsub_sender_pt client) {
- printf("MP_PUBLISHER: starting up...\n");
-}
-
-void publisher_stop(pubsub_sender_pt client) {
- printf("MP_PUBLISHER: stopping...\n");
-}
-
-void publisher_destroy(pubsub_sender_pt client) {
- client->trackers = NULL;
- client->ident = NULL;
- free(client);
-}
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
- printf("MP_PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
- send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
- data->service = publish_svc;
- data->publisher = manager;
-
- celixThread_create(&tid,NULL,send_thread,(void*)data);
- return CELIX_SUCCESS;
-}
-
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
- //publish_service_pt publish_svc = (publish_service_pt)service;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
- printf("MP_PUBLISHER: publish service unexported (%s)!\n",manager->ident);
- stop=true;
- celixThread_join(tid,NULL);
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
deleted file mode 100644
index 444764e..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
- SYMBOLIC_NAME "apache_celix_pubsub_mp_subscriber"
- VERSION "1.0.0"
- SOURCES
- private/src/mp_sub_activator.c
- private/src/mp_subscriber.c
-)
-target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE private/include)
-
-celix_bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
- DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
- DESTINATION "META-INF/keys/publisher"
-)
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h b/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
deleted file mode 100644
index 2d582b3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
-#define PUBSUB_SUBSCRIBER_PRIVATE_H_
-
-
-#include <string.h>
-
-#include "celixbool.h"
-
-#include "pubsub/subscriber.h"
-
-struct pubsub_receiver {
- char * name;
-};
-
-typedef struct pubsub_receiver* pubsub_receiver_pt;
-
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
-
-#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
deleted file mode 100644
index 591a395..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_sub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "pubsub/subscriber.h"
-#include "bundle_activator.h"
-
-#include "mp_subscriber_private.h"
-
-#define SUB_NAME "multipart"
-static const char * SUB_TOPICS[] = {
- "multipart",
- NULL
-};
-
-struct subscriberActivator {
- array_list_pt registrationList; //List<service_registration_pt>
- pubsub_subscriber_t* subsvc;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator));
- *userData = act;
- arrayList_create(&(act->registrationList));
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-
- pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
- pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
- subsvc->handle = sub;
- subsvc->receive = pubsub_subscriber_recv;
-
- act->subsvc = subsvc;
-
- int i;
- for(i=0; SUB_TOPICS[i] != NULL; i++){
- const char* topic = SUB_TOPICS[i];
-
- properties_pt props = properties_create();
- properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
- service_registration_pt reg = NULL;
- bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, ®);
- arrayList_add(act->registrationList,reg);
- }
-
- subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- int i;
- for(i=0; i<arrayList_size(act->registrationList);i++){
- service_registration_pt reg = arrayList_get(act->registrationList,i);
- serviceRegistration_unregister(reg);
-
- }
-
- subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- act->subsvc->receive = NULL;
- subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
- act->subsvc->handle = NULL;
- free(act->subsvc);
- act->subsvc = NULL;
-
- arrayList_destroy(act->registrationList);
- free(act);
-
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
deleted file mode 100644
index a5ad03a..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_subscriber_private.h"
-
-pubsub_receiver_pt subscriber_create(char* topics) {
- pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
- sub->name = strdup(topics);
- return sub;
-}
-
-
-void subscriber_start(pubsub_receiver_pt subscriber){
- printf("MP_SUBSCRIBER: starting up...\n");
-}
-
-void subscriber_stop(pubsub_receiver_pt subscriber){
- printf("MP_SUBSCRIBER: stopping...\n");
-}
-
-void subscriber_destroy(pubsub_receiver_pt subscriber){
- if(subscriber->name!=NULL){
- free(subscriber->name);
- }
- subscriber->name=NULL;
- free(subscriber);
-}
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
-
- unsigned int kin_msg_id = 0;
- unsigned int ide_msg_id = 0;
- unsigned int ew_msg_id = 0;
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
- return -1;
- }
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
- return -1;
- }
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
- return -1;
- }
-
- if(msgTypeId!=kin_msg_id){
- printf("MP_SUBSCRIBER: Multipart Message started with wrong message (expected %u, got %u)\n",msgTypeId,kin_msg_id);
- return -1;
- }
-
- kinematics_pt kin_data = (kinematics_pt)msg;
-
- void* ide_msg = NULL;
- callbacks->getMultipart(callbacks->handle,ide_msg_id,false,&ide_msg);
-
- void* ew_msg = NULL;
- callbacks->getMultipart(callbacks->handle,ew_msg_id,false,&ew_msg);
-
- if(kin_data==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_KINEMATICS_NAME);
- }
- else{
- printf("kin_data: pos=[%f, %f] occurrences=%d\n",kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
- }
-
- if(ide_msg==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_IDE_NAME);
- }
- else{
- ide_pt ide_data = (ide_pt)ide_msg;
- printf("ide_data: shape=%s\n",shape_tostring[(int)ide_data->shape]);
- }
-
- if(ew_msg==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_EW_NAME);
- }
- else{
- ew_data_pt ew_data = (ew_data_pt)ew_msg;
- printf("ew_data: area=%f color=%s\n",ew_data->area,color_tostring[(int)ew_data->color]);
- }
-
- printf("\n");
-
- return 0;
-
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 00ca9b4..291a6aa 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_pt client);
void subscriber_stop(pubsub_receiver_pt client);
void subscriber_destroy(pubsub_receiver_pt client);
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index a137253..7cfbedb 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -53,7 +53,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
free(subscriber);
}
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
location_t place = (location_t)msg;
int nrchars = 25;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/mock/src/publisher_mock.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index e8902a8..1bdae98 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -45,24 +45,10 @@ static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, cons
}
/*============================================================================
- MOCK - mock function for pubsub_publisher->sendMultipart
- ============================================================================*/
-static int pubsub__publisherMock_sendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags) {
- return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
- .actualCall(PUBSUB_PUBLISHERMOCK_SEND_MULTIPART_METHOD)
- .withPointerParameter("handle", handle)
- .withParameter("msgTypeId", msgTypeId)
- .withPointerParameter("msg", (void*)msg)
- .withParameter("flags", flags)
- .returnIntValue();
-}
-
-/*============================================================================
MOCK - mock setup for publisher service
============================================================================*/
void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle) {
srv->handle = handle;
srv->localMsgTypeIdForMsgType = pubsub__publisherMock_localMsgTypeIdForMsgType;
srv->send = pubsub__publisherMock_send;
- srv->sendMultipart = pubsub__publisherMock_sendMultipart;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index c838899..63de809 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -71,13 +71,13 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
psaSvc->handle = act->admin;
psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
- psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+ psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
- psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
- psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
+ psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 6d06ee6..73fab20 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -244,23 +244,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
free(psa);
}
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -278,7 +278,7 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -358,7 +358,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 02ebd44..17b8957 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -40,14 +40,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index efebf7c..a2f37d4 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -396,13 +396,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
if (status == CELIX_SUCCESS) {
bool release = true;
- pubsub_multipart_callbacks_t mp_callbacks;
- mp_callbacks.handle = receiver;
- mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
- mp_callbacks.getMultipart = NULL;
-
pubsub_subscriber_t *svc = entry->svc;
- svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+ svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
if(release){
msgSer->freeMsg(msgSer,msgInst);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 59c809c..ddc6172 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -79,7 +79,7 @@ typedef struct pubsub_msg{
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg);
static unsigned int rand_range(unsigned int min, unsigned int max);
pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
@@ -205,7 +205,6 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
entry->service.send = psa_udpmc_topicPublicationSend;
- entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
svc = &entry->service;
} else {
@@ -277,7 +276,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
msg->payloadSize = serializedOutputLen;
- if (psa_udpmc_sendMsg(entry, msg, true, NULL) == false) {
+ if (psa_udpmc_sendMsg(entry, msg) == false) {
status = -1;
}
free(msg);
@@ -306,7 +305,7 @@ static void delay_first_send_for_late_joiners(){
}
}
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg) {
const int iovec_len = 3; // header + size + payload
bool ret = true;
@@ -325,9 +324,6 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_m
ret = false;
}
- if(releaseCallback) {
- releaseCallback->release(msg->payload, entry);
- }
return ret;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 01547cc..58f093a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -70,13 +70,13 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
psaSvc->handle = act->admin;
psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
- psaSvc->matchEndpoint = pubsub_zmqAdmin_matchEndpoint;
+ psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
- psaSvc->addEndpoint = pubsub_zmqAdmin_addEndpoint;
- psaSvc->removeEndpoint = pubsub_zmqAdmin_removeEndpoint;
+ psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26..0192bb4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -308,30 +308,30 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
celixThreadMutex_unlock(&psa->serializers.mutex);
}
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
if (outScore != NULL) {
*outScore = score;
}
return status;
}
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
@@ -342,7 +342,7 @@ celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_propertie
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -353,6 +353,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
celix_properties_t *newEndpoint = NULL;
+ const char * staticBindUrl = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_BIND_URL, NULL) : NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
@@ -361,8 +363,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
- psa->basePort, psa->maxPort);
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+ psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
@@ -370,6 +372,15 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
serType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
+
+ //if configured use a static discover url
+ const char *staticDiscUrl = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL) : NULL;
+ if (staticDiscUrl != NULL) {
+ celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+ }
+ celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -423,11 +434,14 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
+ const char *staticConnectUrls = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL) : NULL;
+
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -435,7 +449,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
@@ -530,7 +544,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
return status;
}
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -581,7 +595,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
return status;
}
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 180a9db..304ad11 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -35,23 +35,45 @@
#define PUBSUB_ZMQ_DEFAULT_IP "127.0.0.1"
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL "zmq.static.bind.url"
+
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED "zmq.static.configured"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS "zmq.static.connect.url"
+
+
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index b348fdc..bb190cc 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -66,38 +66,46 @@ struct pubsub_zmq_topic_receiver {
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+ bool allConnected; //true if all requestedConnectection are connected
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ bool allInitialized;
} subscribers;
};
typedef struct psa_zmq_requested_connection_entry {
char *url;
bool connected;
+ bool statically; //true if the connection is statically configured through the topic properties.
} psa_zmq_requested_connection_entry_t;
typedef struct psa_zmq_subscriber_entry {
int usageCount;
hash_map_t *msgTypes; //map from serializer svc
pubsub_subscriber_t *svc;
+ bool initialized; //true if the init function is called through the receive thread
} psa_zmq_subscriber_entry_t;
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void* psa_zmq_recvThread(void * data);
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
log_helper_t *logHelper,
const char *scope,
- const char *topic,
- long serializerSvcId,
- pubsub_serializer_service_t *serializer) {
+ const char *topic,
+ const char *staticConnectUrls,
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
@@ -185,6 +193,22 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
+ if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+ char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+ char* url;
+ char* save = urlsCopy;
+
+ while ((url = strtok_r(save, " ", &save))) {
+ psa_zmq_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->statically = true;
+ entry->connected = false;
+ entry->url = strndup(url, 1024*1024);
+ hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+ receiver->requestedConnections.allConnected = false;
+ }
+ free(urlsCopy);
+ }
+
//track subscribers
if (receiver->zmqSocket != NULL ) {
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -280,10 +304,12 @@ void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiv
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ char *url = NULL;
+ asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
if (entry->connected) {
- celix_arrayList_add(connectedUrls, strndup(entry->url, 1024));
+ celix_arrayList_add(connectedUrls, url);
} else {
- celix_arrayList_add(unconnectedUrls, strndup(entry->url, 1024));
+ celix_arrayList_add(unconnectedUrls, url);
}
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -301,16 +327,13 @@ void pubsub_zmqTopicReceiver_connectTo(
entry = calloc(1, sizeof(*entry));
entry->url = strndup(url, 1024*1024);
entry->connected = false;
+ entry->statically = false;
hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
- }
- if (!entry->connected) {
- if (zsock_connect(receiver->zmqSocket, "%s", url) == 0) {
- entry->connected = true;
- } else {
- L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", url, strerror(errno));
- }
+ receiver->requestedConnections.allConnected = false;
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ psa_zmq_connectToAllRequestedConnections(receiver);
}
void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
@@ -351,6 +374,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
entry->svc = svc;
+ entry->initialized = false;
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
@@ -396,7 +420,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
if(status == CELIX_SUCCESS) {
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
@@ -428,7 +452,23 @@ static void* psa_zmq_recvThread(void * data) {
bool running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ bool allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ bool allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
while (running) {
+ if (!allConnected) {
+ psa_zmq_connectToAllRequestedConnections(receiver);
+ }
+ if (!allInitialized) {
+ psa_zmq_initializeAllSubscribers(receiver);
+ }
+
zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
if (zmsg != NULL) {
if (zmsg_size(zmsg) != 3) {
@@ -458,7 +498,62 @@ static void* psa_zmq_recvThread(void * data) {
celixThreadMutex_lock(&receiver->recvThread.mutex);
running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
} // while
return NULL;
}
+
+
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ if (!receiver->requestedConnections.allConnected) {
+ bool allConnected = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->connected){
+ if (zsock_connect(receiver->zmqSocket, "%s", entry->url) == 0) {
+ entry->connected = true;
+ } else {
+ L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno));
+ allConnected = false;
+ }
+ }
+ }
+ receiver->requestedConnections.allConnected = allConnected;
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ if (!receiver->subscribers.allInitialized) {
+ bool allInitialized = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->initialized) {
+ int rc = 0;
+ if (entry->svc != NULL && entry->svc->init != NULL) {
+ rc = entry->svc->init(entry->svc->handle);
+ }
+ if (rc == 0) {
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
+ }
+ }
+ receiver->subscribers.allInitialized = allInitialized;
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 9049153..d093bfd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,6 +27,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
log_helper_t *logHelper,
const char *scope,
const char *topic,
+ const char *staticConnectUrls,
long serializerSvcId,
pubsub_serializer_service_t *serializer);
void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index f6221a2..9ad7783 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -95,6 +95,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *ser,
const char *bindIP,
+ const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort) {
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
@@ -172,29 +173,38 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
}
#endif
- int rv = -1, retry=0;
- while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
- /* Randomized part due to same bundle publishing on different topics */
- unsigned int port = rand_range(basePort,maxPort);
+ if (staticBindUrl != NULL) {
+ int rv = zsock_bind (socket, "%s", staticBindUrl);
+ if (rv == -1) {
+ L_WARN("Error for zmq_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno));
+ } else {
+ sender->url = strndup(staticBindUrl, 1024*1024);
+ }
+ } else {
- size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
- char *url = calloc(len, sizeof(char*));
- snprintf(url, len, "tcp://%s:%u", bindIP, port);
+ int retry = 0;
+ while (sender->url == NULL && retry < ZMQ_BIND_MAX_RETRY) {
+ /* Randomized part due to same bundle publishing on different topics */
+ unsigned int port = rand_range(basePort, maxPort);
- len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
- char *bindUrl = calloc(len, sizeof(char));
- snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+ char *url = NULL;
+ asprintf(&url, "tcp://%s:%u", bindIP, port);
- rv = zsock_bind (socket, "%s", bindUrl);
- if (rv == -1) {
- perror("Error for zmq_bind");
- free(url);
- } else {
- sender->url = url;
- sender->zmq.socket = socket;
+ char *bindUrl = NULL;
+ asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+
+
+ int rv = zsock_bind(socket, "%s", bindUrl);
+ if (rv == -1) {
+ L_WARN("Error for zmq_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+ free(url);
+ } else {
+ sender->url = url;
+ sender->zmq.socket = socket;
+ }
+ retry++;
+ free(bindUrl);
}
- retry++;
- free(bindUrl);
}
}
@@ -305,7 +315,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
entry->service.send = psa_zmq_topicPublicationSend;
- entry->service.sendMultipart = NULL; //not supported TODO remove
hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
} else {
L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index e537edd..af63870 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -31,6 +31,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *ser,
const char *bindIP,
+ const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort);
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
[2/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is
no longer part of the current API.
Posted by pn...@apache.org.
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 9f7f3b6..fb8c5f6 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,7 +30,7 @@
#include <stdlib.h>
#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION "3.0.0"
//properties
#define PUBSUB_PUBLISHER_TOPIC "topic"
@@ -38,17 +38,7 @@
#define PUBSUB_PUBLISHER_CONFIG "pubsub.config"
#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
-//flags
-#define PUBSUB_PUBLISHER_FIRST_MSG 01
-#define PUBSUB_PUBLISHER_PART_MSG 02
-#define PUBSUB_PUBLISHER_LAST_MSG 04
-struct pubsub_release_callback_struct {
- void *handle;
- void (*release)(char *buf, void *handle);
-};
-typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
-typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
struct pubsub_publisher {
@@ -71,15 +61,6 @@ struct pubsub_publisher {
*/
int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
-
- /**
- * sendMultipart is a async function, but the msg can be safely deleted after send returns.
- * The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG
- * The last message of a multipart message must have the flag PUBLISHER_LAST_MSG
- * Returns 0 on success.
- */
- int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags);
-
};
typedef struct pubsub_publisher pubsub_publisher_t;
typedef struct pubsub_publisher* pubsub_publisher_pt;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index ca6d4d1..7a0d85b 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -30,7 +30,7 @@
#include <stdbool.h>
#define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber"
-#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "3.0.0"
//properties
#define PUBSUB_SUBSCRIBER_TOPIC "topic"
@@ -38,17 +38,15 @@
#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config"
#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"
-
-struct pubsub_multipart_callbacks_struct {
- void *handle;
- int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
- int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
-};
-typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
-typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;
-
+
struct pubsub_subscriber_struct {
void *handle;
+
+ /**
+ * Called to initialize the subscriber with the receiver thread.
+ * Can be used to tweak the receiver thread attributes
+ */
+ int (*init)(void *handle);
/**
* When a new message for a topic is available the receive will be called.
@@ -64,7 +62,7 @@ struct pubsub_subscriber_struct {
*
* this method can be NULL.
*/
- int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+ int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
};
typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 56d8997..06e3c56 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -67,7 +67,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
act->listenerSvc.handle = act->pubsub_discovery;
act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
- act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
+ act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
//register shell command service
//register shell command
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index de708ff..673bdfd 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -422,7 +422,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
return status;
}
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_discovery_t *disc = handle;
celix_status_t status = CELIX_SUCCESS;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index b2726fb..269fe5a 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -95,7 +95,7 @@ void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc,
void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 7f1cc73..64abe24 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,8 +32,8 @@
#include "celix_filter.h"
#define PUBSUB_ADMIN_SERVICE_NAME "pubsub_admin"
-#define PUBSUB_ADMIN_SERVICE_VERSION "2.0.0"
-#define PUBSUB_ADMIN_SERVICE_RANGE "[2,3)"
+#define PUBSUB_ADMIN_SERVICE_VERSION "3.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE "[3,4)"
//expected service properties
#define PUBSUB_ADMIN_SERVICE_TYPE "psa_type"
@@ -44,20 +44,20 @@
struct pubsub_admin_service {
void *handle;
- celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
- celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
- celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
+ celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
+ celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+ celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
//note endpoint is owned by caller
- celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
//note endpoint is owned by caller
- celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
- celix_status_t (*addEndpoint)(void *handle, const celix_properties_t *endpoint);
- celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
};
typedef struct pubsub_admin_service pubsub_admin_service_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index 7e4bfec..be4e569 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -33,15 +33,12 @@ typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_li
-//Informs the discovery admins to publish info into the network
+//Informs the pubsub discoveries to announce/revoke endpoint
struct pubsub_announce_endpoint_listener {
void *handle;
celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
- celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *properties);
-
- //getCurrentSubscriberEndPoints
- //getCurrentPublisherEndPoints
+ celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
};
typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 66cc44a..9d707f3 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -48,6 +48,7 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId);
double pubsub_utils_matchSubscriber(
@@ -58,6 +59,7 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId);
bool pubsub_utils_matchEndpoint(
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index dc5c35e..42c141f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -107,6 +107,7 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId) {
celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
@@ -132,8 +133,10 @@ double pubsub_utils_matchPublisher(
*outSerializerSvcId = serializerSvcId;
}
- if (ep != NULL) {
- celix_properties_destroy(ep); //TODO improve function to that tmp endpoint is not needed -> parse filter
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
+ celix_properties_destroy(ep);
}
return score;
@@ -159,6 +162,7 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId) {
pubsub_get_topic_properties_data_t data;
@@ -188,7 +192,9 @@ double pubsub_utils_matchSubscriber(
*outSerializerSvcId = serializerSvcId;
}
- if (ep != NULL) {
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
celix_properties_destroy(ep);
}