You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/28 19:42:57 UTC
[celix] 06/07: Refactors v2 of pubsub tcp and zmq to only call
interceptors callback once per receive. renames old psa to _v1.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 461a2cea194c0bc203e03af38ce21a9a15d48182
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Sun Jun 27 22:36:30 2021 +0200
Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.
---
bundles/pubsub/CMakeLists.txt | 9 +-
bundles/pubsub/integration/CMakeLists.txt | 4 +-
bundles/pubsub/integration/src/tst_activator.c | 6 +-
bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt | 22 +--
.../v1/src/pubsub_tcp_topic_receiver.c | 6 +-
.../v2/src/pubsub_tcp_topic_receiver.c | 183 +++++++++------------
.../pubsub_admin_websocket/v1/CMakeLists.txt | 20 ++-
.../v1/src/pubsub_websocket_topic_receiver.c | 5 +-
bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt | 20 ++-
.../v1/src/pubsub_zmq_topic_receiver.c | 59 +------
.../v2/src/pubsub_zmq_topic_receiver.c | 172 +++++++++----------
.../include/pubsub_interceptors_handler.h | 2 +-
.../pubsub_spi/src/pubsub_interceptors_handler.c | 41 ++---
13 files changed, 232 insertions(+), 317 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 9285195..482ee3a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -21,14 +21,16 @@ if (PUBSUB)
option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" ON)
if (BUILD_PUBSUB_PSA_ZMQ)
option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
- add_subdirectory(pubsub_admin_zmq/v1)
+ add_subdirectory(pubsub_admin_zmq/v1) #TODO option for v1 admins
add_subdirectory(pubsub_admin_zmq/v2)
+ add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1) #TODO move to config and set to v2
endif (BUILD_PUBSUB_PSA_ZMQ)
option(BUILD_PUBSUB_PSA_TCP "Build TCP PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_TCP)
- add_subdirectory(pubsub_admin_tcp/v1)
+ add_subdirectory(pubsub_admin_tcp/v1) #TODO option for v1 admins
add_subdirectory(pubsub_admin_tcp/v2)
+ add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1) #TODO move to config and set to v2
endif (BUILD_PUBSUB_PSA_TCP)
option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON)
@@ -38,8 +40,9 @@ if (PUBSUB)
option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_WS)
- add_subdirectory(pubsub_admin_websocket/v1)
+ add_subdirectory(pubsub_admin_websocket/v1) #TODO option for v1 admins
add_subdirectory(pubsub_admin_websocket/v2)
+ add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1) #TODO move to config and set to v2
endif (BUILD_PUBSUB_PSA_WS)
add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index a134b74..9c9ce54 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -793,11 +793,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
#configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
- celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE)
+ celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
- add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
+ add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq_v2 Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
diff --git a/bundles/pubsub/integration/src/tst_activator.c b/bundles/pubsub/integration/src/tst_activator.c
index 8ce13ac..4017326 100644
--- a/bundles/pubsub/integration/src/tst_activator.c
+++ b/bundles/pubsub/integration/src/tst_activator.c
@@ -86,7 +86,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release) {
struct activator *act = handle;
msg_t *msg = voidMsg;
@@ -100,6 +100,10 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused)
pthread_mutex_lock(&act->mutex);
act->count1 += 1;
pthread_mutex_unlock(&act->mutex);
+
+ *release = false;
+ free(voidMsg);
+
return CELIX_SUCCESS;
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
index 0314cb6..2f1bca8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
+message(WARNING "PubSub TCP Admin V1 is deprecated, use PubSub TCP Websocket v2 instead")
+
find_package(UUID REQUIRED)
-add_celix_bundle(celix_pubsub_admin_tcp
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp"
+add_celix_bundle(celix_pubsub_admin_tcp_v1
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp_v1"
VERSION "1.0.0"
GROUP "Celix/PubSub"
SOURCES
@@ -30,15 +32,15 @@ add_celix_bundle(celix_pubsub_admin_tcp
src/pubsub_tcp_common.c
)
-set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::framework Celix::dfi Celix::log_helper)
-target_include_directories(celix_pubsub_admin_tcp PRIVATE src)
+set_target_properties(celix_pubsub_admin_tcp_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper)
+target_include_directories(celix_pubsub_admin_tcp_v1 PRIVATE src)
# cmake find package UUID set the wrong include dir for OSX
if (NOT APPLE)
- target_link_libraries(celix_pubsub_admin_tcp PRIVATE UUID::lib)
+ target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE UUID::lib)
endif()
-install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp)
+install_celix_bundle(celix_pubsub_admin_tcp_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index b1b2c58..8acb8e2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -560,9 +560,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
+ if (!release) {
+ //receive function has taken ownership, deserialize again for new message
status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
@@ -574,6 +573,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
release = true;
}
}
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index a49ff23..ad321e8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -79,7 +79,7 @@ struct pubsub_tcp_topic_receiver {
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+ hash_map_t *map; //key = long svc id, value = psa_tcp_subscriber_entry_t
bool allInitialized;
} subscribers;
};
@@ -92,12 +92,12 @@ typedef struct psa_tcp_requested_connection_entry {
} psa_tcp_requested_connection_entry_t;
typedef struct psa_tcp_subscriber_entry {
- hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+ pubsub_subscriber_t* subscriberSvc;
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
static void *psa_tcp_recvThread(void *data);
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
@@ -229,8 +229,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = receiver;
- opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
- opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+ opts.addWithProperties = pubsub_tcpTopicReceiver_addSubscriber;
+ opts.removeWithProperties = pubsub_tcpTopicReceiver_removeSubscriber;
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
@@ -259,19 +259,11 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hashMap_destroy(entry->subscriberServices, false, false);
- free(entry);
- }
- }
- hashMap_destroy(receiver->subscribers.map, false, false);
+ hashMap_destroy(receiver->subscribers.map, false, true);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
@@ -394,11 +386,9 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
pubsub_tcp_topic_receiver_t *receiver = handle;
- long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
@@ -415,62 +405,78 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
return;
}
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *)bndId);
- if (entry != NULL) {
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- } else {
- //new create entry
- entry = calloc(1, sizeof(*entry));
- entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
- entry->initialized = false;
- receiver->subscribers.allInitialized = false;
+ psa_tcp_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->subscriberSvc = svc;
+ entry->initialized = false;
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- hashMap_put(receiver->subscribers.map, (void *)bndId, entry);
- }
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+ receiver->subscribers.allInitialized = false;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
- const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
pubsub_tcp_topic_receiver_t *receiver = handle;
- long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
- if (entry != NULL) {
- hashMap_remove(entry->subscriberServices, (void*)svcId);
- }
- if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
- //remove entry
- hashMap_remove(receiver->subscribers.map, (void *) bndId);
- hashMap_destroy(entry->subscriberServices, false, false);
- free(entry);
+ psa_tcp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId);
+ free(entry);
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+ *release = true;
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+ entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+ if (!(*release)) {
+ //receive function has taken ownership, deserialize again for new message
+ struct iovec deSerializeBuffer;
+ deSerializeBuffer.iov_base = message->payload.payload;
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+ message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion,
+ &deSerializeBuffer, 0, msg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
+ }
+ }
+ *release = true;
+ }
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static inline void
-processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
- const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
- //NOTE receiver->subscribers.mutex locked
+static inline void processMsg(void* handle, const pubsub_protocol_message_t *message, bool* releaseMsg, struct timespec *receiveTime) {
+ pubsub_tcp_topic_receiver_t *receiver = handle;
- const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+ const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
if (msgFqn == NULL) {
L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
return;
}
-
void *deSerializedMsg = NULL;
- bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion,
+ &deSerializeBuffer, 0, &deSerializedMsg);
+
// When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
if (message->payload.payload == deSerializedMsg) {
*releaseMsg = true;
@@ -479,33 +485,15 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
- bool release = true;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+ deSerializedMsg, &metadata);
if (cont) {
- hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
- msgFqn,
- receiver->scope == NULL ? "(null)" : receiver->scope,
- receiver->topic);
- break;
- }
- release = true;
- }
- }
+ bool release;
+ callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
if (release) {
- pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
- }
- if (message->metadata.metadata) {
- celix_properties_destroy(message->metadata.metadata);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+ deSerializedMsg);
}
}
} else {
@@ -516,27 +504,13 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
msgFqn,
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
- (int)message->header.msgMajorVersion,
- (int)message->header.msgMinorVersion,
+ (int) message->header.msgMajorVersion,
+ (int) message->header.msgMinorVersion,
pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
}
-static void
-processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, message, release, receiveTime);
- }
- }
- celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
static void *psa_tcp_recvThread(void *data) {
pubsub_tcp_topic_receiver_t *receiver = data;
@@ -642,20 +616,15 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- int rc = 0;
- if (svc != NULL && svc->init != NULL) {
- rc = svc->init(svc->handle);
- }
- if (rc == 0) {
- //note now only initialized on first subscriber entries added.
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
- }
+ int rc = 0;
+ if (entry->subscriberSvc->init != NULL) {
+ rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+ }
+ if (rc == 0) {
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
}
}
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
index f6397e0..021310f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
@@ -15,11 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+message(WARNING "PubSub Websocket Admin V1 is deprecated, use PubSub ZMQ Websocket v2 instead")
+
find_package(Jansson REQUIRED)
find_package(UUID REQUIRED)
-add_celix_bundle(celix_pubsub_admin_websocket
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+add_celix_bundle(celix_pubsub_admin_websocket_v1
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v1"
VERSION "1.0.0"
GROUP "Celix/PubSub"
SOURCES
@@ -30,16 +32,16 @@ add_celix_bundle(celix_pubsub_admin_websocket
src/pubsub_websocket_common.c
)
-set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE
+set_target_properties(celix_pubsub_admin_websocket_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE
Celix::framework Celix::dfi Celix::log_helper Celix::utils
Celix::http_admin_api
)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_websocket PRIVATE
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_websocket_v1 PRIVATE
src
)
-install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
+install_celix_bundle(celix_pubsub_admin_websocket_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1)
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
index 7d8cd00..7b2cfcc 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
@@ -500,9 +500,8 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
+ if (!release) {
+ //receive function has taken ownership, deserialize again for new message
status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
index 52723b9..bfd8eef 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+message(WARNING "PubSub ZMQ Admin V1 is deprecated, use PubSub ZMQ Admin v2 instead")
+
find_package(ZMQ REQUIRED)
find_package(CZMQ REQUIRED)
find_package(Jansson REQUIRED)
@@ -31,8 +33,8 @@ if (BUILD_ZMQ_SECURITY)
set (ZMQ_CRYPTO_C "src/zmq_crypto.c")
endif()
-add_celix_bundle(celix_pubsub_admin_zmq
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+add_celix_bundle(celix_pubsub_admin_zmq_v1
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq_v1"
VERSION "1.1.0"
GROUP "Celix/PubSub"
SOURCES
@@ -43,17 +45,17 @@ add_celix_bundle(celix_pubsub_admin_zmq
${ZMQ_CRYPTO_C}
)
-set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+set_target_properties(celix_pubsub_admin_zmq_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE
Celix::framework Celix::dfi Celix::log_helper Celix::utils
ZMQ::lib CZMQ::lib ${OPTIONAL_OPENSSL_LIB}
)
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_zmq PRIVATE
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_zmq_v1 PRIVATE
src
)
-install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
+install_celix_bundle(celix_pubsub_admin_zmq_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::shell_api)
+add_library(Celix::celix_pubsub_admin_zmq_v1 ALIAS celix_pubsub_admin_zmq_v1)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 11f2b8f..9cd99f4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -533,10 +533,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
while (hashMapIterator_hasNext(&iter2)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter2)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
+ if (!release) {
+ //receive function has taken ownership deserialize again for new message
status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -544,6 +542,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
}
release = true;
}
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
}
if (release) {
msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
@@ -558,58 +557,6 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
} else {
L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
}
-
- if (msgSer != NULL && monitor) {
- // TODO disabled for now, should move to an interceptor?
-// hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId);
-// char uuidStr[UUID_STR_LEN+1];
-// uuid_unparse(hdr->originUUID, uuidStr);
-// psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
-//
-// if (metrics == NULL) {
-// metrics = calloc(1, sizeof(*metrics));
-// hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
-// uuid_copy(metrics->origin, hdr->originUUID);
-// metrics->msgTypeId = hdr->type;
-// metrics->maxDelayInSeconds = -INFINITY;
-// metrics->minDelayInSeconds = INFINITY;
-// metrics->lastSeqNr = 0;
-// }
-//
-// double diff = celix_difftime(&beginSer, &endSer);
-// long n = metrics->nrOfMessagesReceived;
-// metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-//
-// diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-// n = metrics->nrOfMessagesReceived;
-// if (metrics->nrOfMessagesReceived >= 1) {
-// metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-// }
-// metrics->lastMessageReceived = *receiveTime;
-//
-//
-// int incr = hdr->seqNr - metrics->lastSeqNr;
-// if (metrics->lastSeqNr >0 && incr > 1) {
-// metrics->nrOfMissingSeqNumbers += (incr - 1);
-// L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
-// }
-// metrics->lastSeqNr = hdr->seqNr;
-//
-// struct timespec sendTime;
-// sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
-// sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
-// diff = celix_difftime(&sendTime, receiveTime);
-// metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
-// if (diff < metrics->minDelayInSeconds) {
-// metrics->minDelayInSeconds = diff;
-// }
-// if (diff > metrics->maxDelayInSeconds) {
-// metrics->maxDelayInSeconds = diff;
-// }
-//
-// metrics->nrOfMessagesReceived += updateReceiveCount;
-// metrics->nrOfSerializationErrors += updateSerError;
- }
}
static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 90e9510..0fc2bc2 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -93,7 +93,7 @@ struct pubsub_zmq_topic_receiver {
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ hash_map_t *map; //key = long svc id, value = psa_zmq_subscriber_entry_t
bool allInitialized;
} subscribers;
};
@@ -105,13 +105,13 @@ typedef struct psa_zmq_requested_connection_entry {
} psa_zmq_requested_connection_entry_t;
typedef struct psa_zmq_subscriber_entry {
- hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+ pubsub_subscriber_t* subscriberSvc;
bool initialized; //true if the init function is called through the receive thread
} psa_zmq_subscriber_entry_t;
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
static void* psa_zmq_recvThread(void * data);
static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
@@ -237,8 +237,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = receiver;
- opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
- opts.removeWithOwner = pubsub_zmqTopicReceiver_removeSubscriber;
+ opts.addWithProperties = pubsub_zmqTopicReceiver_addSubscriber;
+ opts.removeWithProperties = pubsub_zmqTopicReceiver_removeSubscriber;
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
@@ -275,19 +275,11 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hashMap_destroy(entry->subscriberServices, false, false);
- free(entry);
- }
- }
- hashMap_destroy(receiver->subscribers.map, false, false);
+ hashMap_destroy(receiver->subscribers.map, false, true);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
@@ -384,10 +376,9 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
pubsub_zmq_topic_receiver_t *receiver = handle;
- long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
@@ -404,107 +395,104 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
return;
}
+ psa_zmq_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->subscriberSvc = svc;
+ entry->initialized = false;
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
- if (entry != NULL) {
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- } else {
- //new create entry
- entry = calloc(1, sizeof(*entry));
- entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
- entry->initialized = false;
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
- }
+ hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+ receiver->subscribers.allInitialized = false;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props) {
pubsub_zmq_topic_receiver_t *receiver = handle;
- long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
- if (entry != NULL) {
- hashMap_remove(entry->subscriberServices, (void*)svcId);
- }
- if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
- //remove entry
- hashMap_remove(receiver->subscribers.map, (void*)bndId);
- hashMap_destroy(entry->subscriberServices, false, false);
- free(entry);
+ psa_zmq_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId);
+ free(entry);
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_zmq_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+ *release = true;
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+ entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+ if (!(*release)) {
+ //receive function has taken ownership, deserialize again for new message
+ struct iovec deSerializeBuffer;
+ deSerializeBuffer.iov_base = message->payload.payload;
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+ message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion,
+ &deSerializeBuffer, 0, msg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break;
+ }
+ }
+ *release = true;
+ }
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
- const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
+ const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
if (msgFqn == NULL) {
L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
return;
}
-
void *deserializedMsg = NULL;
- bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
- deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion,
+ &deSerializeBuffer, 0, &deserializedMsg);
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
- bool release = true;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+ deserializedMsg, &metadata);
if (cont) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter2)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
- }
- release = true;
- }
- }
+ bool release;
+ callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
if (release) {
- pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+ deserializedMsg);
}
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
msgFqn,
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
- (int)message->header.msgMajorVersion,
- (int)message->header.msgMinorVersion,
+ (int) message->header.msgMajorVersion,
+ (int) message->header.msgMinorVersion,
pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
}
-static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, message, receiveTime);
- }
- }
- celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
static void* psa_zmq_recvThread(void * data) {
pubsub_zmq_topic_receiver_t *receiver = data;
@@ -627,20 +615,16 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- int rc = 0;
- if (svc != NULL && svc->init != NULL) {
- rc = svc->init(svc->handle);
- }
- if (rc == 0) {
- //note now only initialized on first subscriber entries added.
- entry->initialized = true;
- } else {
- L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
- allInitialized = false;
- }
+ int rc = 0;
+ if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) {
+ rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+ }
+ if (rc == 0) {
+ //note now only initialized on first subscriber entries added.
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
}
}
}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 801fd35..6595853 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -37,7 +37,7 @@ celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *
bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
#ifdef __cplusplus
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 331a41d..6118fbe 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -18,7 +18,8 @@
*/
#include "celix_bundle_context.h"
#include "celix_constants.h"
-#include "utils.h"
+#include "celix_array_list.h"
+#include "celix_utils.h"
#include "pubsub_interceptors_handler.h"
@@ -91,8 +92,8 @@ void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const cel
celixThreadMutex_lock(&handler->lock);
bool exists = false;
- for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
- entry_t *entry = arrayList_get(handler->interceptors, i);
+ for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i);
if (entry->interceptor == svc) {
exists = true;
}
@@ -114,11 +115,11 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr
celixThreadMutex_lock(&handler->lock);
- for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
- entry_t *entry = arrayList_get(handler->interceptors, i);
+ for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i);
if (entry->interceptor == svc) {
- void *old = arrayList_remove(handler->interceptors, i);
- free(old);
+ celix_arrayList_removeAt(handler->interceptors, i);
+ free(entry);
break;
}
}
@@ -131,12 +132,12 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
celixThreadMutex_lock(&handler->lock);
- if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) {
+ if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
*metadata = celix_properties_create();
}
- for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
- entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+ for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
if (entry->interceptor->preSend != NULL) {
cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
}
@@ -153,8 +154,8 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
celixThreadMutex_lock(&handler->lock);
- for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
- entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+ for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
if (entry->interceptor->postSend != NULL) {
entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
}
@@ -163,15 +164,17 @@ void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *hand
celixThreadMutex_unlock(&handler->lock);
}
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) {
bool cont = true;
celixThreadMutex_lock(&handler->lock);
-
- for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
- entry_t *entry = arrayList_get(handler->interceptors, i);
+ if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
+ *metadata = celix_properties_create();
+ }
+ for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i);
if (entry->interceptor->preReceive != NULL) {
- cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+ cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
}
if (!cont) {
break;
@@ -186,8 +189,8 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha
void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
celixThreadMutex_lock(&handler->lock);
- for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
- entry_t *entry = arrayList_get(handler->interceptors, i);
+ for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = celix_arrayList_get(handler->interceptors, i);
if (entry->interceptor->postReceive != NULL) {
entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
}