You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/17 18:38:54 UTC
[celix] 01/01: Refactors all pubsub admins to support multiple
subscriber services with the same topic/scope combination in a single
bundle.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/store_pubsub_subscriber_with_svcid
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9a1c906b54c6cd38ce5820b27a432e4c20678bc2
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed Jun 17 20:38:37 2020 +0200
Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.
---
bundles/pubsub/CMakeLists.txt | 10 +---
.../src/pubsub_tcp_topic_receiver.c | 66 +++++++++++++---------
.../src/pubsub_udpmc_topic_receiver.c | 57 +++++++++++--------
.../src/pubsub_websocket_topic_receiver.c | 55 +++++++++++-------
.../src/pubsub_zmq_topic_receiver.c | 60 ++++++++++++--------
5 files changed, 144 insertions(+), 104 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index f4ac92a..90d762a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,19 +34,11 @@ if (PUBSUB)
add_subdirectory(pubsub_admin_udp_mc)
endif (BUILD_PUBSUB_PSA_UDP_MC)
- set(BUILD_PUBSUB_PSA_WS_DEFAULT ON)
- if (APPLE)
- set(BUILD_PUBSUB_PSA_WS_DEFAULT OFF)
- endif ()
- option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ${BUILD_PUBSUB_PSA_WS_DEFAULT})
+ option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_WS)
add_subdirectory(pubsub_admin_websocket)
- if (APPLE)
- message(WARNING "WebSocket PubSub Admin not supported on OSX, the tests are failing! See issue #161")
- endif ()
endif (BUILD_PUBSUB_PSA_WS)
-
add_subdirectory(pubsub_api)
add_subdirectory(pubsub_utils)
add_subdirectory(pubsub_spi)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e327662..442c087 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -37,6 +37,7 @@
#include "celix_utils_api.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include <celix_api.h>
#define MAX_EPOLL_EVENTS 16
#ifndef UUID_STR_LEN
@@ -110,11 +111,9 @@ typedef struct psa_tcp_subscriber_metrics_entry_t {
} psa_tcp_subscriber_metrics_entry_t;
typedef struct psa_tcp_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
- hash_map_t
- *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
- pubsub_subscriber_t *svc;
+ hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*)
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
@@ -308,6 +307,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -435,6 +435,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
if (subScope != NULL) {
@@ -450,15 +451,16 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
receiver->subscribers.allInitialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
&entry->msgTypes);
@@ -489,13 +491,15 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void *) bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -510,6 +514,7 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
hashMap_destroy(origins, true, true);
}
hashMap_destroy(entry->metrics, false, false);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -520,7 +525,6 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
- pubsub_subscriber_t *svc = entry->svc;
bool monitor = receiver->metricsEnabled;
//monitoring
@@ -550,14 +554,19 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
if (status == CELIX_SUCCESS) {
- bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
- &release);
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ bool release = true;
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
+ &release);
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ }
+ if (message->metadata.metadata) {
+ celix_properties_destroy(message->metadata.metadata);
+ }
}
- if (message->metadata.metadata)
- celix_properties_destroy(message->metadata.metadata);
updateReceiveCount += 1;
} else {
updateSerError += 1;
@@ -755,15 +764,20 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note: init is currently only called for the subscriber services present when the entry is first initialized.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 3a93c78..10a76a2 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -33,6 +33,7 @@
#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
+#include <celix_api.h>
#include "pubsub_udpmc_topic_receiver.h"
#include "pubsub_psa_udpmc_constants.h"
#include "large_udp.h"
@@ -94,10 +95,8 @@ typedef struct psa_udpmc_requested_connection_entry {
} psa_udpmc_requested_connection_entry_t;
typedef struct psa_udpmc_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
-
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_udpmc_subscriber_entry_t;
@@ -235,6 +234,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver)
if (receiver->serializer != NULL && entry->msgTypes != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
}
@@ -328,6 +328,7 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
if (subScope != NULL) {
@@ -343,14 +344,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
receiver->subscribers.allInitialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
@@ -367,19 +368,21 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -479,12 +482,14 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
if (status == CELIX_SUCCESS) {
- bool release = true;
- pubsub_subscriber_t *svc = entry->svc;
- svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
-
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ bool release = true;
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+ }
}
} else {
printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
@@ -589,15 +594,21 @@ static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *re
while (hashMapIterator_hasNext(&iter)) {
psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note: init is currently only called for the subscriber services present when the entry is first initialized.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 375177a..0c991c5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -34,6 +34,7 @@
#include <uuid/uuid.h>
#include <http_admin/api.h>
#include <jansson.h>
+#include <celix_api.h>
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -107,9 +108,8 @@ typedef struct psa_websocket_requested_connection_entry {
} psa_websocket_requested_connection_entry_t;
typedef struct psa_websocket_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
- pubsub_subscriber_t *svc;
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_websocket_subscriber_entry_t;
@@ -263,6 +263,7 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -394,6 +395,7 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
pubsub_websocket_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL){
if (subScope != NULL){
@@ -409,13 +411,13 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
@@ -433,19 +435,22 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
pubsub_websocket_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -472,7 +477,6 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
//NOTE receiver->subscribers.mutex locked
void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, entry->msgTypes);
pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
- pubsub_subscriber_t *svc = entry->svc;
if (msgSer!= NULL && msgTypeId != 0) {
void *deSerializedMsg = NULL;
@@ -484,10 +488,14 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
if (status == CELIX_SUCCESS) {
- bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ bool release = true;
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ }
}
} else {
L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -727,15 +735,20 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note: init is currently only called for the subscriber services present when the entry is first initialized.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..af9a2b7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -36,6 +36,7 @@
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include <celix_api.h>
#include "pubsub_interceptors_handler.h"
@@ -118,10 +119,9 @@ typedef struct psa_zmq_subscriber_metrics_entry_t {
} psa_zmq_subscriber_metrics_entry_t;
typedef struct psa_zmq_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_zmq_subscriber_metrics_entry_t*
- pubsub_subscriber_t *svc;
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_zmq_subscriber_entry_t;
@@ -295,6 +295,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -412,6 +413,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL){
if (subScope != NULL){
@@ -427,13 +429,13 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
@@ -461,13 +463,14 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -480,6 +483,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
hashMap_destroy(origins, true, true);
}
hashMap_destroy(entry->metrics, false, false);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -488,7 +492,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
- pubsub_subscriber_t *svc = entry->svc;
bool monitor = receiver->metricsEnabled;
//monitoring
@@ -518,15 +521,17 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
celix_properties_t *metadata = message->metadata.metadata;
bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
if (cont) {
- bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
- metadata, &release);
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ bool release = true;
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+ metadata, &release);
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+ }
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
}
-
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-
updateReceiveCount += 1;
}
} else {
@@ -778,15 +783,20 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note: init is currently only called for the subscriber services present when the entry is first initialized.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}