You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/03/25 07:11:37 UTC

[celix] branch feature/pubsub_inteceptors created (now 6025682)

This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a change to branch feature/pubsub_inteceptors
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 6025682  Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.

This branch includes the following new commits:

     new 6025682  Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 01/01: Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a commit to branch feature/pubsub_inteceptors
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6025682e24786bdceffe14a4b6e85fc6e3fdc303
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Wed Mar 25 08:11:26 2020 +0100

    Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function.
---
 bundles/pubsub/examples/CMakeLists.txt             |   2 +
 bundles/pubsub/examples/pubsub/CMakeLists.txt      |   1 +
 .../pubsub/{ => interceptors}/CMakeLists.txt       |  19 +--
 .../include/first_interceptor_private.h            |  43 +++++
 .../include/second_interceptor_private.h           |  36 +++++
 .../pubsub/interceptors/src/first_interceptor.c    |  71 +++++++++
 .../interceptors/src/ps_interceptor_activator.c    |  94 +++++++++++
 .../pubsub/interceptors/src/second_interceptor.c   |  58 +++++++
 .../src/pubsub_zmq_topic_receiver.c                |  28 +++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 174 +++++++++++----------
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |   2 +-
 .../pubsub/pubsub_spi/include/pubsub_interceptor.h |  46 ++++++
 .../include/pubsub_interceptors_handler.h          |  37 +++++
 .../pubsub_spi/src/pubsub_interceptors_handler.c   | 174 +++++++++++++++++++++
 libs/utils/include/celix_array_list.h              |   4 +
 libs/utils/src/array_list.c                        |  21 +++
 16 files changed, 716 insertions(+), 94 deletions(-)

diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index be55811..ccb674c 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -239,6 +239,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_zmq
             celix_pubsub_poi_publisher
             celix_pubsub_poi_publisher2
+            celix_pubsub_interceptors_example
         PROPERTIES
             PSA_ZMQ_VERBOSE=true
             PUBSUB_ETCD_DISCOVERY_VERBOSE=true
@@ -258,6 +259,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
             celix_pubsub_poi_subscriber
+            celix_pubsub_interceptors_example
         PROPERTIES
             PSA_ZMQ_VERBOSE=true
             PUBSUB_ETCD_DISCOVERY_VERBOSE=true
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 8b5c653..427dbd1 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -17,6 +17,7 @@
 
 include_directories("common/include")
 
+add_subdirectory(interceptors)
 add_subdirectory(publisher)
 add_subdirectory(publisher2)
 if (BUILD_PUBSUB_PSA_WS)
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
similarity index 65%
copy from bundles/pubsub/examples/pubsub/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
index 8b5c653..1bf920b 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt
@@ -15,13 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(publisher2)
-if (BUILD_PUBSUB_PSA_WS)
-    add_subdirectory(pubsub_websocket)
-endif()
-add_subdirectory(subscriber)
-
+add_celix_bundle(celix_pubsub_interceptors_example
+    SYMBOLIC_NAME "celix_pubsub_interceptors_example"
+    VERSION "1.0.0"
+    SOURCES 
+        src/ps_interceptor_activator.c
+        src/first_interceptor.c
+        src/second_interceptor.c
+)
 
+target_link_libraries(celix_pubsub_interceptors_example PRIVATE Celix::framework Celix::pubsub_spi)
+target_include_directories(celix_pubsub_interceptors_example PRIVATE include)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
new file mode 100644
index 0000000..c7dd87a
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+#define CELIX_FIRST_INTERCEPTOR_PRIVATE_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <celix_threads.h>
+#include "pubsub_interceptor.h"
+
+/**
+ * State of the first example interceptor: a counter used to stamp outgoing
+ * messages, and the mutex that preSend takes while reading and bumping it.
+ */
+typedef struct first_interceptor {
+    celix_thread_mutex_t mutex;
+    uint64_t sequenceNumber;
+} first_interceptor_t;
+
+/*
+ * Metadata key under which preSend stores the sequence number. A macro rather
+ * than a static object, so including this header defines no per-file variable.
+ */
+#define SEQUENCE_NUMBER "sequence.number"
+
+/** Allocates an interceptor and stores it in *interceptor. */
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
+
+/** Releases an interceptor obtained from firstInterceptor_create. */
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
+
+/*
+ * pubsub_interceptor callbacks; handle is the first_interceptor_t instance.
+ * preSend and preReceive return true (this example never vetoes a message).
+ */
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
new file mode 100644
index 0000000..979b2c7
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+#define CELIX_SECOND_INTERCEPTOR_PRIVATE_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "pubsub_interceptor.h"
+
+/**
+ * State of the second example interceptor. It keeps no data; the placeholder
+ * member exists because ISO C does not allow a struct without members, and
+ * with a zero-sized type calloc(1, sizeof(...)) may legitimately return NULL.
+ */
+typedef struct second_interceptor {
+    char unused;
+} second_interceptor_t;
+
+/** Allocates an interceptor and stores it in *interceptor. */
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
+
+/** Releases an interceptor obtained from secondInterceptor_create. */
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
+
+/* pubsub_interceptor callbacks; this example only prints a line per call. */
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+
+#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
new file mode 100644
index 0000000..64c63fe
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "first_interceptor_private.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/**
+ * Allocates a zeroed interceptor and initialises its mutex.
+ *
+ * On success *interceptor points to the new instance. On failure nothing is
+ * left allocated and *interceptor is set to NULL.
+ *
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when allocation fails, or the status
+ *         reported by celixThreadMutex_create.
+ */
+celix_status_t firstInterceptor_create(first_interceptor_t **interceptor) {
+    first_interceptor_t *created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        *interceptor = NULL;
+        return CELIX_ENOMEM;
+    }
+
+    /* calloc already left sequenceNumber at 0. */
+    celix_status_t status = celixThreadMutex_create(&created->mutex, NULL);
+    if (status != CELIX_SUCCESS) {
+        free(created);
+        created = NULL;
+    }
+
+    *interceptor = created;
+    return status;
+}
+
+/**
+ * Destroys the mutex and frees the interceptor. A NULL interceptor is ignored.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
+    if (interceptor != NULL) {
+        celixThreadMutex_destroy(&interceptor->mutex);
+        free(interceptor);
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Stamps the outgoing message with the next sequence number, stored in the
+ * metadata under SEQUENCE_NUMBER. The counter is read and incremented while
+ * holding the interceptor mutex. Without a handle no number is stamped.
+ *
+ * @return always true.
+ */
+bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    first_interceptor_t *interceptor = handle;
+
+    printf("Invoked preSend on first interceptor\n");
+
+    if (interceptor != NULL) {
+        celixThreadMutex_lock(&interceptor->mutex);
+        celix_properties_setLong(metadata, SEQUENCE_NUMBER, (long) interceptor->sequenceNumber++);
+        celixThreadMutex_unlock(&interceptor->mutex);
+    }
+
+    return true;
+}
+
+/* Reads the sequence number from the metadata, passing 0 as the default. */
+static uint64_t firstInterceptor_sequenceOf(celix_properties_t *metadata) {
+    return (uint64_t) celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
+}
+
+/** Prints the sequence number of the message that was just sent. */
+void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postSend on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", firstInterceptor_sequenceOf(metadata));
+}
+
+/**
+ * Prints the sequence number of the message about to be delivered.
+ *
+ * @return always true.
+ */
+bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", firstInterceptor_sequenceOf(metadata));
+
+    return true;
+}
+
+/** Prints the sequence number of the message that was just delivered. */
+void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%" PRIu64 "]\n", firstInterceptor_sequenceOf(metadata));
+}
+
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
new file mode 100644
index 0000000..12a055c
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_api.h"
+
+#include "first_interceptor_private.h"
+#include "second_interceptor_private.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+/*
+ * Activator state. The service structs are kept here so they can be freed
+ * once the services are unregistered; a service id below 0 means "not
+ * registered".
+ */
+struct interceptorActivator {
+    pubsub_interceptor_t *interceptorSvc;
+    first_interceptor_t *interceptor;
+    long interceptorSvcId;
+
+    pubsub_interceptor_t *secondInterceptorSvc;
+    second_interceptor_t *secondInterceptor;
+    long secondInterceptorSvcId;
+};
+
+/* Registers svc as a pubsub interceptor service with the given ranking. */
+static long interceptor_register(celix_bundle_context_t *ctx, pubsub_interceptor_t *svc, long ranking) {
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+
+    celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    opts.svc = svc;
+    opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
+    opts.properties = props;
+
+    return celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+}
+
+/*
+ * Unregisters whatever was registered and frees everything the activator
+ * owns. Safe on a partially started activator: unset ids and NULL pointers
+ * are skipped.
+ */
+static void interceptor_cleanup(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+    if (act->interceptorSvcId >= 0) {
+        celix_bundleContext_unregisterService(ctx, act->interceptorSvcId);
+        act->interceptorSvcId = -1L;
+    }
+    firstInterceptor_destroy(act->interceptor);
+    act->interceptor = NULL;
+    free(act->interceptorSvc);
+    act->interceptorSvc = NULL;
+
+    if (act->secondInterceptorSvcId >= 0) {
+        celix_bundleContext_unregisterService(ctx, act->secondInterceptorSvcId);
+        act->secondInterceptorSvcId = -1L;
+    }
+    secondInterceptor_destroy(act->secondInterceptor);
+    act->secondInterceptor = NULL;
+    free(act->secondInterceptorSvc);
+    act->secondInterceptorSvc = NULL;
+}
+
+/**
+ * Creates both example interceptors and registers them as pubsub interceptor
+ * services with rankings 10 and 20.
+ *
+ * @return 0 on success; otherwise the failing status, with every partially
+ *         created resource released again.
+ */
+static int interceptor_start(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+    act->interceptor = NULL;
+    act->interceptorSvc = NULL;
+    act->interceptorSvcId = -1L;
+    act->secondInterceptor = NULL;
+    act->secondInterceptorSvc = NULL;
+    act->secondInterceptorSvcId = -1L;
+
+    celix_status_t status = firstInterceptor_create(&act->interceptor);
+    if (status == CELIX_SUCCESS) {
+        status = secondInterceptor_create(&act->secondInterceptor);
+    }
+    if (status == CELIX_SUCCESS) {
+        act->interceptorSvc = calloc(1, sizeof(*act->interceptorSvc));
+        act->secondInterceptorSvc = calloc(1, sizeof(*act->secondInterceptorSvc));
+        if (act->interceptorSvc == NULL || act->secondInterceptorSvc == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+    if (status != CELIX_SUCCESS) {
+        interceptor_cleanup(act, ctx);
+        return status;
+    }
+
+    act->interceptorSvc->handle = act->interceptor;
+    act->interceptorSvc->preSend = firstInterceptor_preSend;
+    act->interceptorSvc->postSend = firstInterceptor_postSend;
+    act->interceptorSvc->preReceive = firstInterceptor_preReceive;
+    act->interceptorSvc->postReceive = firstInterceptor_postReceive;
+    act->interceptorSvcId = interceptor_register(ctx, act->interceptorSvc, 10);
+
+    act->secondInterceptorSvc->handle = act->secondInterceptor;
+    act->secondInterceptorSvc->preSend = secondInterceptor_preSend;
+    act->secondInterceptorSvc->postSend = secondInterceptor_postSend;
+    act->secondInterceptorSvc->preReceive = secondInterceptor_preReceive;
+    act->secondInterceptorSvc->postReceive = secondInterceptor_postReceive;
+    act->secondInterceptorSvcId = interceptor_register(ctx, act->secondInterceptorSvc, 20);
+
+    if (act->interceptorSvcId < 0 || act->secondInterceptorSvcId < 0) {
+        interceptor_cleanup(act, ctx);
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return 0;
+}
+
+/** Unregisters both services and releases everything created by start. */
+static int interceptor_stop(struct interceptorActivator *act, celix_bundle_context_t *ctx) {
+    interceptor_cleanup(act, ctx);
+    return 0;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct interceptorActivator, interceptor_start, interceptor_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
new file mode 100644
index 0000000..33f0dd3
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "second_interceptor_private.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+/**
+ * Allocates a zeroed interceptor and stores it in *interceptor.
+ *
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when calloc returns NULL.
+ */
+celix_status_t secondInterceptor_create(second_interceptor_t **interceptor) {
+    *interceptor = calloc(1, sizeof(**interceptor));
+    return *interceptor != NULL ? CELIX_SUCCESS : CELIX_ENOMEM;
+}
+
+/** Frees the interceptor; always returns CELIX_SUCCESS. */
+celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
+    free(interceptor);
+    return CELIX_SUCCESS;
+}
+
+/** Prints a line to stdout and returns true. */
+bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked preSend on second interceptor\n");
+    return true;
+}
+
+/** Prints a line to stdout. */
+void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postSend on second interceptor\n");
+}
+
+/** Prints a line to stdout and returns true. */
+bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked preReceive on second interceptor\n");
+    return true;
+}
+
+/** Prints a line to stdout. */
+void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
+    printf("Invoked postReceive on second interceptor\n");
+}
+
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 979d373..cbc2cf9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -37,6 +37,8 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 
+#include "pubsub_interceptors_handler.h"
+
 #include "celix_utils_api.h"
 
 #define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -66,6 +68,8 @@ struct pubsub_zmq_topic_receiver {
     char *topic;
     bool metricsEnabled;
 
+    pubsub_interceptors_handler_t *interceptorsHandler;
+
     void *zmqCtx;
     void *zmqSock;
 
@@ -150,6 +154,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
@@ -320,6 +325,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         zmq_close(receiver->zmqSock);
         zmq_ctx_term(receiver->zmqCtx);
 
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
+
         free(receiver->scope);
         free(receiver->topic);
     }
@@ -492,12 +499,23 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 clock_gettime(CLOCK_REALTIME, &endSer);
             }
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release);
-                if (release) {
-                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
+
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+                if (cont) {
+                    bool release = true;
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+                                 metadata, &release);
+                    if (release) {
+                        msgSer->freeMsg(msgSer->handle, deserializedMsg);
+                    }
+
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+
+                    updateReceiveCount += 1;
                 }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 50a7879..646c6d9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -32,6 +32,7 @@
 #include "pubsub_psa_zmq_constants.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 #define ZMQ_BIND_MAX_RETRY                      10
@@ -56,6 +57,8 @@ struct pubsub_zmq_topic_sender {
     bool metricsEnabled;
     bool zeroCopyEnabled;
 
+    pubsub_interceptors_handler_t *interceptorsHandler;
+
     char *scope;
     char *topic;
     char *url;
@@ -142,6 +145,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+
     //setting up zmq socket for ZMQ TopicSender
     {
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -318,6 +323,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->zmq.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
+
         free(sender->scope);
         free(sender->topic);
         free(sender->url);
@@ -528,102 +535,111 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         }
 
         if (status == CELIX_SUCCESS /*ser ok*/) {
-            pubsub_protocol_message_t message;
-            message.payload.payload = serializedOutput;
-            message.payload.length = serializedOutputLen;
-
-            void *payloadData = NULL;
-            size_t payloadLength = 0;
-            entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
-
-            void *metadataData = NULL;
-            size_t metadataLength = 0;
-            if (metadata != NULL) {
-                message.metadata.metadata = metadata;
-                entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+            if (metadata == NULL) {
+                metadata = celix_properties_create();
             }
+            celixThreadMutex_lock(&entry->sendLock);
 
-            message.header.msgId = msgTypeId;
-            message.header.msgMajorVersion = 0;
-            message.header.msgMinorVersion = 0;
-            message.header.payloadSize = payloadLength;
-            message.header.metadataSize = metadataLength;
+            bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
+            if (cont) {
+                pubsub_protocol_message_t message;
+                message.payload.payload = serializedOutput;
+                message.payload.length = serializedOutputLen;
+
+                void *payloadData = NULL;
+                size_t payloadLength = 0;
+                entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+
+                void *metadataData = NULL;
+                size_t metadataLength = 0;
+                if (metadata != NULL) {
+                    message.metadata.metadata = metadata;
+                    entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+                }
 
-            void *headerData = NULL;
-            size_t headerLength = 0;
+                message.header.msgId = msgTypeId;
+                message.header.msgMajorVersion = 0;
+                message.header.msgMinorVersion = 0;
+                message.header.payloadSize = payloadLength;
+                message.header.metadataSize = metadataLength;
 
-            entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
+                void *headerData = NULL;
+                size_t headerLength = 0;
 
-            celixThreadMutex_lock(&entry->sendLock);
+                entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
 
-            errno = 0;
-            bool sendOk;
-
-            if (bound->parent->zeroCopyEnabled) {
-                zmq_msg_t msg1; // Header
-                zmq_msg_t msg2; // Payload
-                zmq_msg_t msg3; // Metadata
-                void *socket = zsock_resolve(sender->zmq.socket);
-
-                zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
-                //send header
-                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
-                if (rc == -1) {
-                    L_WARN("Error sending header msg. %s", strerror(errno));
-                    zmq_msg_close(&msg1);
-                }
 
-                //send header
-                if (rc > 0) {
-                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
-                    int flags = 0;
-                    if (metadataLength > 0) {
-                        flags = ZMQ_SNDMORE;
-                    }
-                    rc = zmq_msg_send(&msg2, socket, flags);
+                errno = 0;
+                bool sendOk;
+
+                if (bound->parent->zeroCopyEnabled) {
+                    zmq_msg_t msg1; // Header
+                    zmq_msg_t msg2; // Payload
+                    zmq_msg_t msg3; // Metadata
+                    void *socket = zsock_resolve(sender->zmq.socket);
+
+                    zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
+                    //send header
+                    int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
                     if (rc == -1) {
-                        L_WARN("Error sending payload msg. %s", strerror(errno));
-                        zmq_msg_close(&msg2);
+                        L_WARN("Error sending header msg. %s", strerror(errno));
+                        zmq_msg_close(&msg1);
                     }
-                }
 
-                if (rc > 0 && metadataLength > 0) {
-                    zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
-                    rc = zmq_msg_send(&msg3, socket, 0);
-                    if (rc == -1) {
-                        L_WARN("Error sending metadata msg. %s", strerror(errno));
-                        zmq_msg_close(&msg3);
+                    //send header
+                    if (rc > 0) {
+                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
+                        int flags = 0;
+                        if (metadataLength > 0) {
+                            flags = ZMQ_SNDMORE;
+                        }
+                        rc = zmq_msg_send(&msg2, socket, flags);
+                        if (rc == -1) {
+                            L_WARN("Error sending payload msg. %s", strerror(errno));
+                            zmq_msg_close(&msg2);
+                        }
                     }
-                }
 
-                sendOk = rc > 0;
-            } else {
-                zmsg_t *msg = zmsg_new();
-                zmsg_addmem(msg, headerData, headerLength);
-                zmsg_addmem(msg, payloadData, payloadLength);
-                if (metadataLength > 0) {
-                    zmsg_addmem(msg, metadataData, metadataLength);
-                }
-                int rc = zmsg_send(&msg, sender->zmq.socket);
-                sendOk = rc == 0;
+                    if (rc > 0 && metadataLength > 0) {
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
+                        rc = zmq_msg_send(&msg3, socket, 0);
+                        if (rc == -1) {
+                            L_WARN("Error sending metadata msg. %s", strerror(errno));
+                            zmq_msg_close(&msg3);
+                        }
+                    }
+
+                    sendOk = rc > 0;
+                } else {
+                    zmsg_t *msg = zmsg_new();
+                    zmsg_addmem(msg, headerData, headerLength);
+                    zmsg_addmem(msg, payloadData, payloadLength);
+                    if (metadataLength > 0) {
+                        zmsg_addmem(msg, metadataData, metadataLength);
+                    }
+                    int rc = zmsg_send(&msg, sender->zmq.socket);
+                    sendOk = rc == 0;
 
-                if (!sendOk) {
-                    zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                    if (!sendOk) {
+                        zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                    }
+
+                    free(headerData);
+                    free(payloadData);
+                    free(metadataData);
                 }
 
-                free(headerData);
-                free(payloadData);
-                free(metadataData);
-            }
+                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
 
-            celix_properties_destroy(message.metadata.metadata);
+                celix_properties_destroy(metadata);
 
-            celixThreadMutex_unlock(&entry->sendLock);
-            if (sendOk) {
-                sendCountUpdate = 1;
-            } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                celixThreadMutex_unlock(&entry->sendLock);
+                if (sendOk) {
+                    sendCountUpdate = 1;
+                } else {
+                    sendErrorUpdate = 1;
+                    L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                }
             }
         } else {
             serializationErrorUpdate = 1;
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index a19131c..03097ec 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -22,7 +22,7 @@ add_library(pubsub_spi STATIC
         src/pubsub_endpoint.c
         src/pubsub_utils.c
         src/pubsub_admin_metrics.c
-)
+        src/pubsub_interceptors_handler.c)
 
 set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
 target_include_directories(pubsub_spi PUBLIC
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
new file mode 100644
index 0000000..57765ee
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef __PUBSUB_INTERCEPTOR_H
+#define __PUBSUB_INTERCEPTOR_H
+
+/* bool and uint32_t are used in the callback signatures below, so pull in
+ * their headers here instead of relying on the includer. */
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "celix_properties.h"
+
+/** Service name and version under which interceptor services are published. */
+#define PUBSUB_INTERCEPTOR_SERVICE_NAME     "pubsub.interceptor"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "1.0.0"
+
+/**
+ * Scope/topic pair describing the channel a callback is invoked for.
+ * Handed to every interceptor callback by value.
+ */
+typedef struct pubsub_interceptor_properties {
+    const char *scope;
+    const char *topic;
+} pubsub_interceptor_properties_t;
+
+/**
+ * Interceptor service interface.
+ *
+ * Every callback receives `handle` as its first argument, followed by the
+ * scope/topic properties, the message type name, the message type id, the
+ * message itself and the metadata properties that travel with the message.
+ *
+ * preSend/preReceive return a bool: returning false stops the interceptors
+ * handler from calling any further interceptor for that message and the
+ * false is returned to the handler's caller (presumably the pubsub admin
+ * then drops the message - confirm in the admin implementation).
+ * postSend/postReceive are notifications only and return nothing.
+ */
+struct pubsub_interceptor {
+    void *handle;
+
+    bool (*preSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    void (*postSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    bool (*preReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+    void (*postReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+};
+
+typedef struct pubsub_interceptor pubsub_interceptor_t;
+
+#endif //__PUBSUB_INTERCEPTOR_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
new file mode 100644
index 0000000..60461f8
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PUBSUB_INTERCEPTORS_HANDLER_H
+#define PUBSUB_INTERCEPTORS_HANDLER_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "celix_errno.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h" /* celix_bundle_context_t is used by pubsubInterceptorsHandler_create */
+#include "pubsub_interceptor.h"
+#include "celix_properties.h"
+
+/** Opaque handler that tracks interceptor services and dispatches to them. */
+typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
+
+/**
+ * Creates a handler for the given scope/topic and starts tracking
+ * PUBSUB_INTERCEPTOR_SERVICE_NAME services on `ctx`.
+ *
+ * `scope` and `topic` are stored as pointers (not copied) and are passed to
+ * every interceptor callback, so they must stay valid while the handler lives.
+ *
+ * @param[out] handler Receives the new handler; NULL when allocation fails.
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when the handler cannot be allocated.
+ */
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
+
+/**
+ * Stops the service tracker and releases the handler.
+ * @return CELIX_SUCCESS.
+ */
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+
+/**
+ * Calls preSend on the tracked interceptors, walking the sorted interceptor
+ * list from its last entry to its first, and stops at the first interceptor
+ * that returns false.
+ * @return false when an interceptor returned false, true otherwise (also
+ *         when no interceptors are tracked).
+ */
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/** Calls postSend on every tracked interceptor, last list entry first. */
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/**
+ * Calls preReceive on the tracked interceptors, walking the sorted
+ * interceptor list from its first entry to its last, and stops at the first
+ * interceptor that returns false.
+ * @return false when an interceptor returned false, true otherwise (also
+ *         when no interceptors are tracked).
+ */
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+/** Calls postReceive on every tracked interceptor, first list entry first. */
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+
+#endif //PUBSUB_INTERCEPTORS_HANDLER_H
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
new file mode 100644
index 0000000..afd17e7
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "celix_bundle_context.h"
+#include "celix_constants.h"
+#include "utils.h"
+
+#include "pubsub_interceptors_handler.h"
+
+/**
+ * One tracked interceptor service together with the service properties the
+ * tracker reported for it. The properties supply the service id and ranking
+ * used to order the interceptor list.
+ */
+typedef struct entry {
+    const celix_properties_t *properties; // pointer as received in the add callback; stored, not copied
+    pubsub_interceptor_t *interceptor;    // the interceptor service instance
+} entry_t;
+
+/**
+ * State of an interceptors handler for a single scope/topic.
+ */
+struct pubsub_interceptors_handler {
+    // Scope and topic given at creation; passed by value to every interceptor callback.
+    pubsub_interceptor_properties_t properties;
+
+    // List of entry_t*, re-sorted with referenceCompare after every insertion.
+    celix_array_list_t *interceptors;
+
+    // Id returned by celix_bundleContext_trackServicesWithOptions; used to stop the tracker.
+    long interceptorsTrackerId;
+
+    // Bundle context the tracker was created on.
+    celix_bundle_context_t *ctx;
+
+    // Mutex intended to guard `interceptors`.
+    celix_thread_mutex_t mutex;
+};
+
+// Comparator handed to celix_arrayList_sort; receives two entry_t pointers.
+static int referenceCompare(const void *a, const void *b);
+
+// Service tracker callbacks; `handle` is the pubsub_interceptors_handler_t the tracker was created for.
+static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props);
+static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props);
+
+/**
+ * Creates an interceptors handler for the given scope/topic and starts
+ * tracking interceptor services on `ctx`.
+ *
+ * `scope` and `topic` are stored as pointers, not copied.
+ *
+ * @param ctx      Bundle context used for the service tracker.
+ * @param scope    Scope passed on to every interceptor callback.
+ * @param topic    Topic passed on to every interceptor callback.
+ * @param handler  Output; set to the new handler, or to NULL on failure.
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when an allocation fails, or the
+ *         status returned by celixThreadMutex_create when that fails.
+ */
+celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
+    *handler = NULL;
+
+    pubsub_interceptors_handler_t *created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    created->ctx = ctx;
+    created->properties.scope = scope;
+    created->properties.topic = topic;
+
+    created->interceptors = celix_arrayList_create();
+    if (created->interceptors == NULL) {
+        free(created);
+        return CELIX_ENOMEM;
+    }
+
+    celix_status_t status = celixThreadMutex_create(&created->mutex, NULL);
+    if (status != CELIX_SUCCESS) {
+        celix_arrayList_destroy(created->interceptors);
+        free(created);
+        return status;
+    }
+
+    // The handler is fully initialised before the tracker starts, because the
+    // tracker callbacks use the list and the mutex.
+    *handler = created;
+
+    // Create service tracker here, and not in the activator
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = created;
+    opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+    opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+    created->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Stops the interceptor service tracker and releases everything owned by
+ * the handler: any entries still in the list, the list itself, the mutex
+ * and the handler struct.
+ *
+ * @param handler Handler to destroy; NULL is accepted and ignored.
+ * @return CELIX_SUCCESS.
+ */
+celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+    if (handler == NULL) {
+        return CELIX_SUCCESS;
+    }
+
+    // Stop the tracker first so no add/remove callback touches the list below.
+    celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+
+    // Release entries that are still present in the list.
+    for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+        free(arrayList_get(handler->interceptors, i));
+    }
+
+    // The list was made with celix_arrayList_create, so it must be released
+    // with celix_arrayList_destroy; a plain free() leaks its element storage.
+    celix_arrayList_destroy(handler->interceptors);
+
+    celixThreadMutex_destroy(&handler->mutex);
+    free(handler);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Tracker callback: registers a newly found interceptor service.
+ *
+ * A service that is already in the list is ignored. Otherwise a new entry
+ * holding the service and its properties pointer is appended and the list is
+ * re-sorted with referenceCompare. Runs with the handler mutex held.
+ *
+ * @param handle The pubsub_interceptors_handler_t the tracker belongs to.
+ * @param svc    The interceptor service (pubsub_interceptor_t*).
+ * @param props  Service properties; the pointer is stored, not copied.
+ */
+void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_interceptors_handler_t *handler = handle;
+    celixThreadMutex_lock(&handler->mutex);
+
+    bool exists = false;
+    int size = arrayList_size(handler->interceptors);
+    for (int i = 0; i < size && !exists; i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        exists = entry->interceptor == svc;
+    }
+
+    if (!exists) {
+        entry_t *entry = calloc(1, sizeof(*entry));
+        // On allocation failure the interceptor is not registered; this
+        // callback has no way to report the error to the tracker.
+        if (entry != NULL) {
+            entry->properties = props;
+            entry->interceptor = svc;
+            celix_arrayList_add(handler->interceptors, entry);
+
+            celix_arrayList_sort(handler->interceptors, referenceCompare);
+        }
+    }
+
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Tracker callback: unregisters an interceptor service.
+ *
+ * Removes the first entry whose service pointer equals `svc` and frees that
+ * entry. Runs with the handler mutex held, matching the add callback.
+ *
+ * @param handle The pubsub_interceptors_handler_t the tracker belongs to.
+ * @param svc    The interceptor service being removed.
+ * @param props  Unused.
+ */
+void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attribute__((unused)) const celix_properties_t *props) {
+    pubsub_interceptors_handler_t *handler = handle;
+
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = 0; i < arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+        if (entry->interceptor == svc) {
+            arrayList_remove(handler->interceptors, i);
+            // The entry was allocated by the add callback and is owned by the list.
+            free(entry);
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Calls preSend on the tracked interceptors, from the last list entry to the
+ * first, and stops as soon as one of them returns false. Interceptors that
+ * leave preSend NULL are skipped.
+ *
+ * The handler mutex is held while the callbacks run, so the interceptor list
+ * cannot change underneath the loop. Callbacks must therefore not call back
+ * into this handler.
+ *
+ * @return false when an interceptor returned false, true otherwise.
+ */
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    bool cont = true;
+
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = arrayList_size(handler->interceptors) - 1; cont && i >= 0; i--) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->preSend != NULL) {
+            cont = entry->interceptor->preSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+
+    return cont;
+}
+
+/**
+ * Calls postSend on every tracked interceptor, from the last list entry to
+ * the first. Interceptors that leave postSend NULL are skipped.
+ *
+ * The handler mutex is held while the callbacks run, so the interceptor list
+ * cannot change underneath the loop. Callbacks must therefore not call back
+ * into this handler.
+ */
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    celixThreadMutex_lock(&handler->mutex);
+    for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->postSend != NULL) {
+            entry->interceptor->postSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Calls preReceive on the tracked interceptors, from the first list entry to
+ * the last, and stops as soon as one of them returns false. Interceptors
+ * that leave preReceive NULL are skipped.
+ *
+ * The handler mutex is held while the callbacks run, so the interceptor list
+ * cannot change underneath the loop. Callbacks must therefore not call back
+ * into this handler.
+ *
+ * @return false when an interceptor returned false, true otherwise.
+ */
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    bool cont = true;
+
+    celixThreadMutex_lock(&handler->mutex);
+    int size = arrayList_size(handler->interceptors);
+    for (int i = 0; cont && i < size; i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->preReceive != NULL) {
+            cont = entry->interceptor->preReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+
+    return cont;
+}
+
+/**
+ * Calls postReceive on every tracked interceptor, from the first list entry
+ * to the last. Interceptors that leave postReceive NULL are skipped.
+ *
+ * The handler mutex is held while the callbacks run, so the interceptor list
+ * cannot change underneath the loop. Callbacks must therefore not call back
+ * into this handler.
+ */
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+    celixThreadMutex_lock(&handler->mutex);
+    int size = arrayList_size(handler->interceptors);
+    for (int i = 0; i < size; i++) {
+        entry_t *entry = arrayList_get(handler->interceptors, i);
+
+        if (entry->interceptor->postReceive != NULL) {
+            entry->interceptor->postReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata);
+        }
+    }
+    celixThreadMutex_unlock(&handler->mutex);
+}
+
+/**
+ * Sort comparator for the interceptor list.
+ *
+ * Reads the service id and service ranking (both defaulting to 0) from the
+ * properties of the two entries and delegates the ordering decision to
+ * utils_compareServiceIdsAndRanking.
+ *
+ * @param a First entry (entry_t*).
+ * @param b Second entry (entry_t*).
+ * @return The result of utils_compareServiceIdsAndRanking.
+ */
+int referenceCompare(const void *a, const void *b) {
+    const celix_properties_t *lhsProps = ((const entry_t *) a)->properties;
+    const celix_properties_t *rhsProps = ((const entry_t *) b)->properties;
+
+    long lhsId = celix_properties_getAsLong(lhsProps, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long lhsRanking = celix_properties_getAsLong(lhsProps, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    long rhsId = celix_properties_getAsLong(rhsProps, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rhsRanking = celix_properties_getAsLong(rhsProps, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(lhsId, lhsRanking, rhsId, rhsRanking);
+}
\ No newline at end of file
diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h
index aada9f4..ea4fc4a 100644
--- a/libs/utils/include/celix_array_list.h
+++ b/libs/utils/include/celix_array_list.h
@@ -45,6 +45,8 @@ typedef struct celix_array_list celix_array_list_t;
 
 typedef bool (*celix_arrayList_equals_fp)(celix_array_list_entry_t, celix_array_list_entry_t);
 
+/**
+ * Comparator used by celix_arrayList_sort. It is called with the pointer
+ * values (voidPtrVal) of two list entries and returns an int that is handed
+ * back to qsort_r as the ordering result.
+ */
+typedef int (*celix_arrayList_sort_fp)(const void *, const void *);
+
 
 celix_array_list_t* celix_arrayList_create();
 
@@ -99,6 +101,8 @@ void celix_arrayList_removeDouble(celix_array_list_t *list, double val);
 void celix_arrayList_removeBool(celix_array_list_t *list, bool val);
 void celix_arrayList_removeSize(celix_array_list_t *list, size_t val);
 
+/**
+ * Sorts the entries of `list` in place with qsort_r, ordering them by calling
+ * `sortFp` on the pointer values of the entries being compared.
+ */
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c
index 808d3b1..665b0af 100644
--- a/libs/utils/src/array_list.c
+++ b/libs/utils/src/array_list.c
@@ -564,3 +564,24 @@ void celix_arrayList_clear(celix_array_list_t *list) {
     }
     list->size = 0;
 }
+
+/*
+ * qsort_r adapter: unwraps the two array list entries and forwards their
+ * pointer values to the user supplied comparator.
+ *
+ * `arg` points to the celix_arrayList_sort_fp to call. The pointer to the
+ * function pointer is passed (not the function pointer itself), so no
+ * function pointer has to be converted to or from void*.
+ *
+ * The parameter order differs per platform: Apple's qsort_r passes the
+ * context argument first, the other supported platforms pass it last
+ * (assumes the GNU style qsort_r there - confirm before porting elsewhere).
+ */
+#if defined(__APPLE__)
+static int celix_arrayList_compare(void *arg, const void * a, const void *b) {
+#else
+static int celix_arrayList_compare(const void * a, const void *b, void *arg) {
+#endif
+    const celix_array_list_entry_t *aEntry = a;
+    const celix_array_list_entry_t *bEntry = b;
+
+    celix_arrayList_sort_fp sort = *(celix_arrayList_sort_fp *) arg;
+
+    return sort(aEntry->voidPtrVal, bEntry->voidPtrVal);
+}
+
+/*
+ * Sorts the entries of `list` in place, ordering them with `sortFp`, which
+ * receives the pointer values (voidPtrVal) of the entries being compared.
+ */
+void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp) {
+    // `#else` rather than a bare `#elif`: an `#elif` without an expression
+    // is a preprocessor error on every non-Apple build.
+#if defined(__APPLE__)
+    qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), &sortFp, celix_arrayList_compare);
+#else
+    qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), celix_arrayList_compare, &sortFp);
+#endif
+}