You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/03/25 07:11:38 UTC

[celix] 01/01: Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.

This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a commit to branch feature/pubsub_inteceptors
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6025682e24786bdceffe14a4b6e85fc6e3fdc303
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Wed Mar 25 08:11:26 2020 +0100

    Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.
---
 bundles/pubsub/examples/CMakeLists.txt             |   2 +
 bundles/pubsub/examples/pubsub/CMakeLists.txt      |   1 +
 .../pubsub/{ => interceptors}/CMakeLists.txt       |  19 +--
 .../include/first_interceptor_private.h            |  43 +++++
 .../include/second_interceptor_private.h           |  36 +++++
 .../pubsub/interceptors/src/first_interceptor.c    |  71 +++++++++
 .../interceptors/src/ps_interceptor_activator.c    |  94 +++++++++++
 .../pubsub/interceptors/src/second_interceptor.c   |  58 +++++++
 .../src/pubsub_zmq_topic_receiver.c                |  28 +++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 174 +++++++++++----------
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |   2 +-
 .../pubsub/pubsub_spi/include/pubsub_interceptor.h |  46 ++++++
 .../include/pubsub_interceptors_handler.h          |  37 +++++
 .../pubsub_spi/src/pubsub_interceptors_handler.c   | 174 +++++++++++++++++++++
 libs/utils/include/celix_array_list.h              |   4 +
 libs/utils/src/array_list.c                        |  21 +++
 16 files changed, 716 insertions(+), 94 deletions(-)

diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index be55811..ccb674c 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -239,6 +239,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_zmq
             celix_pubsub_poi_publisher
             celix_pubsub_poi_publisher2
+            celix_pubsub_interceptors_example
         PROPERTIES
             PSA_ZMQ_VERBOSE=true
             PUBSUB_ETCD_DISCOVERY_VERBOSE=true
@@ -258,6 +259,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
             celix_pubsub_poi_subscriber
+            celix_pubsub_interceptors_example
         PROPERTIES
             PSA_ZMQ_VERBOSE=true
             PUBSUB_ETCD_DISCOVERY_VERBOSE=true
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 8b5c653..427dbd1 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -17,6 +17,7 @@
 
 include_directories("common/include")
 
+add_subdirectory(interceptors)
 add_subdirectory(publisher)
 add_subdirectory(publisher2)
 if (BUILD_PUBSUB_PSA_WS)
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
similarity index 65%
copy from bundles/pubsub/examples/pubsub/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
index 8b5c653..1bf920b 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
@@ -15,13 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(publisher2)
-if (BUILD_PUBSUB_PSA_WS)
-    add_subdirectory(pubsub_websocket)
-endif()
-add_subdirectory(subscriber)
-
+add_celix_bundle(celix_pubsub_interceptors_example
+    SYMBOLIC_NAME "celix_pubsub_interceptors_example"
+    VERSION "1.0.0"
+    SOURCES 
+        src/ps_interceptor_activator.c
+        src/first_interceptor.c
+        src/second_interceptor.c
+)
 
+target_link_libraries(celix_pubsub_interceptors_example PRIVATE Celix::framework Celix::pubsub_spi)
+target_include_directories(celix_pubsub_interceptors_example PRIVATE include)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
new file mode 100644
index 0000000..c7dd87a
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+#define CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <celix_threads.h>
+#include "pubsub_interceptor.h"
+
+/**
+ * State of the first example interceptor.
+ */
+typedef struct first_interceptor {
+    /* Held by firstInterceptor_preSend while it reads and increments sequenceNumber. */
+    celix_thread_mutex_t mutex;
+
+    /* Next value firstInterceptor_preSend stores in the message metadata; starts at 0. */
+    uint64_t sequenceNumber;
+} first_interceptor_t;
+
+/* Metadata key used for the sequence number. */
+static const char *const SEQUENCE_NUMBER = "sequence.number";
+
+/* Allocates *interceptor and creates its mutex; CELIX_ENOMEM when the allocation fails. */
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
+/* Frees an interceptor obtained from firstInterceptor_create. */
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
+
+/* Callbacks installed in the pubsub_interceptor service; handle is the first_interceptor_t. */
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
new file mode 100644
index 0000000..979b2c7
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+#define CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "pubsub_interceptor.h"
+
+/**
+ * State of the second example interceptor. It keeps no data of its own; the
+ * placeholder member is there because ISO C does not allow a struct without
+ * members (as a GNU extension such a struct has size 0, which would turn the
+ * allocation in secondInterceptor_create into a zero-byte request).
+ */
+typedef struct second_interceptor {
+    char unused;
+} second_interceptor_t;
+
+/* Allocates *interceptor; CELIX_ENOMEM when the allocation fails. */
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
+/* Frees an interceptor obtained from secondInterceptor_create. */
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
+
+/* Callbacks installed in the pubsub_interceptor service. */
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
new file mode 100644
index 0000000..64c63fe
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "first_interceptor_private.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/**
+ * Allocates a zeroed first_interceptor_t and creates its mutex.
+ *
+ * On success *interceptor points to the new instance; on failure *interceptor
+ * is NULL and nothing stays allocated.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the allocation fails, or the status
+ * reported by celixThreadMutex_create.
+ */
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor) {
+    first_interceptor_t *created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        *interceptor = NULL;
+        return CELIX_ENOMEM;
+    }
+
+    //calloc already zeroed sequenceNumber, only the mutex still needs setting up
+    celix_status_t status = celixThreadMutex_create(&created->mutex, NULL);
+    if (status != CELIX_SUCCESS) {
+        free(created);
+        created = NULL;
+    }
+
+    *interceptor = created;
+    return status;
+}
+
+/**
+ * Destroys the mutex and frees the interceptor; NULL is accepted and ignored.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
+    if (interceptor != NULL) {
+        celixThreadMutex_destroy(&interceptor->mutex);
+        free(interceptor);
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Stores the current sequence number in the metadata under SEQUENCE_NUMBER and
+ * increments it, all while holding the interceptor mutex. Always returns true.
+ */
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    first_interceptor_t *interceptor = handle;
+    celixThreadMutex_lock(&interceptor->mutex);
+
+    printf("Invoked preSend on first interceptor\n");
+
+    celix_properties_setLong(metadata, SEQUENCE_NUMBER, interceptor->sequenceNumber++);
+
+    celixThreadMutex_unlock(&interceptor->mutex);
+
+    return true;
+}
+
+/**
+ * Prints the sequence number read from the metadata (0 is passed as the fallback).
+ */
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+    printf("Invoked postSend on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", sequence);
+}
+
+/**
+ * Prints the sequence number read from the metadata. Always returns true.
+ */
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+    printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", sequence);
+
+    return true;
+}
+
+/**
+ * Prints the sequence number read from the metadata.
+ */
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+    printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", sequence);
+}
+
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
new file mode 100644
index 0000000..12a055c
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_api.h"
+
+#include "first_interceptor_private.h"
+#include "second_interceptor_private.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+/**
+ * Per-bundle state: for each example interceptor the instance, the service
+ * struct registered for it and the service id of that registration.
+ */
+struct interceptorActivator {
+    first_interceptor_t *interceptor;
+    pubsub_interceptor_t *interceptorSvc;
+    long interceptorSvcId;
+
+    second_interceptor_t *secondInterceptor;
+    pubsub_interceptor_t *secondInterceptorSvc;
+    long secondInterceptorSvcId;
+};
+
+/**
+ * Registers svc under PUBSUB_INTERCEPTOR_SERVICE_NAME with the given service
+ * ranking and returns the service id handed back by the framework.
+ */
+static long interceptor_register(celix_bundle_context_t *ctx, pubsub_interceptor_t *svc, long ranking) {
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+
+    celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    opts.svc = svc;
+    opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
+    opts.properties = props;
+
+    return celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+}
+
+/**
+ * Unregisters both services and releases everything interceptor_start set up.
+ * interceptor_start also calls this to unwind a failed start, so every step
+ * copes with state that was never created.
+ */
+static int interceptor_stop(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+    if (act->interceptorSvcId >= 0) {
+        celix_bundleContext_unregisterService(ctx, act->interceptorSvcId);
+        act->interceptorSvcId = -1L;
+    }
+    if (act->interceptor != NULL) {
+        firstInterceptor_destroy(act->interceptor);
+        act->interceptor = NULL;
+    }
+    //the service struct may only go once the registration is gone
+    free(act->interceptorSvc);
+    act->interceptorSvc = NULL;
+
+    if (act->secondInterceptorSvcId >= 0) {
+        celix_bundleContext_unregisterService(ctx, act->secondInterceptorSvcId);
+        act->secondInterceptorSvcId = -1L;
+    }
+    if (act->secondInterceptor != NULL) {
+        secondInterceptor_destroy(act->secondInterceptor);
+        act->secondInterceptor = NULL;
+    }
+    free(act->secondInterceptorSvc);
+    act->secondInterceptorSvc = NULL;
+
+    return 0;
+}
+
+/**
+ * Creates both example interceptors and registers each as a pubsub interceptor
+ * service (ranking 10 for the first, 20 for the second).
+ *
+ * Returns CELIX_SUCCESS, or an error status after releasing whatever had
+ * already been set up.
+ */
+static int interceptor_start(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+    //mark both registrations as absent so interceptor_stop can unwind a partial start
+    act->interceptorSvcId = -1L;
+    act->secondInterceptorSvcId = -1L;
+
+    celix_status_t status = firstInterceptor_create(&act->interceptor);
+    if (status == CELIX_SUCCESS) {
+        status = secondInterceptor_create(&act->secondInterceptor);
+    }
+    if (status == CELIX_SUCCESS) {
+        act->interceptorSvc = calloc(1, sizeof(*act->interceptorSvc));
+        act->secondInterceptorSvc = calloc(1, sizeof(*act->secondInterceptorSvc));
+        if (act->interceptorSvc == NULL || act->secondInterceptorSvc == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    if (status == CELIX_SUCCESS) {
+        act->interceptorSvc->handle = act->interceptor;
+        act->interceptorSvc->preSend = firstInterceptor_preSend;
+        act->interceptorSvc->postSend = firstInterceptor_postSend;
+        act->interceptorSvc->preReceive = firstInterceptor_preReceive;
+        act->interceptorSvc->postReceive = firstInterceptor_postReceive;
+        act->interceptorSvcId = interceptor_register(ctx, act->interceptorSvc, 10);
+
+        act->secondInterceptorSvc->handle = act->secondInterceptor;
+        act->secondInterceptorSvc->preSend = secondInterceptor_preSend;
+        act->secondInterceptorSvc->postSend = secondInterceptor_postSend;
+        act->secondInterceptorSvc->preReceive = secondInterceptor_preReceive;
+        act->secondInterceptorSvc->postReceive = secondInterceptor_postReceive;
+        act->secondInterceptorSvcId = interceptor_register(ctx, act->secondInterceptorSvc, 20);
+
+        if (act->interceptorSvcId < 0 || act->secondInterceptorSvcId < 0) {
+            status = CELIX_BUNDLE_EXCEPTION;
+        }
+    }
+
+    if (status != CELIX_SUCCESS) {
+        interceptor_stop(act, ctx);
+    }
+    return status;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct interceptorActivator, interceptor_start, interceptor_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
new file mode 100644
index 0000000..33f0dd3
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "second_interceptor_private.h"
+
+/**
+ * Allocates a zeroed second_interceptor_t and stores it in *interceptor.
+ * Returns CELIX_ENOMEM when calloc yields NULL, CELIX_SUCCESS otherwise.
+ */
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor) {
+    *interceptor = calloc(1, sizeof(**interceptor));
+    return (*interceptor == NULL) ? CELIX_ENOMEM : CELIX_SUCCESS;
+}
+
+/** Frees the interceptor; always returns CELIX_SUCCESS. */
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
+    free(interceptor);
+    return CELIX_SUCCESS;
+}
+
+/** Prints that preSend was invoked; always returns true. */
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked preSend on second interceptor\n");
+    return true;
+}
+
+/** Prints that postSend was invoked. */
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postSend on second interceptor\n");
+}
+
+/** Prints that preReceive was invoked; always returns true. */
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked preReceive on second interceptor\n");
+    return true;
+}
+
+/** Prints that postReceive was invoked. */
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postReceive on second interceptor\n");
+}
+
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 979d373..cbc2cf9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -37,6 +37,8 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 
+#include "pubsub_interceptors_handler.h"
+
 #include "celix_utils_api.h"
 
 #define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -66,6 +68,8 @@ struct pubsub_zmq_topic_receiver {
     char *topic;
     bool metricsEnabled;
 
+    pubsub_interceptors_handler_t *interceptorsHandler;
+
     void *zmqCtx;
     void *zmqSock;
 
@@ -150,6 +154,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
@@ -320,6 +325,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         zmq_close(receiver->zmqSock);
         zmq_ctx_term(receiver->zmqCtx);
 
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
+
         free(receiver->scope);
         free(receiver->topic);
     }
@@ -492,12 +499,28 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 clock_gettime(CLOCK_REALTIME, &endSer);
             }
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release);
-                if (release) {
-                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
+
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+                if (cont) {
+                    bool release = true;
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+                                 metadata, &release);
+
+                    //postReceive interceptors must see the message before it is freed below
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+
+                    if (release) {
+                        msgSer->freeMsg(msgSer->handle, deserializedMsg);
+                    }
+
+                    updateReceiveCount += 1;
+                } else {
+                    //a preReceive interceptor vetoed delivery, so no subscriber took the message: free it here
+                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 50a7879..646c6d9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -32,6 +32,7 @@
 #include "pubsub_psa_zmq_constants.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 #define ZMQ_BIND_MAX_RETRY                      10
@@ -56,6 +57,8 @@ struct pubsub_zmq_topic_sender {
     bool metricsEnabled;
     bool zeroCopyEnabled;
 
+    pubsub_interceptors_handler_t *interceptorsHandler;
+
     char *scope;
     char *topic;
     char *url;
@@ -142,6 +145,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+
     //setting up zmq socket for ZMQ TopicSender
     {
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -318,6 +323,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->zmq.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
+
         free(sender->scope);
         free(sender->topic);
         free(sender->url);
@@ -528,102 +535,116 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         }
 
         if (status == CELIX_SUCCESS /*ser ok*/) {
-            pubsub_protocol_message_t message;
-            message.payload.payload = serializedOutput;
-            message.payload.length = serializedOutputLen;
-
-            void *payloadData = NULL;
-            size_t payloadLength = 0;
-            entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
-
-            void *metadataData = NULL;
-            size_t metadataLength = 0;
-            if (metadata != NULL) {
-                message.metadata.metadata = metadata;
-                entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+            if (metadata == NULL) {
+                metadata = celix_properties_create();
             }
+            //sendLock is taken before the preSend interceptors run; both branches below release it and the metadata
+            celixThreadMutex_lock(&entry->sendLock);
 
-            message.header.msgId = msgTypeId;
-            message.header.msgMajorVersion = 0;
-            message.header.msgMinorVersion = 0;
-            message.header.payloadSize = payloadLength;
-            message.header.metadataSize = metadataLength;
+            bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
+            if (cont) {
+                pubsub_protocol_message_t message;
+                message.payload.payload = serializedOutput;
+                message.payload.length = serializedOutputLen;
+
+                void *payloadData = NULL;
+                size_t payloadLength = 0;
+                entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+
+                void *metadataData = NULL;
+                size_t metadataLength = 0;
+                if (metadata != NULL) {
+                    message.metadata.metadata = metadata;
+                    entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+                }
 
-            void *headerData = NULL;
-            size_t headerLength = 0;
+                message.header.msgId = msgTypeId;
+                message.header.msgMajorVersion = 0;
+                message.header.msgMinorVersion = 0;
+                message.header.payloadSize = payloadLength;
+                message.header.metadataSize = metadataLength;
 
-            entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
+                void *headerData = NULL;
+                size_t headerLength = 0;
 
-            celixThreadMutex_lock(&entry->sendLock);
+                entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
 
-            errno = 0;
-            bool sendOk;
-
-            if (bound->parent->zeroCopyEnabled) {
-                zmq_msg_t msg1; // Header
-                zmq_msg_t msg2; // Payload
-                zmq_msg_t msg3; // Metadata
-                void *socket = zsock_resolve(sender->zmq.socket);
-
-                zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
-                //send header
-                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
-                if (rc == -1) {
-                    L_WARN("Error sending header msg. %s", strerror(errno));
-                    zmq_msg_close(&msg1);
-                }
 
-                //send header
-                if (rc > 0) {
-                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
-                    int flags = 0;
-                    if (metadataLength > 0) {
-                        flags = ZMQ_SNDMORE;
-                    }
-                    rc = zmq_msg_send(&msg2, socket, flags);
+                errno = 0;
+                bool sendOk;
+
+                if (bound->parent->zeroCopyEnabled) {
+                    zmq_msg_t msg1; // Header
+                    zmq_msg_t msg2; // Payload
+                    zmq_msg_t msg3; // Metadata
+                    void *socket = zsock_resolve(sender->zmq.socket);
+
+                    zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
+                    //send header
+                    int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
                     if (rc == -1) {
-                        L_WARN("Error sending payload msg. %s", strerror(errno));
-                        zmq_msg_close(&msg2);
+                        L_WARN("Error sending header msg. %s", strerror(errno));
+                        zmq_msg_close(&msg1);
                     }
-                }
 
-                if (rc > 0 && metadataLength > 0) {
-                    zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
-                    rc = zmq_msg_send(&msg3, socket, 0);
-                    if (rc == -1) {
-                        L_WARN("Error sending metadata msg. %s", strerror(errno));
-                        zmq_msg_close(&msg3);
+                    //send payload
+                    if (rc > 0) {
+                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
+                        int flags = 0;
+                        if (metadataLength > 0) {
+                            flags = ZMQ_SNDMORE;
+                        }
+                        rc = zmq_msg_send(&msg2, socket, flags);
+                        if (rc == -1) {
+                            L_WARN("Error sending payload msg. %s", strerror(errno));
+                            zmq_msg_close(&msg2);
+                        }
                     }
-                }
 
-                sendOk = rc > 0;
-            } else {
-                zmsg_t *msg = zmsg_new();
-                zmsg_addmem(msg, headerData, headerLength);
-                zmsg_addmem(msg, payloadData, payloadLength);
-                if (metadataLength > 0) {
-                    zmsg_addmem(msg, metadataData, metadataLength);
-                }
-                int rc = zmsg_send(&msg, sender->zmq.socket);
-                sendOk = rc == 0;
+                    if (rc > 0 && metadataLength > 0) {
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
+                        rc = zmq_msg_send(&msg3, socket, 0);
+                        if (rc == -1) {
+                            L_WARN("Error sending metadata msg. %s", strerror(errno));
+                            zmq_msg_close(&msg3);
+                        }
+                    }
+
+                    sendOk = rc > 0;
+                } else {
+                    zmsg_t *msg = zmsg_new();
+                    zmsg_addmem(msg, headerData, headerLength);
+                    zmsg_addmem(msg, payloadData, payloadLength);
+                    if (metadataLength > 0) {
+                        zmsg_addmem(msg, metadataData, metadataLength);
+                    }
+                    int rc = zmsg_send(&msg, sender->zmq.socket);
+                    sendOk = rc == 0;
 
-                if (!sendOk) {
-                    zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                    if (!sendOk) {
+                        zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                    }
+
+                    free(headerData);
+                    free(payloadData);
+                    free(metadataData);
                 }
 
-                free(headerData);
-                free(payloadData);
-                free(metadataData);
-            }
+                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
 
-            celix_properties_destroy(message.metadata.metadata);
+                celix_properties_destroy(metadata);
 
-            celixThreadMutex_unlock(&entry->sendLock);
-            if (sendOk) {
-                sendCountUpdate = 1;
-            } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                celixThreadMutex_unlock(&entry->sendLock);
+                if (sendOk) {
+                    sendCountUpdate = 1;
+                } else {
+                    sendErrorUpdate = 1;
+                    L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                }
+            } else {
+                //a preSend interceptor vetoed the message: nothing is sent, but the metadata and the lock must still be released
+                celix_properties_destroy(metadata);
+                celixThreadMutex_unlock(&entry->sendLock);
             }
         } else {
             serializationErrorUpdate = 1;
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index a19131c..03097ec 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -22,7 +22,7 @@ add_library(pubsub_spi STATIC
         src/pubsub_endpoint.c
         src/pubsub_utils.c
         src/pubsub_admin_metrics.c
-)
+        src/pubsub_interceptors_handler.c)
 
 set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
 target_include_directories(pubsub_spi PUBLIC
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
new file mode 100644
index 0000000..57765ee
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_INTERCEPTOR_H
+#define PUBSUB_INTERCEPTOR_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "celix_properties.h"
+
+/** Service name and version of the pubsub interceptor service. */
+#define PUBSUB_INTERCEPTOR_SERVICE_NAME     "pubsub.interceptor"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "1.0.0"
+
+/** Scope/topic pair handed (by value) to every interceptor callback. */
+typedef struct pubsub_interceptor_properties {
+    const char *scope;
+    const char *topic;
+} pubsub_interceptor_properties_t;
+
+/**
+ * Interceptor service: callbacks invoked around sending and receiving a message.
+ *
+ * Every callback receives:
+ *  - handle:      the handle member of this struct
+ *  - properties:  scope and topic the message belongs to
+ *  - messageType: name of the message type
+ *  - msgTypeId:   numeric id of the message type
+ *  - message:     the message itself (read-only)
+ *  - metadata:    metadata properties travelling with the message
+ *
+ * preSend and preReceive return a bool; a false result tells the invoker to
+ * stop calling further interceptors. postSend and postReceive return nothing.
+ */
+struct pubsub_interceptor {
+    void *handle;
+
+    bool (*preSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    void (*postSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    bool (*preReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    void (*postReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+};
+
+typedef struct pubsub_interceptor pubsub_interceptor_t;
+
+#endif //PUBSUB_INTERCEPTOR_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
new file mode 100644
index 0000000..60461f8
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PUBSUB_INTERCEPTORS_HANDLER_H
+#define PUBSUB_INTERCEPTORS_HANDLER_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "celix_errno.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h"
+#include "pubsub_interceptor.h"
+#include "celix_properties.h"
+
+/** Opaque handler that tracks interceptor services for one scope/topic pair. */
+typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
+
+/**
+ * Creates a handler for the given scope and topic and starts tracking
+ * PUBSUB_INTERCEPTOR_SERVICE_NAME services on ctx.
+ * The scope and topic pointers are stored, not copied.
+ * On allocation failure *handler is set to NULL and CELIX_ENOMEM is returned.
+ */
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
+
+/** Stops the service tracker and releases the handler. */
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+
+/**
+ * Calls preSend on the tracked interceptors, walking the sorted list from its
+ * last position to its first, and stops at the first one that returns false.
+ * Returns false when an interceptor returned false, true otherwise (also when
+ * no interceptor is tracked).
+ */
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/** Calls postSend on every tracked interceptor, from the last list position to the first. */
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/**
+ * Calls preReceive on the tracked interceptors, walking the sorted list from
+ * its first position to its last, and stops at the first one that returns false.
+ * Returns false when an interceptor returned false, true otherwise.
+ */
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/** Calls postReceive on every tracked interceptor, from the first list position to the last. */
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+#endif //PUBSUB_INTERCEPTORS_HANDLER_H
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
new file mode 100644
index 0000000..afd17e7
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_bundle_context.h"
+#include "celix_constants.h"
+#include "utils.h"
+
+#include "pubsub_interceptors_handler.h"
+
+typedef struct entry { // one tracked interceptor service; element type of the handler's interceptors list
+    const celix_properties_t *properties; // service properties handed to the add callback (pointer stored, not copied); read when sorting
+    pubsub_interceptor_t *interceptor; // service pointer handed to the add callback; also the key used to find the entry again
+} entry_t;
+
+struct pubsub_interceptors_handler {
+    pubsub_interceptor_properties_t properties; // scope/topic passed by value to every interceptor callback
+
+    celix_array_list_t *interceptors; // list of entry_t*, re-sorted with referenceCompare after every add
+
+    long interceptorsTrackerId; // id returned by celix_bundleContext_trackServicesWithOptions; used to stop the tracker
+
+    celix_bundle_context_t *ctx; // bundle context the tracker was created on
+
+    celix_thread_mutex_t mutex; // taken by the add callback while it modifies interceptors
+};
+
+static int referenceCompare(const void *a, const void *b); // comparator over entry_t* used to sort the interceptors list
+
+static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props); // tracker callback: service added (handle is the handler)
+static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props); // tracker callback: service removed (handle is the handler)
+
+/**
+ * Creates an interceptors handler for the given scope/topic and starts tracking
+ * services named PUBSUB_INTERCEPTOR_SERVICE_NAME on ctx.
+ *
+ * The scope and topic pointers are stored as-is (not copied).
+ *
+ * @param ctx     bundle context used to create the service tracker
+ * @param scope   scope handed to the interceptors
+ * @param topic   topic handed to the interceptors
+ * @param handler out: the created handler, or NULL when creation failed
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when an allocation failed, or the status
+ *         reported by celixThreadMutex_create
+ */
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
+    *handler = NULL;
+
+    pubsub_interceptors_handler_t *created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    created->ctx = ctx;
+    created->properties.scope = scope;
+    created->properties.topic = topic;
+
+    created->interceptors = celix_arrayList_create();
+    if (created->interceptors == NULL) {
+        free(created);
+        return CELIX_ENOMEM;
+    }
+
+    celix_status_t status = celixThreadMutex_create(&created->mutex, NULL);
+    if (status != CELIX_SUCCESS) {
+        celix_arrayList_destroy(created->interceptors);
+        free(created);
+        return status;
+    }
+
+    // The handler is fully initialised before the tracker starts, because the
+    // add callback uses both the list and the mutex.
+    *handler = created;
+
+    // Create service tracker here, and not in the activator
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = created;
+    opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+    opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+    // The tracker id is stored unchecked, as before: a handler whose tracker
+    // could not be created simply has no interceptors.
+    created->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Stops the interceptor tracker and releases everything owned by the handler:
+ * the remaining list entries, the list itself, the mutex and the handler.
+ * A NULL handler is accepted and ignored.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+    if (handler == NULL) {
+        return CELIX_SUCCESS;
+    }
+
+    // Stop tracking before tearing down the list and mutex the callbacks use.
+    celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+
+    if (handler->interceptors != NULL) {
+        // Free entries still in the list; the list only stores the pointers.
+        for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+            free(arrayList_get(handler->interceptors, i));
+        }
+        // The list is a celix_array_list_t with its own element storage, so it
+        // must be released with its destructor rather than a plain free().
+        celix_arrayList_destroy(handler->interceptors);
+    }
+
+    celixThreadMutex_destroy(&handler->mutex);
+    free(handler);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Tracker callback for an added interceptor service.
+ *
+ * Adds an entry for svc to the handler's list unless one is already present,
+ * then re-sorts the list with referenceCompare. The props pointer is stored,
+ * not copied.
+ *
+ * @param handle the pubsub_interceptors_handler_t the tracker was created for
+ * @param svc    the interceptor service (pubsub_interceptor_t *)
+ * @param props  the service properties
+ */
+void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_interceptors_handler_t *handler = handle;
+    celixThreadMutex_lock(&handler->mutex);
+
+    bool exists = false;
+    for (int i = 0; i < arrayList_size(handler->interceptors) && !exists; i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        exists = entry->interceptor == svc;
+    }
+    if (!exists) {
+        entry_t *entry = calloc(1, sizeof(*entry));
+        // On allocation failure the interceptor is left out instead of
+        // dereferencing a NULL entry.
+        if (entry != NULL) {
+            entry->properties = props;
+            entry->interceptor = svc;
+            celix_arrayList_add(handler->interceptors, entry);
+
+            celix_arrayList_sort(handler->interceptors, referenceCompare);
+        }
+    }
+
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Tracker callback for a removed interceptor service.
+ *
+ * Removes the entry that refers to svc from the handler's list and frees it.
+ * Nothing happens when no entry refers to svc.
+ *
+ * @param handle the pubsub_interceptors_handler_t the tracker was created for
+ * @param svc    the interceptor service that went away
+ * @param props  unused
+ */
+void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attribute__((unused)) const celix_properties_t *props) {
+    pubsub_interceptors_handler_t *handler = handle;
+
+    // Same lock as the add callback: both modify the interceptors list.
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        if (entry->interceptor == svc) {
+            arrayList_remove(handler->interceptors, i);
+            free(entry); // allocated by the add callback; the list only held the pointer
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Calls preSend on the tracked interceptors, walking the list from its last
+ * position to its first, and stops at the first interceptor returning false.
+ * Interceptors without a preSend callback are skipped.
+ *
+ * The handler mutex is held while the callbacks run so the list cannot change
+ * underneath the loop; callbacks must therefore not add or remove interceptor
+ * services on the calling thread.
+ *
+ * @return false when an interceptor returned false, true otherwise
+ */
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    bool cont = true;
+
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        if (entry->interceptor->preSend == NULL) {
+            continue;
+        }
+
+        cont = entry->interceptor->preSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        if (!cont) {
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+
+    return cont;
+}
+
+/**
+ * Calls postSend on every tracked interceptor, walking the list from its last
+ * position to its first. Interceptors without a postSend callback are skipped.
+ *
+ * The handler mutex is held while the callbacks run so the list cannot change
+ * underneath the loop; callbacks must therefore not add or remove interceptor
+ * services on the calling thread.
+ */
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->postSend != NULL) {
+            entry->interceptor->postSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Calls preReceive on the tracked interceptors, walking the list from its
+ * first position to its last, and stops at the first interceptor returning
+ * false. Interceptors without a preReceive callback are skipped.
+ *
+ * The handler mutex is held while the callbacks run so the list cannot change
+ * underneath the loop; callbacks must therefore not add or remove interceptor
+ * services on the calling thread.
+ *
+ * @return false when an interceptor returned false, true otherwise
+ */
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    bool cont = true;
+
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        if (entry->interceptor->preReceive == NULL) {
+            continue;
+        }
+
+        cont = entry->interceptor->preReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        if (!cont) {
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+
+    return cont;
+}
+
+/**
+ * Calls postReceive on every tracked interceptor, walking the list from its
+ * first position to its last. Interceptors without a postReceive callback are
+ * skipped.
+ *
+ * The handler mutex is held while the callbacks run so the list cannot change
+ * underneath the loop; callbacks must therefore not add or remove interceptor
+ * services on the calling thread.
+ */
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->postReceive != NULL) {
+            entry->interceptor->postReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/*
+ * Comparator for the interceptors list. Both arguments are entry_t pointers.
+ * The service id and service ranking are read from each entry's properties
+ * (0 when a property is absent) and the ordering itself is delegated to
+ * utils_compareServiceIdsAndRanking.
+ */
+int referenceCompare(const void *a, const void *b) {
+    const celix_properties_t *lhsProps = ((const entry_t *) a)->properties;
+    const celix_properties_t *rhsProps = ((const entry_t *) b)->properties;
+
+    const long lhsId = celix_properties_getAsLong(lhsProps, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    const long lhsRanking = celix_properties_getAsLong(lhsProps, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    const long rhsId = celix_properties_getAsLong(rhsProps, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    const long rhsRanking = celix_properties_getAsLong(rhsProps, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(lhsId, lhsRanking, rhsId, rhsRanking);
+}
\ No newline at end of file
diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h
index aada9f4..ea4fc4a 100644
--- a/libs/utils/include/celix_array_list.h
+++ b/libs/utils/include/celix_array_list.h
@@ -45,6 +45,8 @@ typedef struct celix_array_list celix_array_list_t;
 
 typedef bool (*celix_arrayList_equals_fp)(celix_array_list_entry_t, celix_array_list_entry_t);
 
+typedef int (*celix_arrayList_sort_fp)(const void *, const void *); // comparator for celix_arrayList_sort; called with the voidPtrVal of two list entries
+
 
 celix_array_list_t* celix_arrayList_create();
 
@@ -99,6 +101,8 @@ void celix_arrayList_removeDouble(celix_array_list_t *list, double val);
 void celix_arrayList_removeBool(celix_array_list_t *list, bool val);
 void celix_arrayList_removeSize(celix_array_list_t *list, size_t val);
 
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp); // sorts the list in place with qsort_r, ordering the entries with sortFp
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c
index 808d3b1..665b0af 100644
--- a/libs/utils/src/array_list.c
+++ b/libs/utils/src/array_list.c
@@ -564,3 +564,24 @@ void celix_arrayList_clear(celix_array_list_t *list) {
     }
     list->size = 0;
 }
+
+/*
+ * qsort_r adapter: unwraps two list entries and forwards their voidPtrVal to
+ * the user comparator passed through arg. The position of arg differs because
+ * the Apple qsort_r hands the context pointer to the comparator first, while
+ * the other variant hands it over last.
+ */
+#if defined(__APPLE__)
+static int celix_arrayList_compare(void *arg, const void * a, const void *b) {
+#else
+static int celix_arrayList_compare(const void * a, const void *b, void *arg) {
+#endif
+    const celix_array_list_entry_t *aEntry = a;
+    const celix_array_list_entry_t *bEntry = b;
+
+    celix_arrayList_sort_fp sort = arg;
+
+    return sort(aEntry->voidPtrVal, bEntry->voidPtrVal);
+}
+
+/*
+ * Sorts the list in place. sortFp is called with the voidPtrVal of two entries
+ * and must return a value <0, 0 or >0 like a qsort comparator.
+ * The argument order of qsort_r differs per platform: Apple takes the context
+ * pointer before the comparator, the other variant takes it after.
+ */
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp) {
+#if defined(__APPLE__)
+    qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), sortFp, celix_arrayList_compare);
+#else
+    qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), celix_arrayList_compare, sortFp);
+#endif
+}