You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/03/25 07:11:38 UTC
[celix] 01/01: Added interceptor service and handling for PubSub
ZMQ. Updated arraylist to have a sort function.
This is an automated email from the ASF dual-hosted git repository.
abroekhuis pushed a commit to branch feature/pubsub_inteceptors
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 6025682e24786bdceffe14a4b6e85fc6e3fdc303
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Wed Mar 25 08:11:26 2020 +0100
Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.
---
bundles/pubsub/examples/CMakeLists.txt | 2 +
bundles/pubsub/examples/pubsub/CMakeLists.txt | 1 +
.../pubsub/{ => interceptors}/CMakeLists.txt | 19 +--
.../include/first_interceptor_private.h | 43 +++++
.../include/second_interceptor_private.h | 36 +++++
.../pubsub/interceptors/src/first_interceptor.c | 71 +++++++++
.../interceptors/src/ps_interceptor_activator.c | 94 +++++++++++
.../pubsub/interceptors/src/second_interceptor.c | 58 +++++++
.../src/pubsub_zmq_topic_receiver.c | 28 +++-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 174 +++++++++++----------
bundles/pubsub/pubsub_spi/CMakeLists.txt | 2 +-
.../pubsub/pubsub_spi/include/pubsub_interceptor.h | 46 ++++++
.../include/pubsub_interceptors_handler.h | 37 +++++
.../pubsub_spi/src/pubsub_interceptors_handler.c | 174 +++++++++++++++++++++
libs/utils/include/celix_array_list.h | 4 +
libs/utils/src/array_list.c | 21 +++
16 files changed, 716 insertions(+), 94 deletions(-)
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index be55811..ccb674c 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -239,6 +239,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_admin_zmq
celix_pubsub_poi_publisher
celix_pubsub_poi_publisher2
+ celix_pubsub_interceptors_example
PROPERTIES
PSA_ZMQ_VERBOSE=true
PUBSUB_ETCD_DISCOVERY_VERBOSE=true
@@ -258,6 +259,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
celix_pubsub_poi_subscriber
+ celix_pubsub_interceptors_example
PROPERTIES
PSA_ZMQ_VERBOSE=true
PUBSUB_ETCD_DISCOVERY_VERBOSE=true
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 8b5c653..427dbd1 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -17,6 +17,7 @@
include_directories("common/include")
+add_subdirectory(interceptors)
add_subdirectory(publisher)
add_subdirectory(publisher2)
if (BUILD_PUBSUB_PSA_WS)
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
similarity index 65%
copy from bundles/pubsub/examples/pubsub/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
index 8b5c653..1bf920b 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
@@ -15,13 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(publisher2)
-if (BUILD_PUBSUB_PSA_WS)
- add_subdirectory(pubsub_websocket)
-endif()
-add_subdirectory(subscriber)
-
+add_celix_bundle(celix_pubsub_interceptors_example
+ SYMBOLIC_NAME "celix_pubsub_interceptors_example"
+ VERSION "1.0.0"
+ SOURCES
+ src/ps_interceptor_activator.c
+ src/first_interceptor.c
+ src/second_interceptor.c
+)
+target_link_libraries(celix_pubsub_interceptors_example PRIVATE Celix::framework Celix::pubsub_spi)
+target_include_directories(celix_pubsub_interceptors_example PRIVATE include)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
new file mode 100644
index 0000000..c7dd87a
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+#define CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+
+#include <celix_threads.h>
+#include "pubsub_interceptor.h"
+
+typedef struct first_interceptor {
+
+ celix_thread_mutex_t mutex;
+
+ uint64_t sequenceNumber;
+
+} first_interceptor_t;
+
+static const char *const SEQUENCE_NUMBER = "sequence.number";
+
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
+
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
new file mode 100644
index 0000000..979b2c7
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+#define CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+
+#include "pubsub_interceptor.h"
+
+typedef struct second_interceptor {
+
+} second_interceptor_t;
+
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
+
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
new file mode 100644
index 0000000..64c63fe
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "first_interceptor_private.h"
+
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *interceptor = calloc(1, sizeof(**interceptor));
+ if (!*interceptor) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*interceptor)->sequenceNumber = 0;
+
+ status = celixThreadMutex_create(&(*interceptor)->mutex, NULL);
+ }
+
+ return status;
+}
+
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
+ free(interceptor);
+ return CELIX_SUCCESS;
+}
+
+
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ first_interceptor_t *interceptor = handle;
+ celixThreadMutex_lock(&interceptor->mutex);
+
+ printf("Invoked preSend on first interceptor\n");
+
+ celix_properties_setLong(metadata, SEQUENCE_NUMBER, interceptor->sequenceNumber++);
+
+ celixThreadMutex_unlock(&interceptor->mutex);
+
+ return true;
+}
+
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+ printf("Invoked postSend on first interceptor, for message with sequenceNumber [%llu]\n", sequence);
+}
+
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+ printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%llu]\n", sequence);
+
+ return true;
+}
+
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+ printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%llu]\n", sequence);
+}
+
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
new file mode 100644
index 0000000..12a055c
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_api.h"
+
+#include "first_interceptor_private.h"
+#include "second_interceptor_private.h"
+
+#include <string.h>
+
+struct interceptorActivator {
+ first_interceptor_t *interceptor;
+ uint64_t interceptorSvcId;
+
+ second_interceptor_t *secondInterceptor;
+ uint64_t secondInterceptorSvcId;
+};
+
+static int interceptor_start(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+ pubsub_interceptor_t *interceptorSvc = calloc(1,sizeof(*interceptorSvc));
+ first_interceptor_t *interceptor = NULL;
+ firstInterceptor_create(&interceptor);
+
+ interceptorSvc->handle = interceptor;
+ interceptorSvc->preSend = firstInterceptor_preSend;
+ interceptorSvc->postSend = firstInterceptor_postSend;
+ interceptorSvc->preReceive = firstInterceptor_preReceive;
+ interceptorSvc->postReceive = firstInterceptor_postReceive;
+
+ act->interceptor = interceptor;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, 10);
+
+ celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+ opts.svc = interceptorSvc;
+ opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+ opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
+ opts.properties = props;
+
+ act->interceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+
+ pubsub_interceptor_t *secondInterceptorSvc = calloc(1, sizeof(*secondInterceptorSvc));
+ second_interceptor_t *secondInterceptor = NULL;
+ secondInterceptor_create(&secondInterceptor);
+
+ secondInterceptorSvc->handle = secondInterceptor;
+ secondInterceptorSvc->preSend = secondInterceptor_preSend;
+ secondInterceptorSvc->postSend = secondInterceptor_postSend;
+ secondInterceptorSvc->preReceive = secondInterceptor_preReceive;
+ secondInterceptorSvc->postReceive = secondInterceptor_postReceive;
+
+ act->secondInterceptor = secondInterceptor;
+
+ celix_properties_t *secondProps = celix_properties_create();
+ celix_properties_setLong(secondProps, OSGI_FRAMEWORK_SERVICE_RANKING, 20);
+
+ celix_service_registration_options_t secondOpts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+ secondOpts.svc = secondInterceptorSvc;
+ secondOpts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+ secondOpts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
+ secondOpts.properties = secondProps;
+
+ act->secondInterceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &secondOpts);
+
+ return 0;
+}
+
+static int interceptor_stop(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+ celix_bundleContext_unregisterService(ctx, act->interceptorSvcId);
+ firstInterceptor_destroy(act->interceptor);
+
+ celix_bundleContext_unregisterService(ctx, act->secondInterceptorSvcId);
+ secondInterceptor_destroy(act->secondInterceptor);
+
+ return 0;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct interceptorActivator, interceptor_start, interceptor_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
new file mode 100644
index 0000000..33f0dd3
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "second_interceptor_private.h"
+
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *interceptor = calloc(1, sizeof(**interceptor));
+ if (!*interceptor) {
+ status = CELIX_ENOMEM;
+ } else {
+ }
+
+ return status;
+}
+
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
+ free(interceptor);
+ return CELIX_SUCCESS;
+}
+
+
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ printf("Invoked preSend on second interceptor\n");
+
+ return true;
+}
+
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ printf("Invoked postSend on second interceptor\n");
+}
+
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ printf("Invoked preReceive on second interceptor\n");
+
+ return true;
+}
+
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+ printf("Invoked postReceive on second interceptor\n");
+}
+
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 979d373..cbc2cf9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -37,6 +37,8 @@
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include "pubsub_interceptors_handler.h"
+
#include "celix_utils_api.h"
#define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -66,6 +68,8 @@ struct pubsub_zmq_topic_receiver {
char *topic;
bool metricsEnabled;
+ pubsub_interceptors_handler_t *interceptorsHandler;
+
void *zmqCtx;
void *zmqSock;
@@ -150,6 +154,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->topic = strndup(topic, 1024 * 1024);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
#ifdef BUILD_WITH_ZMQ_SECURITY
char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
@@ -320,6 +325,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
zmq_close(receiver->zmqSock);
zmq_ctx_term(receiver->zmqCtx);
+ pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
+
free(receiver->scope);
free(receiver->topic);
}
@@ -492,12 +499,23 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
clock_gettime(CLOCK_REALTIME, &endSer);
}
if (status == CELIX_SUCCESS) {
- bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release);
- if (release) {
- msgSer->freeMsg(msgSer->handle, deserializedMsg);
+
+ const char *msgType = msgSer->msgName;
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+ if (cont) {
+ bool release = true;
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+ metadata, &release);
+ if (release) {
+ msgSer->freeMsg(msgSer->handle, deserializedMsg);
+ }
+
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+
+ updateReceiveCount += 1;
}
- updateReceiveCount += 1;
} else {
updateSerError += 1;
L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 50a7879..646c6d9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -32,6 +32,7 @@
#include "pubsub_psa_zmq_constants.h"
#include <uuid/uuid.h>
#include "celix_constants.h"
+#include "pubsub_interceptors_handler.h"
#define FIRST_SEND_DELAY_IN_SECONDS 2
#define ZMQ_BIND_MAX_RETRY 10
@@ -56,6 +57,8 @@ struct pubsub_zmq_topic_sender {
bool metricsEnabled;
bool zeroCopyEnabled;
+ pubsub_interceptors_handler_t *interceptorsHandler;
+
char *scope;
char *topic;
char *url;
@@ -142,6 +145,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+
//setting up zmq socket for ZMQ TopicSender
{
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -318,6 +323,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
celixThreadMutex_destroy(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->zmq.mutex);
+ pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
+
free(sender->scope);
free(sender->topic);
free(sender->url);
@@ -528,102 +535,111 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
if (status == CELIX_SUCCESS /*ser ok*/) {
- pubsub_protocol_message_t message;
- message.payload.payload = serializedOutput;
- message.payload.length = serializedOutputLen;
-
- void *payloadData = NULL;
- size_t payloadLength = 0;
- entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
-
- void *metadataData = NULL;
- size_t metadataLength = 0;
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
- entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+ if (metadata == NULL) {
+ metadata = celix_properties_create();
}
+ celixThreadMutex_lock(&entry->sendLock);
- message.header.msgId = msgTypeId;
- message.header.msgMajorVersion = 0;
- message.header.msgMinorVersion = 0;
- message.header.payloadSize = payloadLength;
- message.header.metadataSize = metadataLength;
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
+ if (cont) {
+ pubsub_protocol_message_t message;
+ message.payload.payload = serializedOutput;
+ message.payload.length = serializedOutputLen;
+
+ void *payloadData = NULL;
+ size_t payloadLength = 0;
+ entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+
+ void *metadataData = NULL;
+ size_t metadataLength = 0;
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+ }
- void *headerData = NULL;
- size_t headerLength = 0;
+ message.header.msgId = msgTypeId;
+ message.header.msgMajorVersion = 0;
+ message.header.msgMinorVersion = 0;
+ message.header.payloadSize = payloadLength;
+ message.header.metadataSize = metadataLength;
- entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
+ void *headerData = NULL;
+ size_t headerLength = 0;
- celixThreadMutex_lock(&entry->sendLock);
+ entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
- errno = 0;
- bool sendOk;
-
- if (bound->parent->zeroCopyEnabled) {
- zmq_msg_t msg1; // Header
- zmq_msg_t msg2; // Payload
- zmq_msg_t msg3; // Metadata
- void *socket = zsock_resolve(sender->zmq.socket);
-
- zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
- //send header
- int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
- if (rc == -1) {
- L_WARN("Error sending header msg. %s", strerror(errno));
- zmq_msg_close(&msg1);
- }
- //send header
- if (rc > 0) {
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
- int flags = 0;
- if (metadataLength > 0) {
- flags = ZMQ_SNDMORE;
- }
- rc = zmq_msg_send(&msg2, socket, flags);
+ errno = 0;
+ bool sendOk;
+
+ if (bound->parent->zeroCopyEnabled) {
+ zmq_msg_t msg1; // Header
+ zmq_msg_t msg2; // Payload
+ zmq_msg_t msg3; // Metadata
+ void *socket = zsock_resolve(sender->zmq.socket);
+
+ zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
+ //send header
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
if (rc == -1) {
- L_WARN("Error sending payload msg. %s", strerror(errno));
- zmq_msg_close(&msg2);
+ L_WARN("Error sending header msg. %s", strerror(errno));
+ zmq_msg_close(&msg1);
}
- }
- if (rc > 0 && metadataLength > 0) {
- zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
- rc = zmq_msg_send(&msg3, socket, 0);
- if (rc == -1) {
- L_WARN("Error sending metadata msg. %s", strerror(errno));
- zmq_msg_close(&msg3);
+ //send header
+ if (rc > 0) {
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
+ int flags = 0;
+ if (metadataLength > 0) {
+ flags = ZMQ_SNDMORE;
+ }
+ rc = zmq_msg_send(&msg2, socket, flags);
+ if (rc == -1) {
+ L_WARN("Error sending payload msg. %s", strerror(errno));
+ zmq_msg_close(&msg2);
+ }
}
- }
- sendOk = rc > 0;
- } else {
- zmsg_t *msg = zmsg_new();
- zmsg_addmem(msg, headerData, headerLength);
- zmsg_addmem(msg, payloadData, payloadLength);
- if (metadataLength > 0) {
- zmsg_addmem(msg, metadataData, metadataLength);
- }
- int rc = zmsg_send(&msg, sender->zmq.socket);
- sendOk = rc == 0;
+ if (rc > 0 && metadataLength > 0) {
+ zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
+ rc = zmq_msg_send(&msg3, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending metadata msg. %s", strerror(errno));
+ zmq_msg_close(&msg3);
+ }
+ }
+
+ sendOk = rc > 0;
+ } else {
+ zmsg_t *msg = zmsg_new();
+ zmsg_addmem(msg, headerData, headerLength);
+ zmsg_addmem(msg, payloadData, payloadLength);
+ if (metadataLength > 0) {
+ zmsg_addmem(msg, metadataData, metadataLength);
+ }
+ int rc = zmsg_send(&msg, sender->zmq.socket);
+ sendOk = rc == 0;
- if (!sendOk) {
- zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+ if (!sendOk) {
+ zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+ }
+
+ free(headerData);
+ free(payloadData);
+ free(metadataData);
}
- free(headerData);
- free(payloadData);
- free(metadataData);
- }
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
- celix_properties_destroy(message.metadata.metadata);
+ celix_properties_destroy(metadata);
- celixThreadMutex_unlock(&entry->sendLock);
- if (sendOk) {
- sendCountUpdate = 1;
- } else {
- sendErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+ celixThreadMutex_unlock(&entry->sendLock);
+ if (sendOk) {
+ sendCountUpdate = 1;
+ } else {
+ sendErrorUpdate = 1;
+ L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+ }
}
} else {
serializationErrorUpdate = 1;
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index a19131c..03097ec 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -22,7 +22,7 @@ add_library(pubsub_spi STATIC
src/pubsub_endpoint.c
src/pubsub_utils.c
src/pubsub_admin_metrics.c
-)
+ src/pubsub_interceptors_handler.c)
set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
target_include_directories(pubsub_spi PUBLIC
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
new file mode 100644
index 0000000..57765ee
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef __PUBSUB_INTERCEPTOR_H
+#define __PUBSUB_INTERCEPTOR_H
+
+#include <stdlib.h>
+
+#include "celix_properties.h"
+
+#define PUBSUB_INTERCEPTOR_SERVICE_NAME "pubsub.interceptor"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "1.0.0"
+
+typedef struct pubsub_interceptor_properties {
+ const char *scope;
+ const char *topic;
+} pubsub_interceptor_properties_t;
+
+struct pubsub_interceptor {
+ void *handle;
+
+ bool (*preSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+ void (*postSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+ bool (*preReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+ void (*postReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+};
+
+typedef struct pubsub_interceptor pubsub_interceptor_t;
+
+#endif //__PUBSUB_INTERCEPTOR_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
new file mode 100644
index 0000000..60461f8
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PUBSUB_INTERCEPTORS_HANDLER_H
+#define PUBSUB_INTERCEPTORS_HANDLER_H
+
+#include "celix_errno.h"
+#include "celix_array_list.h"
+#include "pubsub_interceptor.h"
+#include "celix_properties.h"
+
+typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
+
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+#endif //PUBSUB_INTERCEPTORS_HANDLER_H
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
new file mode 100644
index 0000000..afd17e7
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_bundle_context.h"
+#include "celix_constants.h"
+#include "utils.h"
+
+#include "pubsub_interceptors_handler.h"
+
+typedef struct entry {
+ const celix_properties_t *properties;
+ pubsub_interceptor_t *interceptor;
+} entry_t;
+
+struct pubsub_interceptors_handler {
+ pubsub_interceptor_properties_t properties;
+
+ celix_array_list_t *interceptors;
+
+ long interceptorsTrackerId;
+
+ celix_bundle_context_t *ctx;
+
+ celix_thread_mutex_t mutex;
+};
+
+static int referenceCompare(const void *a, const void *b);
+
+static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props);
+static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props);
+
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *handler = calloc(1, sizeof(**handler));
+ if (!*handler) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*handler)->ctx = ctx;
+
+ (*handler)->properties.scope = scope;
+ (*handler)->properties.topic = topic;
+
+ (*handler)->interceptors = celix_arrayList_create();
+
+ celixThreadMutex_create(&(*handler)->mutex, NULL);
+
+ // Create service tracker here, and not in the activator
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = *handler;
+ opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+ opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+ (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ return status;
+}
+
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+ celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+
+ free(handler->interceptors);
+ free(handler);
+
+ return CELIX_SUCCESS;
+}
+
+void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_interceptors_handler_t *handler = handle;
+ celixThreadMutex_lock(&handler->mutex);
+
+ bool exists = false;
+ for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+ if (entry->interceptor == svc) {
+ exists = true;
+ }
+ }
+ if (!exists) {
+ entry_t *entry = calloc(1, sizeof(*entry));
+ entry->properties = props;
+ entry->interceptor = svc;
+ celix_arrayList_add(handler->interceptors, entry);
+
+ celix_arrayList_sort(handler->interceptors, referenceCompare);
+ }
+
+ celixThreadMutex_unlock(&handler->mutex);
+}
+
+void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attribute__((unused)) const celix_properties_t *props) {
+ pubsub_interceptors_handler_t *handler = handle;
+ for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+ if (entry->interceptor == svc) {
+ arrayList_remove(handler->interceptors, i);
+ break;
+ }
+ }
+}
+
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+ bool cont = true;
+ for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+
+ cont = entry->interceptor->preSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+ if (!cont) {
+ break;
+ }
+ }
+
+ return cont;
+}
+
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+ for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+
+ entry->interceptor->postSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+ }
+}
+
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+ bool cont = true;
+ for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+
+ cont = entry->interceptor->preReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+ if (!cont) {
+ break;
+ }
+ }
+
+ return cont;
+}
+
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+ for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+ entry_t *entry = arrayList_get(handler->interceptors, i);
+
+ entry->interceptor->postReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+ }
+}
+
+int referenceCompare(const void *a, const void *b) {
+ const entry_t *aEntry = a;
+ const entry_t *bEntry = b;
+
+ long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+ long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+
+ long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+ long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+ return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
\ No newline at end of file
diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h
index aada9f4..ea4fc4a 100644
--- a/libs/utils/include/celix_array_list.h
+++ b/libs/utils/include/celix_array_list.h
@@ -45,6 +45,8 @@ typedef struct celix_array_list celix_array_list_t;
typedef bool (*celix_arrayList_equals_fp)(celix_array_list_entry_t, celix_array_list_entry_t);
+typedef int (*celix_arrayList_sort_fp)(const void *, const void *);
+
celix_array_list_t* celix_arrayList_create();
@@ -99,6 +101,8 @@ void celix_arrayList_removeDouble(celix_array_list_t *list, double val);
void celix_arrayList_removeBool(celix_array_list_t *list, bool val);
void celix_arrayList_removeSize(celix_array_list_t *list, size_t val);
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp);
+
#ifdef __cplusplus
}
#endif
diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c
index 808d3b1..665b0af 100644
--- a/libs/utils/src/array_list.c
+++ b/libs/utils/src/array_list.c
@@ -564,3 +564,24 @@ void celix_arrayList_clear(celix_array_list_t *list) {
}
list->size = 0;
}
+
+#if defined(__APPLE__)
+static int celix_arrayList_compare(void *arg, const void * a, const void *b) {
+#elif
+static int celix_arrayList_compare(const void * a, const void *b, void *arg) {
+#endif
+ const celix_array_list_entry_t *aEntry = a;
+ const celix_array_list_entry_t *bEntry = b;
+
+ celix_arrayList_sort_fp sort = arg;
+
+ return sort(aEntry->voidPtrVal, bEntry->voidPtrVal);
+}
+
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp) {
+#if defined(__APPLE__)
+ qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), sortFp, celix_arrayList_compare);
+#elif
+ qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), celix_arrayList_compare, sortFp);
+#endif
+}