You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/19 05:18:14 UTC
[celix] branch master updated: Refactors all pubsub admins to
handle multiple subscribers per bundle (#257)
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push:
new 009b581 Refactors all pubsub admins to handle multiple subscribers per bundle (#257)
009b581 is described below
commit 009b581381790c9bd5d6184620a15d1f5a826e77
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 19 07:18:08 2020 +0200
Refactors all pubsub admins to handle multiple subscribers per bundle (#257)
* Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.
* Fixes the pubsub receive functions so that they can handle multiple subscribers and releases.
---
bundles/pubsub/CMakeLists.txt | 10 +-
.../src/pubsub_tcp_topic_receiver.c | 69 +++++++++-----
.../src/pubsub_udpmc_topic_receiver.c | 67 +++++++++-----
.../src/pubsub_websocket_topic_receiver.c | 59 ++++++++----
.../src/pubsub_zmq_topic_receiver.c | 65 ++++++++-----
bundles/pubsub/test/CMakeLists.txt | 2 +-
bundles/pubsub/test/test/tst_activator.c | 56 ++++++++---
bundles/pubsub/test/test/tst_endpoint_activator.c | 102 ---------------------
8 files changed, 221 insertions(+), 209 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index f4ac92a..90d762a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,19 +34,11 @@ if (PUBSUB)
add_subdirectory(pubsub_admin_udp_mc)
endif (BUILD_PUBSUB_PSA_UDP_MC)
- set(BUILD_PUBSUB_PSA_WS_DEFAULT ON)
- if (APPLE)
- set(BUILD_PUBSUB_PSA_WS_DEFAULT OFF)
- endif ()
- option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ${BUILD_PUBSUB_PSA_WS_DEFAULT})
+ option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_WS)
add_subdirectory(pubsub_admin_websocket)
- if (APPLE)
- message(WARNING "WebSocket PubSub Admin not supported on OSX, the tests are failing! See issue #161")
- endif ()
endif (BUILD_PUBSUB_PSA_WS)
-
add_subdirectory(pubsub_api)
add_subdirectory(pubsub_utils)
add_subdirectory(pubsub_spi)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e327662..e9add2d 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -37,6 +37,7 @@
#include "celix_utils_api.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include <celix_api.h>
#define MAX_EPOLL_EVENTS 16
#ifndef UUID_STR_LEN
@@ -110,11 +111,9 @@ typedef struct psa_tcp_subscriber_metrics_entry_t {
} psa_tcp_subscriber_metrics_entry_t;
typedef struct psa_tcp_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
- hash_map_t
- *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
- pubsub_subscriber_t *svc;
+ hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
@@ -308,6 +307,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -435,6 +435,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
if (subScope != NULL) {
@@ -450,15 +451,16 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
receiver->subscribers.allInitialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
&entry->msgTypes);
@@ -489,13 +491,15 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void *) bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -510,6 +514,7 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
hashMap_destroy(origins, true, true);
}
hashMap_destroy(entry->metrics, false, false);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -520,7 +525,6 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
- pubsub_subscriber_t *svc = entry->svc;
bool monitor = receiver->metricsEnabled;
//monitoring
@@ -550,14 +554,30 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
if (status == CELIX_SUCCESS) {
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
- &release);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
+ &release);
+ if (!release && hashMapIterator_hasNext(&iter)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
+ receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
+ }
+ release = true;
+ }
+ }
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
}
- if (message->metadata.metadata)
+ if (message->metadata.metadata) {
celix_properties_destroy(message->metadata.metadata);
+ }
updateReceiveCount += 1;
} else {
updateSerError += 1;
@@ -755,15 +775,20 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note now only initialized on first subscriber entries added.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 3a93c78..2827e53 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -33,6 +33,7 @@
#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
+#include <celix_api.h>
#include "pubsub_udpmc_topic_receiver.h"
#include "pubsub_psa_udpmc_constants.h"
#include "large_udp.h"
@@ -94,10 +95,8 @@ typedef struct psa_udpmc_requested_connection_entry {
} psa_udpmc_requested_connection_entry_t;
typedef struct psa_udpmc_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
-
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_udpmc_subscriber_entry_t;
@@ -235,6 +234,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver)
if (receiver->serializer != NULL && entry->msgTypes != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
}
@@ -328,6 +328,7 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
if (subScope != NULL) {
@@ -343,14 +344,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
receiver->subscribers.allInitialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
@@ -367,19 +368,21 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -467,7 +470,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->header.type);
}
if (msgSer == NULL) {
- printf("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
+ L_WARN("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
} else {
void *msgInst = NULL;
bool validVersion = psa_udpmc_checkVersion(msgSer->msgVersion, &msg->header);
@@ -479,22 +482,34 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
if (status == CELIX_SUCCESS) {
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
bool release = true;
- pubsub_subscriber_t *svc = entry->svc;
- svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
-
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
+ if (!release && hashMapIterator_hasNext(&iter2)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
+ break;
+ }
+ release = true;
+ }
+ }
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
}
} else {
- printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
+ L_WARN("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
}
} else {
int major = 0, minor = 0;
version_getMajor(msgSer->msgVersion, &major);
version_getMinor(msgSer->msgVersion, &minor);
- printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+ L_WARN("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
}
@@ -589,15 +604,21 @@ static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *re
while (hashMapIterator_hasNext(&iter)) {
psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note now only initialized on first subscriber entries added.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 375177a..45da454 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -34,6 +34,7 @@
#include <uuid/uuid.h>
#include <http_admin/api.h>
#include <jansson.h>
+#include <celix_api.h>
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -107,9 +108,8 @@ typedef struct psa_websocket_requested_connection_entry {
} psa_websocket_requested_connection_entry_t;
typedef struct psa_websocket_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
- pubsub_subscriber_t *svc;
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_websocket_subscriber_entry_t;
@@ -263,6 +263,7 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -394,6 +395,7 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
pubsub_websocket_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL){
if (subScope != NULL){
@@ -409,13 +411,13 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
@@ -433,19 +435,22 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
pubsub_websocket_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -472,7 +477,6 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
//NOTE receiver->subscribers.mutex locked
void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, entry->msgTypes);
pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
- pubsub_subscriber_t *svc = entry->svc;
if (msgSer!= NULL && msgTypeId != 0) {
void *deSerializedMsg = NULL;
@@ -484,8 +488,22 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
if (status == CELIX_SUCCESS) {
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+ if (!release && hashMapIterator_hasNext(&iter)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
+ }
+ release = true;
+ }
+ }
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
}
@@ -727,15 +745,20 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note now only initialized on first subscriber entries added.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..7e0fff3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -36,6 +36,7 @@
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include <celix_api.h>
#include "pubsub_interceptors_handler.h"
@@ -118,10 +119,9 @@ typedef struct psa_zmq_subscriber_metrics_entry_t {
} psa_zmq_subscriber_metrics_entry_t;
typedef struct psa_zmq_subscriber_entry {
- int usageCount;
hash_map_t *msgTypes; //map from serializer svc
hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_zmq_subscriber_metrics_entry_t*
- pubsub_subscriber_t *svc;
+ hash_map_t *subscriberServices; //key = service id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_zmq_subscriber_entry_t;
@@ -295,6 +295,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -412,6 +413,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL){
if (subScope != NULL){
@@ -427,13 +429,13 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount += 1;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
- entry->usageCount = 1;
- entry->svc = svc;
+ entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
@@ -461,13 +463,14 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
if (entry != NULL) {
- entry->usageCount -= 1;
+ hashMap_remove(entry->subscriberServices, (void*)svcId);
}
- if (entry != NULL && entry->usageCount <= 0) {
+ if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -480,6 +483,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
hashMap_destroy(origins, true, true);
}
hashMap_destroy(entry->metrics, false, false);
+ hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -488,7 +492,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
- pubsub_subscriber_t *svc = entry->svc;
bool monitor = receiver->metricsEnabled;
//monitoring
@@ -517,16 +520,27 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
+ bool release = true;
if (cont) {
- bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
- metadata, &release);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+ if (!release && hashMapIterator_hasNext(&iter2)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
+ }
+ release = true;
+ }
+ }
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
}
-
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-
updateReceiveCount += 1;
}
} else {
@@ -778,15 +792,20 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- int rc = 0;
- if (entry->svc != NULL && entry->svc->init != NULL) {
- rc = entry->svc->init(entry->svc->handle);
- }
- if (rc == 0) {
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ int rc = 0;
+ if (svc != NULL && svc->init != NULL) {
+ rc = svc->init(svc->handle);
+ }
+ if (rc == 0) {
+ //note now only initialized on first subscriber entries added.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
}
}
}
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 9282aca..ef73910 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
add_celix_bundle(pubsub_endpoint_tst
#Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
SOURCES
- test/tst_endpoint_activator.c
+ test/tst_activator.c
VERSION 1.0.0
)
target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
diff --git a/bundles/pubsub/test/test/tst_activator.c b/bundles/pubsub/test/test/tst_activator.c
index b1df062..c930a9f 100644
--- a/bundles/pubsub/test/test/tst_activator.c
+++ b/bundles/pubsub/test/test/tst_activator.c
@@ -29,17 +29,22 @@
#include "receive_count_service.h"
static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
+static int tst_receive2(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
static size_t tst_count(void *handle);
struct activator {
- pubsub_subscriber_t subSvc;
- long subSvcId;
+ pubsub_subscriber_t subSvc1;
+ long subSvcId1;
+
+ pubsub_subscriber_t subSvc2;
+ long subSvcId2;
celix_receive_count_service_t countSvc;
long countSvcId;
pthread_mutex_t mutex;
- unsigned int count;
+ unsigned int count1;
+ unsigned int count2;
};
celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
@@ -48,9 +53,17 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
{
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping");
- act->subSvc.handle = act;
- act->subSvc.receive = tst_receive;
- act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+ act->subSvc1.handle = act;
+ act->subSvc1.receive = tst_receive;
+ act->subSvcId1 = celix_bundleContext_registerService(ctx, &act->subSvc1, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+ }
+
+ {
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping");
+ act->subSvc2.handle = act;
+ act->subSvc2.receive = tst_receive2;
+ act->subSvcId2 = celix_bundleContext_registerService(ctx, &act->subSvc2, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
}
{
@@ -63,7 +76,8 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
}
celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_unregisterService(ctx, act->subSvcId);
+ celix_bundleContext_unregisterService(ctx, act->subSvcId1);
+ celix_bundleContext_unregisterService(ctx, act->subSvcId2);
celix_bundleContext_unregisterService(ctx, act->countSvcId);
pthread_mutex_destroy(&act->mutex);
return CELIX_SUCCESS;
@@ -84,16 +98,36 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused)
prevSeqNr = msg->seqNr;
pthread_mutex_lock(&act->mutex);
- act->count += 1;
+ act->count1 += 1;
+ pthread_mutex_unlock(&act->mutex);
+ return CELIX_SUCCESS;
+}
+
+static int tst_receive2(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) {
+ struct activator *act = handle;
+
+ msg_t *msg = voidMsg;
+ static int prevSeqNr = 0;
+ int delta = msg->seqNr - prevSeqNr;
+ if (delta != 1) {
+ fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
+ }
+ prevSeqNr = msg->seqNr;
+
+ pthread_mutex_lock(&act->mutex);
+ act->count2 += 1;
pthread_mutex_unlock(&act->mutex);
return CELIX_SUCCESS;
}
static size_t tst_count(void *handle) {
struct activator *act = handle;
- size_t count;
+ size_t count1;
+ size_t count2;
pthread_mutex_lock(&act->mutex);
- count = act->count;
+ count1 = act->count1;
+ count2 = act->count2;
pthread_mutex_unlock(&act->mutex);
- return count;
+ printf("msg count1 is %lu and msg count 2 is %lu\n", count1, count2);
+ return count1 >= count2 ? count1 : count2;
}
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
deleted file mode 100644
index 66a9749..0000000
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "celix_api.h"
-#include "pubsub/api.h"
-
-#include "msg.h"
-#include "receive_count_service.h"
-
-
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
-static size_t tst_count(void *handle);
-
-struct activator {
- pubsub_subscriber_t subSvc;
- long subSvcId;
-
- celix_receive_count_service_t countSvc;
- long countSvcId;
-
- pthread_mutex_t mutex;
- unsigned int count;
-};
-
-celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
- pthread_mutex_init(&act->mutex, NULL);
-
- {
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
- act->subSvc.handle = act;
- act->subSvc.receive = tst_receive;
- act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
- }
-
-
- {
- act->countSvc.handle = act;
- act->countSvc.receiveCount = tst_count;
- act->countSvcId = celix_bundleContext_registerService(ctx, &act->countSvc, CELIX_RECEIVE_COUNT_SERVICE_NAME, NULL);
- }
-
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_unregisterService(ctx, act->subSvcId);
- celix_bundleContext_unregisterService(ctx, act->countSvcId);
- pthread_mutex_destroy(&act->mutex);
- return CELIX_SUCCESS;
-}
-
-CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-
-
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) {
- struct activator *act = handle;
-
- msg_t *msg = voidMsg;
- static int prevSeqNr = 0;
- int delta = msg->seqNr - prevSeqNr;
- if (delta != 1) {
- fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
- }
- prevSeqNr = msg->seqNr;
-
- pthread_mutex_lock(&act->mutex);
- act->count += 1;
- pthread_mutex_unlock(&act->mutex);
- return CELIX_SUCCESS;
-}
-
-static size_t tst_count(void *handle) {
- struct activator *act = handle;
- size_t count;
- pthread_mutex_lock(&act->mutex);
- count = act->count;
- pthread_mutex_unlock(&act->mutex);
- return count;
-}