You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/02/24 10:16:37 UTC

[celix] 01/01: Added protocol service API and wire protocol implementation to be used by the admin (sender/receiver). The service actually used is matched, similar to the serializer.

This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a commit to branch feature/pubsubadmin_protocol
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 1a1ffe3112567d008a939089ce30f4eded6e33f0
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Mon Feb 24 11:16:17 2020 +0100

    Added protocol service API and wire protocol implementation to be used by the admin (sender/receiver). The service actually used is matched, similar to the serializer.
---
 bundles/pubsub/CMakeLists.txt                      |   1 +
 bundles/pubsub/examples/CMakeLists.txt             |   2 +
 .../publisher/private/src/pubsub_publisher.c       |   5 +-
 .../private/include/pubsub_websocket_private.h     |   2 +-
 .../private/src/pubsub_websocket_example.c         |   4 +-
 .../private/include/pubsub_subscriber_private.h    |   2 +-
 .../subscriber/private/src/pubsub_subscriber.c     |  14 +-
 bundles/pubsub/mock/src/publisher_mock.cc          |   4 +-
 bundles/pubsub/mock/tst/pubsubmock_test.cc         |   5 +-
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c |  18 +-
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h |   8 +-
 .../src/pubsub_tcp_topic_receiver.c                |   2 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   4 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |  18 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h   |   8 +-
 .../src/pubsub_udpmc_topic_receiver.c              |   4 +-
 .../src/pubsub_udpmc_topic_sender.c                |   4 +-
 .../src/pubsub_websocket_admin.c                   |  18 +-
 .../src/pubsub_websocket_admin.h                   |   8 +-
 .../src/pubsub_websocket_topic_receiver.c          |   2 +-
 .../src/pubsub_websocket_topic_sender.c            |   4 +-
 bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt     |   1 -
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c    |  16 ++
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 164 ++++++++++--
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h |  11 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_common.c       | 132 ----------
 .../pubsub_admin_zmq/src/pubsub_zmq_common.h       |  59 -----
 .../src/pubsub_zmq_topic_receiver.c                | 191 ++++++++------
 .../src/pubsub_zmq_topic_receiver.h                |   7 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 118 ++++++---
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h |   3 +
 .../pubsub/pubsub_api/include/pubsub/publisher.h   |   4 +-
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |   4 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   |  14 +-
 .../pubsub/pubsub_protocol_wire_v1/CMakeLists.txt  |  36 +++
 .../src/ps_wire_protocol_activator.c               |  64 +++++
 .../src/pubsub_wire_protocol_common.c              |  59 +++++
 .../src/pubsub_wire_protocol_common.h}             |  29 +--
 .../src/pubsub_wire_protocol_impl.c                | 287 +++++++++++++++++++++
 .../src/pubsub_wire_protocol_impl.h                |  43 +++
 bundles/pubsub/pubsub_spi/include/pubsub_admin.h   |   8 +-
 .../pubsub/pubsub_spi/include/pubsub_constants.h   |   5 +-
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h    |   3 +-
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    |  77 ++++++
 bundles/pubsub/pubsub_spi/include/pubsub_utils.h   |  12 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  15 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c | 103 +++++++-
 .../src/pubsub_topology_manager.c                  |  33 ++-
 .../src/pubsub_topology_manager.h                  |   1 +
 bundles/pubsub/test/test/loopback_activator.c      |   6 +-
 bundles/pubsub/test/test/sut_activator.c           |   4 +-
 bundles/pubsub/test/test/sut_endpoint_activator.c  |   3 +-
 bundles/pubsub/test/test/tst_activator.c           |   4 +-
 bundles/pubsub/test/test/tst_endpoint_activator.c  |   4 +-
 libs/utils/include/celix_properties.h              |   2 +
 libs/utils/src/properties.c                        |  15 ++
 56 files changed, 1227 insertions(+), 447 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 1941e49..ca026fa 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -30,6 +30,7 @@ if (PUBSUB)
     add_subdirectory(pubsub_discovery)
     add_subdirectory(pubsub_serializer_json)
     add_subdirectory(pubsub_serializer_avrobin)
+    add_subdirectory(pubsub_protocol_wire_v1)
     add_subdirectory(pubsub_admin_zmq)
     add_subdirectory(pubsub_admin_tcp)
     add_subdirectory(pubsub_admin_udp_mc)
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index b19e327..74a4b57 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -227,6 +227,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v1
             Celix::pubsub_discovery_etcd
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
@@ -246,6 +247,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v1
             Celix::pubsub_discovery_etcd
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 42b3197..e326f01 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -33,6 +33,7 @@
 #include "celix_threads.h"
 
 #include "poi.h"
+#include "hash_map.h"
 
 #include "pubsub_publisher_private.h"
 
@@ -84,7 +85,9 @@ static void* send_thread(void* arg) {
             }
             place->data[nr_char - 1] = '\0';
             if (publish_svc->send) {
-                if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                celix_properties_t *metadata = celix_properties_create();
+                celix_properties_set(metadata, "Key", "Value");
+                if (publish_svc->send(publish_svc->handle, msgId, place, metadata) == 0) {
                     printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
                            place->position.lat, place->position.lon, place->name, place->description, nr_char);
                 }
diff --git a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
index 938b6f4..5a26a68 100644
--- a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
@@ -91,7 +91,7 @@ void subscriber_start(pubsub_receiver_t* client);
 void subscriber_stop(pubsub_receiver_t* client);
 void subscriber_destroy(pubsub_receiver_t* client);
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release);
 
 
 #endif /* PUBSUB_WEBSOCKET_PRIVATE_H_ */
diff --git a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
index 7b820f0..ad8c0d0 100644
--- a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
@@ -87,7 +87,7 @@ static void* send_thread(void* arg) {
                 }
                 place->data[nr_char - 1] = '\0';
                 if (publish_svc->send) {
-                    if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                    if (publish_svc->send(publish_svc->handle, msgId, place, NULL) == 0) {
                         printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
                                place->position.lat, place->position.lon, place->name, place->description, nr_char);
                     }
@@ -193,7 +193,7 @@ void subscriber_destroy(pubsub_receiver_t *subscriber) {
     free(subscriber);
 }
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release) {
     poi_cmd_t *cmd = (poi_cmd_t *) msg;
     pubsub_info_t *pubsub = (pubsub_info_t *) handle;
     printf("Received command %s\n", cmd->command);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 77da5d2..feb1f55 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_t* client);
 void subscriber_stop(pubsub_receiver_t* client);
 void subscriber_destroy(pubsub_receiver_t* client);
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release);
 
 
 #endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 9f3ca5e..272ff74 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -26,6 +26,8 @@
 
 #include <stdlib.h>
 #include <stdio.h>
+#include <hash_map.h>
+#include <celix_properties.h>
 
 #include "poi.h"
 #include "pubsub_subscriber_private.h"
@@ -53,9 +55,19 @@ void subscriber_destroy(pubsub_receiver_t *subscriber) {
     free(subscriber);
 }
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release) {
     location_t place = (location_t)msg;
     printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
 
+    if (metadata == NULL || celix_properties_size(metadata) == 0) {
+        printf("No metadata\n");
+    } else {
+        const char *key;
+        CELIX_PROPERTIES_FOR_EACH(metadata, key) {
+            const char *val = celix_properties_get(metadata, key, "!Error!");
+            printf("%s=%s\n", key, val);
+        }
+    }
+
     return 0;
 }
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index 0cf2e1f..8d6f7e5 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <celix_properties.h>
 #include "pubsub/publisher_mock.h"
 #include "CppUTest/TestHarness.h"
 #include "CppUTestExt/MockSupport.h"
@@ -36,12 +37,13 @@ static int pubsub__publisherMock_localMsgTypeIdForMsgType(void *handle, const ch
 /*============================================================================
   MOCK - mock function for pubsub_publisher->send
   ============================================================================*/
-static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, const void *msg) {
+static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata) {
     return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
         .actualCall(PUBSUB_PUBLISHERMOCK_SEND_METHOD)
         .withPointerParameter("handle", handle)
         .withParameter("msgTypeId", msgTypeId)
         .withPointerParameter("msg", (void*)msg)
+        .withPointerParameter("metadata", (void*)metadata)
         .returnIntValue();
 }
 
diff --git a/bundles/pubsub/mock/tst/pubsubmock_test.cc b/bundles/pubsub/mock/tst/pubsubmock_test.cc
index 462bb2c..64a8afb 100644
--- a/bundles/pubsub/mock/tst/pubsubmock_test.cc
+++ b/bundles/pubsub/mock/tst/pubsubmock_test.cc
@@ -28,6 +28,7 @@
 
 #include "pubsub/publisher_mock.h"
 
+#include "celix_utils_api.h"
 
 static pubsub_publisher_t mockSrv;
 static void* mockHandle = (void*)0x42;
@@ -69,8 +70,8 @@ TEST(pubsubmock, publishermock) {
     CHECK(msgId != 0);
 
     //set msg
-    void *dummyMsg = (void*)0x43; 
-    srv->send(srv->handle, msgId, dummyMsg); //should satisfy the expectOneCalls
+    void *dummyMsg = (void*)0x43;
+    srv->send(srv->handle, msgId, dummyMsg, NULL); //should satisfy the expectOneCalls
     //srv->send(srv->handle, msgId, dummyMsg); //enabling this should fail the test
 
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 5c871de..3df67c2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -298,23 +298,23 @@ void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
     }
 }
 
-celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_tcp_admin_t *psa = handle;
     L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_TCP_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_tcp_admin_t *psa = handle;
     L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_TCP_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -325,14 +325,14 @@ celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix
     pubsub_tcp_admin_t *psa = handle;
     L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_TCP_ADMIN_TYPE, NULL);
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_TCP_ADMIN_TYPE, false, NULL, NULL);
     if (outMatch != NULL) {
         *outMatch = match;
     }
     return status;
 }
 
-celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -368,7 +368,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
         if (sender != NULL) {
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
             celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
 
             //if configured use a static discover url
@@ -432,7 +432,7 @@ celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *sco
     return status;
 }
 
-celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_tcp_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
@@ -453,7 +453,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
index 7ed1702..87a1fe7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_tcp_admin pubsub_tcp_admin_t;
 pubsub_tcp_admin_t* pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa);
 
-celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
 celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 23af6c3..8e61291 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -498,7 +498,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
             if (status == CELIX_SUCCESS) {
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 6795af1..fe4b8fa 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -112,7 +112,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
 static unsigned int rand_range(unsigned int min, unsigned int max);
 static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
 static void *psa_tcp_sendThread(void *data);
-static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
 
 pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         celix_bundle_context_t *ctx,
@@ -477,7 +477,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
     return result;
 }
 
-static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
     int status = CELIX_SUCCESS;
     psa_tcp_bounded_service_entry_t *bound = handle;
     pubsub_tcp_topic_sender_t *sender = bound->parent;
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 7dfd00f..87e4e63 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -254,23 +254,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     free(psa);
 }
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -281,14 +281,14 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL);
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, false, NULL, NULL);
     if (outMatch != NULL) {
         *outMatch = match;
     }
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -311,7 +311,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
         if (sender != NULL) {
             const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
             celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
             celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
             //if available also set container name
@@ -369,7 +369,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
@@ -387,7 +387,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
             const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 6a42c5e..ac28c63 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -29,14 +29,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
 celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 0cf9b7e..faf40eb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -32,6 +32,8 @@
 #include "large_udp.h"
 #include "pubsub_udpmc_common.h"
 
+#include "hash_map.h"
+
 #define MAX_EPOLL_EVENTS        10
 #define RECV_THREAD_TIMEOUT     5
 #define UDP_BUFFER_SIZE         65535
@@ -447,7 +449,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
                 if (status == CELIX_SUCCESS) {
                     bool release = true;
                     pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
+                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
 
                     if (release) {
                         msgSer->freeMsg(msgSer->handle, msgInst);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 97c62eb..876700e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -81,7 +81,7 @@ typedef struct pubsub_msg {
 static int psa_udpmc_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
-static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata);
 static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_udp_msg_t* msg);
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
@@ -267,7 +267,7 @@ static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
-static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
     psa_udpmc_bounded_service_entry_t *entry = handle;
     int status = 0;
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 6fa32b7..5d61e82 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -231,25 +231,25 @@ void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const ce
     }
 }
 
-celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
     L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
-                                               topicProperties, outSerializerSvcId);
+                                               false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
     L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
                                                 psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
-                                                topicProperties, outSerializerSvcId);
+                                                false, topicProperties, outSerializerSvcId, outProtocolSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -260,14 +260,14 @@ celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const
     pubsub_websocket_admin_t *psa = handle;
     L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL);
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
     if (outMatch != NULL) {
         *outMatch = match;
     }
     return status;
 }
 
-celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_websocket_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -292,7 +292,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, NULL);
+                                                serType, NULL, NULL);
 
             //Set endpoint visibility to local because the http server handles discovery
             celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -345,7 +345,7 @@ celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
     return status;
 }
 
-celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_websocket_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
@@ -365,7 +365,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
 
             //Set endpoint visibility to local because the http server handles discovery
             celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
index c718a3b..36495dc 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
 pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
 
-celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
 celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index b6dfa71..c45ce57 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -477,7 +477,7 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
 
             if (status == CELIX_SUCCESS) {
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index b533567..af10a1b 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -92,7 +92,7 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
 static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender);
 
-static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
 
 static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle);
 static void psa_websocketTopicSender_close(const struct mg_connection *connection, void *handle);
@@ -295,7 +295,7 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
-static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
     int status = CELIX_SERVICE_EXCEPTION;
     psa_websocket_bounded_service_entry_t *bound = handle;
     pubsub_websocket_topic_sender_t *sender = bound->parent;
diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 3aeba26..ecd94d6 100644
--- a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -42,7 +42,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             src/pubsub_zmq_admin.c
             src/pubsub_zmq_topic_sender.c
             src/pubsub_zmq_topic_receiver.c
-            src/pubsub_zmq_common.c
             ${ZMQ_CRYPTO_C}
     )
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 3e66f42..a70763c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -21,6 +21,7 @@
 
 #include "celix_api.h"
 #include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
 #include "log_helper.h"
 
 #include "pubsub_admin.h"
@@ -35,6 +36,8 @@ typedef struct psa_zmq_activator {
 
     long serializersTrackerId;
 
+    long protocolsTrackerId;
+
     pubsub_admin_service_t adminService;
     long adminSvcId;
 
@@ -49,6 +52,7 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
     act->adminSvcId = -1L;
     act->cmdSvcId = -1L;
     act->serializersTrackerId = -1L;
+    act->protocolsTrackerId = -1L;
 
     logHelper_create(ctx, &act->logHelper);
     logHelper_start(act->logHelper);
@@ -67,6 +71,17 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
         act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
+    //track protocols
+    if (status == CELIX_SUCCESS) {
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_zmqAdmin_addProtocolSvc;
+        opts.removeWithProperties = pubsub_zmqAdmin_removeProtocolSvc;
+        act->protocolsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
     //register pubsub admin service
     if (status == CELIX_SUCCESS) {
         pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -116,6 +131,7 @@ int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
     celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
     celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
     celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+    celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
     pubsub_zmqAdmin_destroy(act->admin);
 
     logHelper_stop(act->logHelper);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index e7241e6..b251cae 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -26,6 +26,7 @@
 #include <pubsub_endpoint.h>
 #include <czmq.h>
 #include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
 #include <ip_utils.h>
 
 #include "pubsub_utils.h"
@@ -67,6 +68,11 @@ struct pubsub_zmq_admin {
 
     struct {
         celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
+    } protocols;
+
+    struct {
+        celix_thread_mutex_t mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
     } topicSenders;
 
@@ -88,6 +94,12 @@ typedef struct psa_zmq_serializer_entry {
     pubsub_serializer_service_t *svc;
 } psa_zmq_serializer_entry_t;
 
+typedef struct psa_zmq_protocol_entry { //one tracked protocol service; kept in the admin's protocols map keyed by svcId
+    const char *protType; //value of the service's PUBSUB_PROTOCOL_TYPE_KEY property (pointer stored as-is, not copied)
+    long svcId; //service id read from the OSGI_FRAMEWORK_SERVICE_ID property
+    pubsub_protocol_service_t *svc; //service pointer handed to the tracker add callback
+} psa_zmq_protocol_entry_t;
+
 static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
 static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
@@ -177,6 +189,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help
     celixThreadMutex_create(&psa->serializers.mutex, NULL);
     psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
 
+    celixThreadMutex_create(&psa->protocols.mutex, NULL);
+    psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
@@ -228,6 +243,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->serializers.mutex);
 
+    celixThreadMutex_lock(&psa->protocols.mutex);
+    iter = hashMapIterator_construct(psa->protocols.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_zmq_protocol_entry_t *entry = hashMapIterator_nextValue(&iter);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&psa->protocols.mutex);
+
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
 
@@ -240,6 +263,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
     celixThreadMutex_destroy(&psa->serializers.mutex);
     hashMap_destroy(psa->serializers.map, false, false);
 
+    celixThreadMutex_destroy(&psa->protocols.mutex);
+    hashMap_destroy(psa->protocols.map, false, false);
+
     if (psa->zmq_auth != NULL) {
         zactor_destroy(&psa->zmq_auth);
     }
@@ -319,23 +345,93 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
     }
 }
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+    //Tracker add callback: stores the protocol service under its service id; an already known id is left untouched.
+    pubsub_zmq_admin_t *psa = handle;
+    const char *protType = celix_properties_get(props, PUBSUB_PROTOCOL_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    if (protType == NULL) {
+        L_INFO("[PSA_ZMQ] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->protocols.mutex);
+    if (hashMap_get(psa->protocols.map, (void*)svcId) == NULL) {
+        psa_zmq_protocol_entry_t *entry = calloc(1, sizeof(*entry));
+        if (entry != NULL) {
+            *entry = (psa_zmq_protocol_entry_t){ .protType = protType, .svcId = svcId, .svc = svc };
+            hashMap_put(psa->protocols.map, (void*)svcId, entry);
+        } else { //allocation failed: the service is not registered instead of dereferencing NULL
+            L_ERROR("[PSA_ZMQ] Cannot allocate entry for protocol service %li", svcId);
+        }
+    }
+    celixThreadMutex_unlock(&psa->protocols.mutex);
+}
+
+void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_zmq_admin_t *psa = handle;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); //svc itself is not used; only its id
+
+    //Remove the protocol service:
+    // 1) take its entry out of the protocols map,
+    // 2) destroy every topic sender that uses this protocol service and
+    // 3) destroy every topic receiver that uses this protocol service.
+    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+    celixThreadMutex_lock(&psa->protocols.mutex);
+    psa_zmq_protocol_entry_t *entry = hashMap_remove(psa->protocols.map, (void*)svcId);
+    celixThreadMutex_unlock(&psa->protocols.mutex); //protocols lock is released before the sender/receiver locks are taken
+
+    if (entry != NULL) { //nothing to clean up when no entry was stored for this service id
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+            if (sender != NULL && entry->svcId == pubsub_zmqTopicSender_protocolSvcId(sender)) {
+                char *key = hashMapEntry_getKey(senderEntry); //fetch the key before the map entry is removed
+                hashMapIterator_remove(&iter);
+                pubsub_zmqTopicSender_destroy(sender);
+                free(key); //the key string is freed here, together with its sender
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); //NOTE: despite the name this is a topicReceivers map entry
+            pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+            if (receiver != NULL && entry->svcId == pubsub_zmqTopicReceiver_protocolSvcId(receiver)) {
+                char *key = hashMapEntry_getKey(senderEntry); //fetch the key before the map entry is removed
+                hashMapIterator_remove(&iter);
+                pubsub_zmqTopicReceiver_destroy(receiver);
+                free(key); //the key string is freed here, together with its receiver
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+        free(entry); //the protocol entry is released last, after its id was compared above
+    }
+}
+
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, true, topicProperties, outSerializerSvcId, outProtocolSvcId); //flag first, then topicProperties (same order as the websocket admin call); true = also match a protocol
     *outScore = score;
 
     return status;
 }
 
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, true, topicProperties, outSerializerSvcId, outProtocolSvcId); //flag first, then topicProperties (same order as the websocket admin call); true = also match a protocol
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -346,14 +442,14 @@ celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_ZMQ_ADMIN_TYPE, NULL);
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_ZMQ_ADMIN_TYPE, true, NULL, NULL);
     if (outMatch != NULL) {
         *outMatch = match;
     }
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -369,19 +465,22 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
     celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
-        if (serEntry != NULL) {
+        psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+        if (serEntry != NULL && protEntry != NULL) {
             sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
-                    psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
+                    protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
             const char *serType = serEntry->serType;
+            const char *protType = protEntry->protType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, NULL);
+                                                serType, protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
 
             //if configured use a static discover url
@@ -414,6 +513,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
         L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
 
     if (sender != NULL && newEndpoint != NULL) {
@@ -452,27 +552,30 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->protocols.mutex); //lock order: serializers, protocols, topicReceivers
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
-        if (serEntry != NULL) {
-            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
+        psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+        if (serEntry != NULL && protEntry != NULL) {
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc, protocolSvcId, protEntry->svc);
         } else {
-            L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
+            L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicReceiver %s/%s", scope, topic);
         }
         if (receiver != NULL) {
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
             const char *serType = serEntry->serType;
+            const char *protType = protEntry->protType; //receiver != NULL implies protEntry was found above
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -488,6 +591,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
         L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
 
     if (receiver != NULL && newEndpoint != NULL) {
@@ -551,16 +655,24 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
         if (serializerEntry != NULL) {
             serializer = serializerEntry->serType;
         }
+        const char *protocol = NULL;
+        long protocolSvcId = pubsub_zmqTopicReceiver_protocolSvcId(receiver);
+        psa_zmq_protocol_entry_t *protocolEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+        if (protocolEntry != NULL) {
+            protocol = protocolEntry->protType;
+        }
 
         const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
         const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
         const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+        const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL);
 
-        if (scope != NULL && topic != NULL && serializer != NULL
-                        && eScope != NULL && eTopic != NULL && eSerializer != NULL
+        if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL
+                        && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
                         && strncmp(eScope, scope, 1024*1024) == 0
                         && strncmp(eTopic, topic, 1024*1024) == 0
-                        && strncmp(eSerializer, serializer, 1024*1024) == 0) {
+                        && strncmp(eSerializer, serializer, 1024*1024) == 0
+                        && strncmp(eProtocol, protocol, 1024*1024) == 0) {
             pubsub_zmqTopicReceiver_connectTo(receiver, url);
         }
     }
@@ -641,34 +753,50 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
     celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+
         long serSvcId = pubsub_zmqTopicSender_serializerSvcId(sender);
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+
+        long protSvcId = pubsub_zmqTopicSender_protocolSvcId(sender);
+        psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protSvcId);
+
         const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
         const char *scope = pubsub_zmqTopicSender_scope(sender);
         const char *topic = pubsub_zmqTopicSender_topic(sender);
         const char *url = pubsub_zmqTopicSender_url(sender);
         const char *postUrl = pubsub_zmqTopicSender_isStatic(sender) ? " (static)" : "";
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- protocol type = %s\n", protType);
         fprintf(out, "   |- url            = %s%s\n", url, postUrl);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
     celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->protocols.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
         long serSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+
+        long protSvcId = pubsub_zmqTopicReceiver_protocolSvcId(receiver);
+        psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protSvcId);
+
+
         const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
         const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
         const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
 
@@ -678,6 +806,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
 
         fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- protocol type = %s\n", protType);
         for (int i = 0; i < celix_arrayList_size(connected); ++i) {
             char *url = celix_arrayList_get(connected, i);
             fprintf(out, "   |- connected url   = %s\n", url);
@@ -692,6 +821,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
         celix_arrayList_destroy(unconnected);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 8d8c7c0..dd28eca 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
 celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
@@ -46,6 +46,9 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi
 void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
 
+void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+
 bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
 
 pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
deleted file mode 100644
index 6e7c4d8..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <memory.h>
-#include <assert.h>
-#include "pubsub_zmq_common.h"
-
-bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr) {
-    bool check=false;
-    int major=0,minor=0;
-
-    if (hdr->major == 0 && hdr->minor == 0) {
-        //no check
-        return true;
-    }
-
-    if (msgVersion!=NULL) {
-        version_getMajor(msgVersion,&major);
-        version_getMinor(msgVersion,&minor);
-        if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
-            check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
-        }
-    }
-
-    return check;
-}
-
-void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter) {
-    for (int i = 0; i < 5; ++i) {
-        filter[i] = '\0';
-    }
-    if (scope != NULL && strnlen(scope, 3) >= 2)  {
-        filter[0] = scope[0];
-        filter[1] = scope[1];
-    }
-    if (topic != NULL && strnlen(topic, 3) >= 2)  {
-        filter[2] = topic[0];
-        filter[3] = topic[1];
-    }
-}
-
-static int readInt(const unsigned char *data, int offset, uint32_t *val) {
-    *val = ((data[offset+0] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | (data[offset+3] << 0));
-    return offset + 4;
-}
-
-static int readLong(const unsigned char *data, int offset, uint64_t *val) {
-    *val = (
-            ((int64_t)data[offset+0] << 56) |
-            ((int64_t)data[offset+1] << 48) |
-            ((int64_t)data[offset+2] << 40) |
-            ((int64_t)data[offset+3] << 32) |
-            ((int64_t)data[offset+4] << 24) |
-            ((int64_t)data[offset+5] << 16) |
-            ((int64_t)data[offset+6] << 8 ) |
-            ((int64_t)data[offset+7] << 0 )
-    );
-    return offset + 8;
-}
-
-celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header) {
-    int status = CELIX_ILLEGAL_ARGUMENT;
-    if (dataLen == sizeof(pubsub_zmq_msg_header_t)) {
-        int index = 0;
-        index = readInt(data, index, &header->type);
-        header->major = (unsigned char) data[index++];
-        header->minor = (unsigned char) data[index++];
-
-        index = readInt(data, index, &header->seqNr);
-        for (int i = 0; i < 16; ++i) {
-            header->originUUID[i] = data[index+i];
-        }
-        index += 16;
-        index = readLong(data, index, &header->sendtimeSeconds);
-        readLong(data, index, &header->sendTimeNanoseconds);
-
-        status = CELIX_SUCCESS;
-    }
-    return status;
-}
-
-
-static int writeInt(unsigned char *data, int offset, int32_t val) {
-    data[offset+0] = (unsigned char)((val >> 24) & 0xFF);
-    data[offset+1] = (unsigned char)((val >> 16) & 0xFF);
-    data[offset+2] = (unsigned char)((val >> 8 ) & 0xFF);
-    data[offset+3] = (unsigned char)((val >> 0 ) & 0xFF);
-    return offset + 4;
-}
-
-static int writeLong(unsigned char *data, int offset, int64_t val) {
-    data[offset+0] = (unsigned char)((val >> 56) & 0xFF);
-    data[offset+1] = (unsigned char)((val >> 48) & 0xFF);
-    data[offset+2] = (unsigned char)((val >> 40) & 0xFF);
-    data[offset+3] = (unsigned char)((val >> 32) & 0xFF);
-    data[offset+4] = (unsigned char)((val >> 24) & 0xFF);
-    data[offset+5] = (unsigned char)((val >> 16) & 0xFF);
-    data[offset+6] = (unsigned char)((val >> 8 ) & 0xFF);
-    data[offset+7] = (unsigned char)((val >> 0 ) & 0xFF);
-    return offset + 8;
-}
-
-void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen) {
-    assert(dataLen == sizeof(*msgHeader));
-    int index = 0;
-    index = writeInt(data, index, msgHeader->type);
-    data[index++] = (unsigned char)msgHeader->major;
-    data[index++] = (unsigned char)msgHeader->minor;
-    index = writeInt(data, index, msgHeader->seqNr);
-    for (int i = 0; i < 16; ++i) {
-        data[index+i] = msgHeader->originUUID[i];
-    }
-    index += 16;
-    index = writeLong(data, index, msgHeader->sendtimeSeconds);
-    writeLong(data, index, msgHeader->sendTimeNanoseconds);
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
deleted file mode 100644
index 044520c..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_ZMQ_COMMON_H
-#define CELIX_PUBSUB_ZMQ_COMMON_H
-
-#include <utils.h>
-#include <stdint.h>
-
-#include "version.h"
-
-
-/*
- * NOTE zmq is used by first sending three frames:
- * 1) A subscription filter.
- * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
- *
- * 2) The pubsub_zmq_msg_header_t is send containg the type id and major/minor version
- *
- * 3) The actual payload
- */
-
-
-struct pubsub_zmq_msg_header {
-    uint32_t type; //msg type id (hash of fqn)
-    uint8_t major;
-    uint8_t minor;
-    uint32_t seqNr;
-    unsigned char originUUID[16];
-    uint64_t sendtimeSeconds; //seconds since epoch
-    uint64_t sendTimeNanoseconds; //ns since epoch
-};
-
-typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
-
-
-void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
-
-bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr);
-
-celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header);
-void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen);
-#endif //CELIX_PUBSUB_ZMQ_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index d636867..2f80d31 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -18,6 +18,7 @@
  */
 
 #include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
 #include <stdlib.h>
 #include <pubsub/subscriber.h>
 #include <memory.h>
@@ -30,11 +31,12 @@
 #include <log_helper.h>
 #include "pubsub_zmq_topic_receiver.h"
 #include "pubsub_psa_zmq_constants.h"
-#include "pubsub_zmq_common.h"
 
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 
+#include "celix_utils_api.h"
+
 #define PSA_ZMQ_RECV_TIMEOUT 1000
 
 #ifndef UUID_STR_LEN
@@ -56,9 +58,10 @@ struct pubsub_zmq_topic_receiver {
     log_helper_t *logHelper;
     long serializerSvcId;
     pubsub_serializer_service_t *serializer;
+    long protocolSvcId;
+    pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
-    char scopeAndTopicFilter[5];
     bool metricsEnabled;
 
     void *zmqCtx;
@@ -122,7 +125,7 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
 static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
 static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
 static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-
+static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 
 pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -131,15 +134,18 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
                                                               const char *topic,
                                                               const celix_properties_t *topicProperties,
                                                               long serializerSvcId,
-                                                              pubsub_serializer_service_t *serializer) {
+                                                              pubsub_serializer_service_t *serializer,
+                                                              long protocolSvcId,
+                                                              pubsub_protocol_service_t *protocol) {
     pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
     receiver->serializerSvcId = serializerSvcId;
     receiver->serializer = serializer;
+    receiver->protocolSvcId = protocolSvcId;
+    receiver->protocol = protocol;
     receiver->scope = strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    psa_zmq_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
 
@@ -329,6 +335,10 @@ long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiv
     return receiver->serializerSvcId;
 }
 
+long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
+    return receiver->protocolSvcId; /* the id stored by pubsub_zmqTopicReceiver_create() */
+}
+
 void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
@@ -456,9 +466,9 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, const pubsub_zmq_msg_header_t *hdr, const byte* payload, size_t payloadSize, struct timespec *receiveTime) {
+static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
-    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
+    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
     pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
@@ -470,18 +480,18 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
 
     if (msgSer!= NULL) {
         void *deserializedMsg = NULL;
-        bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, hdr);
+        bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, message->header.msgMajorVersion, message->header.msgMinorVersion);
         if (validVersion) {
             if (monitor) {
                 clock_gettime(CLOCK_REALTIME, &beginSer);
             }
-            celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+            celix_status_t status = msgSer->deserialize(msgSer->handle, message->payload.payload, message->payload.length, &deserializedMsg);
             if (monitor) {
                 clock_gettime(CLOCK_REALTIME, &endSer);
             }
             if (status == CELIX_SUCCESS) {
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
@@ -492,68 +502,69 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             }
         }
     } else {
-        L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", hdr->type);
+        L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
     }
 
     if (msgSer != NULL && monitor) {
-        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )hdr->type);
-        char uuidStr[UUID_STR_LEN+1];
-        uuid_unparse(hdr->originUUID, uuidStr);
-        psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
-
-        if (metrics == NULL) {
-            metrics = calloc(1, sizeof(*metrics));
-            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
-            uuid_copy(metrics->origin, hdr->originUUID);
-            metrics->msgTypeId = hdr->type;
-            metrics->maxDelayInSeconds = -INFINITY;
-            metrics->minDelayInSeconds = INFINITY;
-            metrics->lastSeqNr = 0;
-        }
-
-        double diff = celix_difftime(&beginSer, &endSer);
-        long n = metrics->nrOfMessagesReceived;
-        metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-
-        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-        n = metrics->nrOfMessagesReceived;
-        if (metrics->nrOfMessagesReceived >= 1) {
-            metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-        }
-        metrics->lastMessageReceived = *receiveTime;
-
-
-        int incr = hdr->seqNr - metrics->lastSeqNr;
-        if (metrics->lastSeqNr >0 && incr > 1) {
-            metrics->nrOfMissingSeqNumbers += (incr - 1);
-            L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
-        }
-        metrics->lastSeqNr = hdr->seqNr;
-
-        struct timespec sendTime;
-        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
-        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
-        diff = celix_difftime(&sendTime, receiveTime);
-        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
-        if (diff < metrics->minDelayInSeconds) {
-            metrics->minDelayInSeconds = diff;
-        }
-        if (diff > metrics->maxDelayInSeconds) {
-            metrics->maxDelayInSeconds = diff;
-        }
-
-        metrics->nrOfMessagesReceived += updateReceiveCount;
-        metrics->nrOfSerializationErrors += updateSerError;
+        // TODO disabled for now, should move to an interceptor?
+//        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId);
+//        char uuidStr[UUID_STR_LEN+1];
+//        uuid_unparse(hdr->originUUID, uuidStr);
+//        psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
+//
+//        if (metrics == NULL) {
+//            metrics = calloc(1, sizeof(*metrics));
+//            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
+//            uuid_copy(metrics->origin, hdr->originUUID);
+//            metrics->msgTypeId = hdr->type;
+//            metrics->maxDelayInSeconds = -INFINITY;
+//            metrics->minDelayInSeconds = INFINITY;
+//            metrics->lastSeqNr = 0;
+//        }
+//
+//        double diff = celix_difftime(&beginSer, &endSer);
+//        long n = metrics->nrOfMessagesReceived;
+//        metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
+//
+//        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+//        n = metrics->nrOfMessagesReceived;
+//        if (metrics->nrOfMessagesReceived >= 1) {
+//            metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+//        }
+//        metrics->lastMessageReceived = *receiveTime;
+//
+//
+//        int incr = hdr->seqNr - metrics->lastSeqNr;
+//        if (metrics->lastSeqNr >0 && incr > 1) {
+//            metrics->nrOfMissingSeqNumbers += (incr - 1);
+//            L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
+//        }
+//        metrics->lastSeqNr = hdr->seqNr;
+//
+//        struct timespec sendTime;
+//        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
+//        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
+//        diff = celix_difftime(&sendTime, receiveTime);
+//        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
+//        if (diff < metrics->minDelayInSeconds) {
+//            metrics->minDelayInSeconds = diff;
+//        }
+//        if (diff > metrics->maxDelayInSeconds) {
+//            metrics->maxDelayInSeconds = diff;
+//        }
+//
+//        metrics->nrOfMessagesReceived += updateReceiveCount;
+//        metrics->nrOfSerializationErrors += updateSerError;
     }
 }
 
-static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, const pubsub_zmq_msg_header_t *hdr, const byte *payload, size_t payloadSize, struct timespec *receiveTime) {
+static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime);
+            processMsgForSubscriberEntry(receiver, entry, message, receiveTime);
         }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -574,7 +585,6 @@ static void* psa_zmq_recvThread(void * data) {
     bool allInitialized = receiver->subscribers.allInitialized;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
-
     while (running) {
         if (!allConnected) {
             psa_zmq_connectToAllRequestedConnections(receiver);
@@ -585,24 +595,32 @@ static void* psa_zmq_recvThread(void * data) {
 
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
         if (zmsg != NULL) {
-            if (zmsg_size(zmsg) != 3) {
-                L_WARN("[PSA_ZMQ_TR] Always expecting 3 frames per zmsg (filter + header + payload), got %i frames", (int)zmsg_size(zmsg));
+            if (zmsg_size(zmsg) < 2) {
+                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
             } else {
-                zframe_t *filter = zmsg_pop(zmsg); //char[5] filter
-                zframe_t *header = zmsg_pop(zmsg); //pubsub_zmq_msg_header_t
-                zframe_t *payload = zmsg_pop(zmsg); //serialized payload
-                if (filter != NULL && strncmp(receiver->scopeAndTopicFilter, (char*)zframe_data(filter), zframe_size(filter)) != 0 ) {
-                    L_ERROR("[PSA_ZMQ_TR] Invalid ZQM filter, Found '%4s'. Expected %s\n", (char*)zframe_data(filter), receiver->scopeAndTopicFilter);
-                } else if (header != NULL && payload != NULL) {
+                zframe_t *header = zmsg_pop(zmsg); // header
+                zframe_t *payload = NULL;
+                zframe_t *metadata = NULL;
+
+                pubsub_protocol_message_t message;
+                receiver->protocol->decodeHeader(receiver->protocol->handle, zframe_data(header), zframe_size(header), &message);
+                if (message.header.payloadSize > 0) {
+                    payload = zmsg_pop(zmsg);
+                    receiver->protocol->decodePayload(receiver->protocol->handle, zframe_data(payload), zframe_size(payload), &message);
+                }
+                if (message.header.metadataSize > 0) {
+                    metadata = zmsg_pop(zmsg);
+                    receiver->protocol->decodeMetadata(receiver->protocol->handle, zframe_data(metadata), zframe_size(metadata), &message);
+                }
+                if (header != NULL && payload != NULL) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
-                    pubsub_zmq_msg_header_t msgHeader;
-                    psa_zmq_decodeHeader(zframe_data(header), zframe_size(header), &msgHeader);
-                    processMsg(receiver, &msgHeader, zframe_data(payload), zframe_size(payload), &receiveTime);
+                    processMsg(receiver, &message, &receiveTime);
                 }
-                zframe_destroy(&filter);
+                celix_properties_destroy(message.metadata.metadata);
                 zframe_destroy(&header);
                 zframe_destroy(&payload);
+                zframe_destroy(&metadata);
             }
             zmsg_destroy(&zmsg);
         } else {
@@ -800,10 +818,33 @@ static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const
     zcert_apply (sub_cert, zmq_s);
     zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
 #endif
-    zsock_set_subscribe(receiver->zmqSock, receiver->scopeAndTopicFilter);
+    char sync[5];
+    receiver->protocol->getSyncHeader(receiver->protocol->handle, sync);
+    zsock_set_subscribe(receiver->zmqSock, sync);
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     ts->zmq_cert = sub_cert;
     ts->zmq_pub_cert = pub_cert;
 #endif
 }
+
+/* Returns true when a received header version (major.minor) is compatible with msgVersion, the locally known message version. */
+static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
+    if (major == 0 && minor == 0) {
+        return true; /* a 0.0 header disables the check */
+    }
+    if (msgVersion == NULL) {
+        return false; /* no local version to compare against */
+    }
+    /* Zero-initialised and status-checked: a failing getter must never leave an indeterminate value to compare. */
+    int versionMajor = 0;
+    int versionMinor = 0;
+    if (version_getMajor(msgVersion, &versionMajor) != CELIX_SUCCESS || version_getMinor(msgVersion, &versionMinor) != CELIX_SUCCESS) {
+        return false;
+    }
+    /* NOTE(review): the local version is still narrowed to 8 bits although the header fields are 16 bits wide; confirm what senders fill in before widening. */
+    if (major != (unsigned char)versionMajor) {
+        return false; /* different major means incompatible */
+    }
+    return minor >= (unsigned char)versionMinor; /* compatible only if the header minor is equal or greater */
+}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index b9ab962..1792c48 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,17 +27,20 @@ typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
 
 pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
         log_helper_t *logHelper,
-        const char *scope, 
+        const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
         long serializerSvcId,
-        pubsub_serializer_service_t *serializer);
+        pubsub_serializer_service_t *serializer,
+        long protocolSvcId,
+        pubsub_protocol_service_t *protocol);
 void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
 
 const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver);
 const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver);
 
 long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver);
+long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver);
 void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls);
 
 void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 96b6461..d6f3604 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -18,6 +18,7 @@
  */
 
 #include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
 #include <stdlib.h>
 #include <memory.h>
 #include <pubsub_constants.h>
@@ -29,7 +30,6 @@
 #include <log_helper.h>
 #include "pubsub_zmq_topic_sender.h"
 #include "pubsub_psa_zmq_constants.h"
-#include "pubsub_zmq_common.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
 
@@ -50,13 +50,14 @@ struct pubsub_zmq_topic_sender {
     log_helper_t *logHelper;
     long serializerSvcId;
     pubsub_serializer_service_t *serializer;
+    long protocolSvcId;
+    pubsub_protocol_service_t *protocol;
     uuid_t fwUUID;
     bool metricsEnabled;
     bool zeroCopyEnabled;
 
     char *scope;
     char *topic;
-    char scopeAndTopicFilter[5];
     char *url;
     bool isStatic;
 
@@ -78,8 +79,12 @@ struct pubsub_zmq_topic_sender {
 };
 
 typedef struct psa_zmq_send_msg_entry {
-    pubsub_zmq_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
+    uint32_t type; //msg type id (hash of fqn)
+    uint8_t major;
+    uint8_t minor;
+    unsigned char originUUID[16];
     pubsub_msg_serializer_t *msgSer;
+    pubsub_protocol_service_t *protSer;
     celix_thread_mutex_t sendLock; //protects send & Seqnr
     unsigned int seqNr;
     struct {
@@ -108,7 +113,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
 static unsigned int rand_range(unsigned int min, unsigned int max);
 static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender);
 
-static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
 
 pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         celix_bundle_context_t *ctx,
@@ -117,6 +122,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         const char *topic,
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
+        long protocolSvcId,
+        pubsub_protocol_service_t *prot,
         const char *bindIP,
         const char *staticBindUrl,
         unsigned int basePort,
@@ -126,7 +133,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->logHelper = logHelper;
     sender->serializerSvcId = serializerSvcId;
     sender->serializer = ser;
-    psa_zmq_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+    sender->protocolSvcId = protocolSvcId;
+    sender->protocol = prot;
     const char* uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
@@ -321,6 +329,10 @@ long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender) {
     return sender->serializerSvcId;
 }
 
+long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
+    return sender->protocolSvcId;
+}
+
 const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
     return sender->scope;
 }
@@ -375,14 +387,15 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
                 void *key = hashMapEntry_getKey(hashMapEntry);
                 psa_zmq_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
                 sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
-                sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+                sendEntry->protSer = sender->protocol;
+                sendEntry->type = (int32_t)sendEntry->msgSer->msgId;
                 int major;
                 int minor;
                 version_getMajor(sendEntry->msgSer->msgVersion, &major);
                 version_getMinor(sendEntry->msgSer->msgVersion, &minor);
-                sendEntry->header.major = (uint8_t)major;
-                sendEntry->header.minor = (uint8_t)minor;
-                uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+                sendEntry->major = (uint8_t)major;
+                sendEntry->minor = (uint8_t)minor;
+                uuid_copy(sendEntry->originUUID, sender->fwUUID);
                 celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
                 hashMap_put(entry->msgEntries, key, sendEntry);
                 hashMap_put(entry->msgTypeIds, strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) sendEntry->msgSer->msgId);
@@ -464,7 +477,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
             result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
             result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
             result->msgMetrics[i].bndId = entry->bndId;
-            result->msgMetrics[i].typeId = mEntry->header.type;
+            result->msgMetrics[i].typeId = mEntry->type;
             snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
             i += 1;
             celixThreadMutex_unlock(&mEntry->metrics.mutex);
@@ -480,7 +493,7 @@ static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
     free(msg);
 }
 
-static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
     int status = CELIX_SUCCESS;
     psa_zmq_bounded_service_entry_t *bound = handle;
     pubsub_zmq_topic_sender_t *sender = bound->parent;
@@ -515,55 +528,70 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         }
 
         if (status == CELIX_SUCCESS /*ser ok*/) {
-            unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
+            pubsub_protocol_message_t message;
+            message.payload.payload = serializedOutput;
+            message.payload.length = serializedOutputLen;
+
+            void *payloadData = NULL;
+            size_t payloadLength = 0;
+            entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+
+            void *metadataData = NULL;
+            size_t metadataLength = 0;
+            if (metadata != NULL) {
+                message.metadata.metadata = metadata;
+                entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+            }
 
-            celixThreadMutex_lock(&entry->sendLock);
+            message.header.msgId = msgTypeId;
+            message.header.msgMajorVersion = 0;
+            message.header.msgMinorVersion = 0;
+            message.header.payloadSize = payloadLength;
+            message.header.metadataSize = metadataLength;
 
-            pubsub_zmq_msg_header_t msg_hdr = entry->header;
-            msg_hdr.seqNr = 0;
-            msg_hdr.sendtimeSeconds = 0;
-            msg_hdr.sendTimeNanoseconds = 0;
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &sendTime);
-                msg_hdr.sendtimeSeconds = (uint64_t) sendTime.tv_sec;
-                msg_hdr.sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
-                msg_hdr.seqNr = entry->seqNr++;
-            }
-            psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
+            void *headerData = NULL;
+            size_t headerLength = 0;
+
+            entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
+
+            celixThreadMutex_lock(&entry->sendLock);
 
             errno = 0;
             bool sendOk;
 
             if (bound->parent->zeroCopyEnabled) {
-                zmq_msg_t msg1; //filter
-                zmq_msg_t msg2; //header
-                zmq_msg_t msg3; //payload
+                zmq_msg_t msg1; // Header
+                zmq_msg_t msg2; // Payload
+                zmq_msg_t msg3; // Metadata
                 void *socket = zsock_resolve(sender->zmq.socket);
 
-                zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
-                //send filter
+                zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
+                //send header
                 int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
                 if (rc == -1) {
-                    L_WARN("Error sending filter msg. %s", strerror(errno));
+                    L_WARN("Error sending header msg. %s", strerror(errno));
                     zmq_msg_close(&msg1);
                 }
 
                 //send header
                 if (rc > 0) {
-                    zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
-                    rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
+                    int flags = ZMQ_SNDMORE;
+                    if (metadataLength > 0) {
+                        flags = 0;
+                    }
+                    rc = zmq_msg_send(&msg2, socket, flags);
                     if (rc == -1) {
-                        L_WARN("Error sending header msg. %s", strerror(errno));
+                        L_WARN("Error sending payload msg. %s", strerror(errno));
                         zmq_msg_close(&msg2);
                     }
                 }
 
-
-                if (rc > 0) {
-                    zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+                if (rc > 0 && metadataLength > 0) {
+                    zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
                     rc = zmq_msg_send(&msg3, socket, 0);
                     if (rc == -1) {
-                        L_WARN("Error sending payload msg. %s", strerror(errno));
+                        L_WARN("Error sending metadata msg. %s", strerror(errno));
                         zmq_msg_close(&msg3);
                     }
                 }
@@ -571,18 +599,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 sendOk = rc > 0;
             } else {
                 zmsg_t *msg = zmsg_new();
-                zmsg_addstr(msg, sender->scopeAndTopicFilter);
-                zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
-                zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+                zmsg_addmem(msg, headerData, headerLength);
+                zmsg_addmem(msg, payloadData, payloadLength);
+                if (metadataLength > 0) {
+                    zmsg_addmem(msg, metadataData, metadataLength);
+                }
                 int rc = zmsg_send(&msg, sender->zmq.socket);
                 sendOk = rc == 0;
-                free(serializedOutput);
-                free(hdr);
+
                 if (!sendOk) {
                     zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
                 }
             }
 
+            celix_properties_destroy(message.metadata.metadata);
+            free(headerData);
+            free(payloadData);
+            free(metadataData);
+
             celixThreadMutex_unlock(&entry->sendLock);
             if (sendOk) {
                 sendCountUpdate = 1;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index c5c69a4..4923c62 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -32,6 +32,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         const char *topic,
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
+        long protocolSvcId,
+        pubsub_protocol_service_t *prot,
         const char *bindIP,
         const char *staticBindUrl,
         unsigned int basePort,
@@ -44,6 +46,7 @@ const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
 bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender);
 
 long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender);
+long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
 
 void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
 void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 3324c51..fad5a0d 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -29,6 +29,8 @@
 
 #include <stdlib.h>
 
+#include "hash_map.h"
+
 #define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
 #define PUBSUB_PUBLISHER_SERVICE_VERSION        "3.0.0"
  
@@ -59,7 +61,7 @@ struct pubsub_publisher {
      * send is a async function, but the msg can be safely deleted after send returns.
      * Returns 0 on success.
      */
-    int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
+    int (*send)(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
  
 };
 typedef struct pubsub_publisher pubsub_publisher_t;
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 71b6956..9f5433f 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -29,6 +29,8 @@
 
 #include <stdbool.h>
 
+#include "celix_properties.h"
+
 #define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
 #define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "3.0.0"
  
@@ -62,7 +64,7 @@ struct pubsub_subscriber_struct {
      *
      * this method can be  NULL.
      */
-    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
 
 };
 typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 964afd9..283db2f 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -481,8 +481,9 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
             const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
             const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-            L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
-                   uuid, type, admin, ser);
+            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+            L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n",
+                   uuid, type, admin, ser, prot);
         }
 
         celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
@@ -511,8 +512,9 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
         const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
         const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-        L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
-               uuid, type, admin, ser);
+        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+        L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n",
+               uuid, type, admin, ser, prot);
     }
 
     if (endpoint != NULL) {
@@ -607,6 +609,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
         const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
         fprintf(os, "Endpoint %s:\n", uuid);
         fprintf(os, "   |- type          = %s\n", type);
@@ -614,6 +617,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
         fprintf(os, "   |- topic         = %s\n", topic);
         fprintf(os, "   |- admin type    = %s\n", adminType);
         fprintf(os, "   |- serializer    = %s\n", serType);
+        fprintf(os, "   |- protocol      = %s\n", protType);
     }
     celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
 
@@ -628,6 +632,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
         const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
         int age = (int)(now.tv_sec - entry->createTime.tv_sec);
         fprintf(os, "Endpoint %s:\n", uuid);
@@ -636,6 +641,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
         fprintf(os, "   |- topic         = %s\n", topic);
         fprintf(os, "   |- admin type    = %s\n", adminType);
         fprintf(os, "   |- serializer    = %s\n", serType);
+        fprintf(os, "   |- protocol      = %s\n", protType);
         fprintf(os, "   |- age           = %ds\n", age);
         fprintf(os, "   |- is set        = %s\n", entry->isSet ? "true" : "false");
         if (disc->verbose) {
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
new file mode 100644
index 0000000..5e734e9
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(celix_pubsub_protocol_wire_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
+    VERSION "1.0.0"
+    GROUP "Celix/PubSub"
+    SOURCES
+        src/ps_wire_protocol_activator.c
+        src/pubsub_wire_protocol_impl.c
+        src/pubsub_wire_protocol_common.c
+)
+target_include_directories(celix_pubsub_protocol_wire_v1 PRIVATE
+    src
+)
+set_target_properties(celix_pubsub_protocol_wire_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE Celix::pubsub_spi Celix::framework)
+
+install_celix_bundle(celix_pubsub_protocol_wire_v1 EXPORT celix COMPONENT pubsub)
+
+add_library(Celix::pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
+
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
new file mode 100644
index 0000000..8d99969
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <pubsub_constants.h>
+
+#include "celix_api.h"
+#include "pubsub_wire_protocol_impl.h"
+
+typedef struct ps_wp_activator {
+    pubsub_protocol_wire_v1_t *wireprotocol;
+
+    pubsub_protocol_service_t protocolSvc;
+    long wireProtocolSvcId;
+} ps_wp_activator_t;
+
+static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
+    act->wireProtocolSvcId = -1L;
+
+    celix_status_t status = pubsubProtocol_create(ctx, &(act->wireprotocol));
+    if (status == CELIX_SUCCESS) {
+        /* Set protocol type */
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_PROTOCOL_TYPE);
+
+        act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+        
+        act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
+        act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
+        act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+
+        act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
+        act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
+        act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+
+        act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
+    }
+    return status;
+}
+
+static int ps_wp_stop(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
+    celix_bundleContext_unregisterService(ctx, act->wireProtocolSvcId);
+    act->wireProtocolSvcId = -1L;
+    pubsubProtocol_destroy(act->wireprotocol);
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(ps_wp_activator_t, ps_wp_start, ps_wp_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
new file mode 100644
index 0000000..dbfd29e
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "pubsub_wire_protocol_common.h"
+
+#include <string.h>
+#include <endian.h>
+
+int readShort(const unsigned char *data, int offset, uint16_t *val) {
+    memcpy(val, data + offset, sizeof(uint16_t));
+    *val = be16toh(*val);
+    return offset + sizeof(uint16_t);
+}
+
+int readInt(const unsigned char *data, int offset, uint32_t *val) {
+    memcpy(val, data + offset, sizeof(uint32_t));
+    *val = be32toh(*val);
+    return offset + sizeof(uint32_t);
+}
+
+int readLong(const unsigned char *data, int offset, uint64_t *val) {
+    memcpy(val, data + offset, sizeof(uint64_t));
+    *val = be64toh(*val);
+    return offset + sizeof(uint64_t);
+}
+
+int writeShort(unsigned char *data, int offset, uint16_t val) {
+    uint16_t nVal = htobe16(val);
+    memcpy(data + offset, &nVal, sizeof(uint16_t));
+    return offset + sizeof(uint16_t);
+}
+
+int writeInt(unsigned char *data, int offset, uint32_t val) {
+    uint32_t nVal = htobe32(val);
+    memcpy(data + offset, &nVal, sizeof(uint32_t));
+    return offset + sizeof(uint32_t);
+}
+
+int writeLong(unsigned char *data, int offset, uint64_t val) {
+    uint64_t nVal = htobe64(val);
+    memcpy(data + offset, &nVal, sizeof(uint64_t));
+    return offset + sizeof(uint64_t);
+}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
similarity index 55%
copy from bundles/pubsub/pubsub_spi/include/pubsub_constants.h
copy to bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index bbbe5d3..8edf788 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -17,25 +17,20 @@
  * under the License.
  */
 
-#ifndef PUBSUB_CONSTANTS_H_
-#define PUBSUB_CONSTANTS_H_
+#include <stdint.h>
 
-#define PUBSUB_ADMIN_TYPE_KEY      "pubsub.config"
-#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
+#ifndef CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
+#define CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
 
-/**
- * Endpoints with the system visibility should be discoverable through the complete system
- */
-#define PUBSUB_ENDPOINT_SYSTEM_VISIBILITY    "system"
+static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
 
-/**
- * Endpoints with the system visibility are discoverable for a single host (i.e. IPC)
- */
-#define PUBSUB_ENDPOINT_HOST_VISIBILITY      "host"
+int readShort(const unsigned char *data, int offset, uint16_t *val);
+int readInt(const unsigned char *data, int offset, uint32_t *val);
+int readLong(const unsigned char *data, int offset, uint64_t *val);
 
-/**
- * Endpoints which are only visible within a single process
- */
-#define PUBSUB_ENDPOINT_LOCAL_VISIBILITY     "local"
+int writeShort(unsigned char *data, int offset, uint16_t val);
+int writeInt(unsigned char *data, int offset, uint32_t val);
+int writeLong(unsigned char *data, int offset, uint64_t val);
 
-#endif /* PUBSUB_CONSTANTS_H_ */
+#endif //CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
new file mode 100644
index 0000000..4f40e84
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <math.h>
+#include <string.h>
+
+#include "utils.h"
+#include "celix_properties.h"
+
+#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+#define NETSTRING_ERROR_TOO_LONG     -1
+#define NETSTRING_ERROR_NO_COLON     -2
+#define NETSTRING_ERROR_TOO_SHORT    -3
+#define NETSTRING_ERROR_NO_COMMA     -4
+#define NETSTRING_ERROR_LEADING_ZERO -5
+#define NETSTRING_ERROR_NO_LENGTH    -6
+
+struct pubsub_protocol_wire_v1 {
+    celix_bundle_context_t *bundle_context; /* context pointer stored by pubsubProtocol_create(); never freed by this module */
+};
+
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut);
+static int pubsubProtocol_parseNetstring(char *buffer, size_t buffer_length,
+                                  char **netstring_start, size_t *netstring_length);
+
+celix_status_t pubsubProtocol_create(celix_bundle_context_t *context, pubsub_protocol_wire_v1_t **protocol) {
+    /* Zero-initialised instance; its only field is the bundle context pointer. */
+    pubsub_protocol_wire_v1_t *instance = calloc(1, sizeof(*instance));
+
+    /* *protocol is written in both outcomes (it is NULL when allocation failed). */
+    *protocol = instance;
+    if (instance == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    instance->bundle_context = context;
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
+    /* Releases the struct only; the bundle context it points to is left untouched.
+       free(NULL) is a no-op, so a NULL protocol is accepted. */
+    free(protocol);
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
+    /* Zero the first five bytes of the caller's buffer, then store the 32-bit
+       sync word at offset 0. */
+    memset(syncHeader, 0, 5);
+    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    /* Fixed 24-byte header: three 32-bit fields, two 16-bit fields, two 32-bit fields. */
+    unsigned char *header = calloc(1, 24);
+    *outBuffer = header;
+    if (header == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    /* The value returned by each write is used as the offset of the next field. */
+    int offset = 0;
+    offset = writeInt(header, offset, PROTOCOL_WIRE_SYNC);
+    offset = writeInt(header, offset, PROTOCOL_WIRE_ENVELOPE_VERSION);
+    offset = writeInt(header, offset, message->header.msgId);
+    offset = writeShort(header, offset, message->header.msgMajorVersion);
+    offset = writeShort(header, offset, message->header.msgMinorVersion);
+    offset = writeInt(header, offset, message->header.payloadSize);
+    offset = writeInt(header, offset, message->header.metadataSize);
+
+    *outLength = offset;
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    /* No copy is made: the outputs alias the payload pointer and length held by the message. */
+    *outLength = message->payload.length;
+    *outBuffer = message->payload.payload;
+    return CELIX_SUCCESS;
+}
+
+/* Serialises the metadata properties as consecutive key/value netstrings. On success
+ * *outBuffer is a heap-allocated, NUL-terminated string (empty without metadata) owned by the
+ * caller and *outLength its strlen(); on failure they are NULL/0 and CELIX_ENOMEM is returned. */
+celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    celix_status_t status = CELIX_SUCCESS;
+    char *line = calloc(1, 1); /* start from an empty string so "no metadata" still yields a valid buffer */
+    size_t len = 0;
+    const char *key;
+
+    if (line == NULL) {
+        status = CELIX_ENOMEM;
+    } else if (message->metadata.metadata != NULL && celix_properties_size(message->metadata.metadata) > 0) {
+        CELIX_PROPERTIES_FOR_EACH(message->metadata.metadata, key) {
+            const char *val = celix_properties_get(message->metadata.metadata, key, "!Error!");
+            char *keyNetString = NULL;
+            char *valueNetString = NULL;
+            char *tmp = NULL; /* stays NULL when encoding or growing the buffer fails */
+            if (status == CELIX_SUCCESS && pubsubProtocol_createNetstring(key, &keyNetString) == CELIX_SUCCESS
+                    && pubsubProtocol_createNetstring(val, &valueNetString) == CELIX_SUCCESS) {
+                tmp = realloc(line, len + strlen(keyNetString) + strlen(valueNetString) + 1);
+            }
+            if (tmp != NULL) {
+                line = tmp; /* only replace the buffer pointer once realloc succeeded */
+                len += (size_t) sprintf(line + len, "%s%s", keyNetString, valueNetString);
+            } else {
+                status = CELIX_ENOMEM; /* line is still valid; later entries are skipped by the status check */
+            }
+            free(keyNetString);
+            free(valueNetString);
+        }
+    }
+    if (status != CELIX_SUCCESS) {
+        free(line); /* never hand a partially built buffer to the caller */
+    }
+    *outBuffer = (status == CELIX_SUCCESS) ? line : NULL;
+    *outLength = (status == CELIX_SUCCESS) ? len : 0;
+    return status;
+}
+
+celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    int offset = 0;
+    /* The header must start with the sync word ... */
+    unsigned int syncWord;
+    offset = readInt(data, offset, &syncWord);
+    if (syncWord != PROTOCOL_WIRE_SYNC) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    /* ... followed by the envelope version this implementation writes. */
+    unsigned int version;
+    offset = readInt(data, offset, &version);
+    if (version != PROTOCOL_WIRE_ENVELOPE_VERSION) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    offset = readInt(data, offset, &message->header.msgId);
+    offset = readShort(data, offset, &message->header.msgMajorVersion);
+    offset = readShort(data, offset, &message->header.msgMinorVersion);
+    offset = readInt(data, offset, &message->header.payloadSize);
+    readInt(data, offset, &message->header.metadataSize);
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message){
+    /* No copy is made: the message simply points at the caller's buffer. */
+    message->payload.length = length;
+    message->payload.payload = data;
+    return CELIX_SUCCESS;
+}
+
+/* Parses up to `length` bytes of key/value netstrings from `data` into a new properties set stored in
+ * message->metadata.metadata. Malformed input stops decoding with CELIX_ILLEGAL_ARGUMENT; earlier entries stay in the set. */
+celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+    char *cursor = data;
+    char *end = cursor + length; /* one past the last readable byte */
+    message->metadata.metadata = celix_properties_create();
+    while (status == CELIX_SUCCESS && cursor < end && *cursor != '\0') {
+        char *keyStart = NULL, *valStart = NULL;
+        size_t keyLen = 0, valLen = 0;
+        /* Each parse only sees the bytes that are left; the value starts right after the key's trailing comma. */
+        if (pubsubProtocol_parseNetstring(cursor, (size_t) (end - cursor), &keyStart, &keyLen) != 0 ||
+                pubsubProtocol_parseNetstring(keyStart + keyLen + 1, (size_t) (end - (keyStart + keyLen + 1)), &valStart, &valLen) != 0) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        } else {
+            celix_properties_setWithoutCopy(message->metadata.metadata, strndup(keyStart, keyLen), strndup(valStart, valLen));
+            cursor = valStart + valLen + 1; /* skip the value and its trailing comma */
+        }
+    }
+    return status;
+}
+
+/*
+ * Encodes `string` as a netstring of the form "<decimal length>:<string>,".
+ *
+ * string        NUL-terminated text to encode; must not be NULL.
+ * netstringOut  receives a newly allocated, NUL-terminated netstring that the
+ *               caller must free(); it is set to NULL when the function fails.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_ENOMEM when the netstring could not be
+ * built.
+ */
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
+    celix_status_t status = CELIX_SUCCESS;
+    size_t str_len = strlen(string);
+
+    /* Ask snprintf for the exact size instead of deriving the digit count with
+       log10(); the empty string needs no special case and simply becomes "0:,". */
+    int needed = snprintf(NULL, 0, "%zu:%s,", str_len, string);
+
+    *netstringOut = (needed < 0) ? NULL : calloc(1, (size_t) needed + 1);
+    if (*netstringOut == NULL) {
+        status = CELIX_ENOMEM;
+    } else {
+        snprintf(*netstringOut, (size_t) needed + 1, "%zu:%s,", str_len, string);
+    }
+
+    return status;
+}
+
+/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
+   to `netstring_start` a pointer to the beginning of the string in
+   the buffer, and to `netstring_length` the length of the
+   string. Does not allocate any memory. If it reads successfully,
+   then it returns 0. If there is an error, then the return value will
+   be negative. The error values are:
+   NETSTRING_ERROR_TOO_LONG      More than 999999999 bytes in a field
+   NETSTRING_ERROR_NO_COLON      No colon was found after the number
+   NETSTRING_ERROR_TOO_SHORT     Number of bytes greater than buffer length
+   NETSTRING_ERROR_NO_COMMA      No comma was found at the end
+   NETSTRING_ERROR_LEADING_ZERO  Leading zeros are not allowed
+   NETSTRING_ERROR_NO_LENGTH     Length not given at start of netstring
+   If you're sending messages with more than 999999999 bytes -- about
+   1 GB -- then you probably should not be doing so in the form of a
+   single netstring. This restriction is in place partially to protect
+   from malicious or erroneous input, and partly to be compatible with
+   D. J. Bernstein's reference implementation.
+   Example:
+      if (pubsubProtocol_parseNetstring("3:foo,", 6, &str, &len) < 0) explode_and_die();
+ */
+static int pubsubProtocol_parseNetstring(char *buffer, size_t buffer_length,
+                   char **netstring_start, size_t *netstring_length) {
+    size_t i;
+    size_t len = 0;
+
+    /* Write default values for outputs */
+    *netstring_start = NULL; *netstring_length = 0;
+
+    /* Make sure buffer is big enough. Minimum size is 3. */
+    if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
+
+    /* No leading zeros allowed! (Digits are range-tested: isdigit() is undefined for negative chars.) */
+    if (buffer[0] == '0' && buffer[1] >= '0' && buffer[1] <= '9')
+        return NETSTRING_ERROR_LEADING_ZERO;
+
+    /* The netstring must start with a number */
+    if (buffer[0] < '0' || buffer[0] > '9') return NETSTRING_ERROR_NO_LENGTH;
+
+    /* Read the number of bytes */
+    for (i = 0; i < buffer_length && buffer[i] >= '0' && buffer[i] <= '9'; i++) {
+        /* Error if more than 9 digits */
+        if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+        /* Accumulate each digit; '0'..'9' are contiguous in every C character set. */
+        len = len*10 + (size_t) (buffer[i] - '0');
+    }
+
+    /* Check buffer length once and for all. Specifically, we make sure
+       that the buffer is longer than the number we've read, the length
+       of the string itself, and the colon and comma. */
+    if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
+
+    /* Read the colon */
+    if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
+
+    /* Test for the trailing comma, and set the return values */
+    if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
+    *netstring_start = &buffer[i]; *netstring_length = len;
+
+    return 0;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
new file mode 100644
index 0000000..7705585
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_WIRE_H_
+#define PUBSUB_PROTOCOL_WIRE_H_
+
+#include "pubsub_protocol.h"
+
+#define PUBSUB_WIRE_PROTOCOL_TYPE "wire"
+
+typedef struct pubsub_protocol_wire_v1 pubsub_protocol_wire_v1_t;
+
+celix_status_t pubsubProtocol_create(celix_bundle_context_t *context, pubsub_protocol_wire_v1_t **protocol);
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol);
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+
+#endif /* PUBSUB_PROTOCOL_WIRE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index cd172cc..15d5d16 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -44,16 +44,16 @@
 struct pubsub_admin_service {
     void *handle;
 
-    celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
-    celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+    celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId);
+    celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId);
     celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
 
     //note endpoint is owned by caller
-    celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+    celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
     celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
 
     //note endpoint is owned by caller
-    celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+    celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
     celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
 
     celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index bbbe5d3..671b874 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -20,8 +20,9 @@
 #ifndef PUBSUB_CONSTANTS_H_
 #define PUBSUB_CONSTANTS_H_
 
-#define PUBSUB_ADMIN_TYPE_KEY      "pubsub.config"
-#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
+#define PUBSUB_ADMIN_TYPE_KEY       "pubsub.config"
+#define PUBSUB_SERIALIZER_TYPE_KEY  "pubsub.serializer"
+#define PUBSUB_PROTOCOL_TYPE_KEY    "pubsub.protocol"
 
 /**
  * Endpoints with the system visibility should be discoverable through the complete system
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 8e51474..5ba178b 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -41,6 +41,7 @@ extern "C" {
 #define PUBSUB_ENDPOINT_VISIBILITY      "pubsub.endpoint.visibility" //local, host or system. e.g. for IPC host
 #define PUBSUB_ENDPOINT_ADMIN_TYPE       PUBSUB_ADMIN_TYPE_KEY
 #define PUBSUB_ENDPOINT_SERIALIZER       PUBSUB_SERIALIZER_TYPE_KEY
+#define PUBSUB_ENDPOINT_PROTOCOL         PUBSUB_PROTOCOL_TYPE_KEY
 
 
 #define PUBSUB_PUBLISHER_ENDPOINT_TYPE      "publisher"
@@ -50,7 +51,7 @@ extern "C" {
 
 celix_properties_t *
 pubsubEndpoint_create(const char *fwUUID, const char *scope, const char *topic, const char *pubsubType,
-                      const char *adminType, const char *serType, celix_properties_t *topic_props);
+                      const char *adminType, const char *serType, const char *protType, celix_properties_t *topic_props);
 
 celix_properties_t *
 pubsubEndpoint_createFromSubscriberSvc(bundle_context_t *ctx, long svcBndId, const celix_properties_t *svcProps);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
new file mode 100644
index 0000000..44575e4
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_SERVICE_H_
+#define PUBSUB_PROTOCOL_SERVICE_H_
+
+#include "celix_properties.h"
+#include "version.h"
+#include "celix_bundle.h"
+
+#define PUBSUB_PROTOCOL_SERVICE_NAME      "pubsub_protocol"
+#define PUBSUB_PROTOCOL_SERVICE_VERSION   "1.0.0"
+#define PUBSUB_PROTOCOL_SERVICE_RANGE     "[1,2)"
+
+typedef struct pubsub_protocol_header pubsub_protocol_header_t;
+
+struct pubsub_protocol_header {
+    unsigned int msgId; /* encoded as a 32-bit field by the wire v1 protocol */
+    unsigned short msgMajorVersion; /* encoded as a 16-bit field by the wire v1 protocol */
+    unsigned short msgMinorVersion; /* encoded as a 16-bit field by the wire v1 protocol */
+
+    unsigned int payloadSize; /* 32-bit on the wire; presumably the payload length in bytes - TODO confirm */
+    unsigned int metadataSize; /* 32-bit on the wire; presumably the encoded metadata length in bytes - TODO confirm */
+};
+
+typedef struct pubsub_protocol_payload pubsub_protocol_payload_t;
+
+struct pubsub_protocol_payload {
+    void *payload;
+    size_t length;
+};
+
+typedef struct pubsub_protocol_metadata pubsub_protocol_metadata_t;
+
+struct pubsub_protocol_metadata {
+    celix_properties_t *metadata;
+};
+
+typedef struct pubsub_protocol_message pubsub_protocol_message_t;
+
+struct pubsub_protocol_message {
+    pubsub_protocol_header_t header;
+    pubsub_protocol_payload_t payload;
+    pubsub_protocol_metadata_t metadata;
+};
+
+typedef struct pubsub_protocol_service {
+    void* handle;
+
+    celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+    celix_status_t (*encodeHeader)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+    celix_status_t (*encodePayload)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+    celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+    celix_status_t (*decodeHeader)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+    celix_status_t (*decodePayload)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+    celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+} pubsub_protocol_service_t;
+
+#endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 557014c..47bb957 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -86,8 +86,10 @@ double pubsub_utils_matchPublisher(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        bool matchProtocol,
         celix_properties_t **outTopicProperties,
-        long *outSerializerSvcId);
+        long *outSerializerSvcId,
+        long *outProtocolSvcId);
 
 /**
  * Match a subscriber for a provided bnd (using the bundleId) and provided service properties.
@@ -127,8 +129,10 @@ double pubsub_utils_matchSubscriber(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        bool matchProtocol,
         celix_properties_t **outTopicProperties,
-        long *outSerializerSvcId);
+        long *outSerializerSvcId,
+        long *outProtocolSvcId);
 
 /**
  * Match an endpoint (subscriber or publisher endpoint) for the provided admin type.
@@ -146,7 +150,9 @@ bool pubsub_utils_matchEndpoint(
         celix_bundle_context_t *ctx,
         const celix_properties_t *endpoint,
         const char *adminType,
-        long *outSerializerSvcId);
+        bool matchProtocol,
+        long *outSerializerSvcId,
+        long *outProtocolSvcId);
 
 
 /**
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 91a01a3..f9e74b2 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -39,9 +39,9 @@
 #include "pubsub_utils.h"
 
 
-static void pubsubEndpoint_setFields(celix_properties_t *psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const celix_properties_t *topic_props);
+static void pubsubEndpoint_setFields(celix_properties_t *psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const char *protType, const celix_properties_t *topic_props);
 
-static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const celix_properties_t *topic_props) {
+static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const char *protType, const celix_properties_t *topic_props) {
     assert(ep != NULL);
 
     //copy topic properties
@@ -82,6 +82,10 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
     if (serType != NULL) {
         celix_properties_set(ep, PUBSUB_ENDPOINT_SERIALIZER, serType);
     }
+
+    if (protType != NULL) {
+        celix_properties_set(ep, PUBSUB_ENDPOINT_PROTOCOL, protType);
+    }
 }
 
 celix_properties_t* pubsubEndpoint_create(
@@ -91,9 +95,10 @@ celix_properties_t* pubsubEndpoint_create(
         const char* pubsubType,
         const char* adminType,
         const char *serType,
+        const char *protType,
         celix_properties_t *topic_props) {
     celix_properties_t *ep = celix_properties_create();
-    pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
+    pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, protType, topic_props);
     if (!pubsubEndpoint_isValid(ep, true, true)) {
         celix_properties_destroy(ep);
         ep = NULL;
@@ -128,7 +133,7 @@ celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx
 
     const char *pubsubType = PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
 
-    pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, data.props);
+    pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, NULL, data.props);
 
     if (data.props != NULL) {
         celix_properties_destroy(data.props); //Can be deleted since setFields invokes properties_copy
@@ -161,7 +166,7 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
     celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties);
 
     if (data.props != NULL) {
-        pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, data.props);
+        pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, NULL, data.props);
         celix_properties_destroy(data.props); //safe to delete, properties are copied in pubsubEndpoint_setFields
     }
 
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index 7196dfb..226b6ed 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -21,6 +21,7 @@
 #include <limits.h>
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
 
 #include "service_reference.h"
 
@@ -98,6 +99,50 @@ static long getPSASerializer(celix_bundle_context_t *ctx, const char *requested_
     return svcId;
 }
 
+struct psa_protocol_selection_data {
+    const char *requested_protocol;
+    long matchingSvcId;
+};
+
+void psa_protocol_selection_callback(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
+    struct psa_protocol_selection_data *selection = handle;
+    const char *protType = celix_properties_get(props, PUBSUB_PROTOCOL_TYPE_KEY, NULL);
+    if (protType == NULL) { /* a service without the type key can never match; warn and ignore it */
+        fprintf(stderr, "Warning found protocol without mandatory protocol type key (%s)\n", PUBSUB_PROTOCOL_TYPE_KEY);
+        return;
+    }
+    if (strncmp(selection->requested_protocol, protType, 1024 * 1024) == 0) { /* type equals the request: record its id */
+        selection->matchingSvcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    }
+}
+
+static long getPSAProtocol(celix_bundle_context_t *ctx, const char *requested_protocol) {
+    if (requested_protocol == NULL) {
+        /* No protocol was requested: accept whichever protocol service the lookup returns. */
+        celix_service_filter_options_t filterOpts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+        filterOpts.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+        filterOpts.ignoreServiceLanguage = true;
+
+        //note findService will automatically return the highest ranking service id
+        return celix_bundleContext_findServiceWithOptions(ctx, &filterOpts);
+    }
+
+    /* A specific protocol was requested: psa_protocol_selection_callback records the id
+       of a service whose type key equals the request; the id stays -1 when none matches. */
+    struct psa_protocol_selection_data selection;
+    selection.requested_protocol = requested_protocol;
+    selection.matchingSvcId = -1L;
+
+    celix_service_use_options_t useOpts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+    useOpts.filter.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+    useOpts.filter.ignoreServiceLanguage = true;
+    useOpts.callbackHandle = &selection;
+    useOpts.useWithProperties = psa_protocol_selection_callback;
+    celix_bundleContext_useServicesWithOptions(ctx, &useOpts);
+
+    return selection.matchingSvcId;
+}
+
 double pubsub_utils_matchPublisher(
         celix_bundle_context_t *ctx,
         long bundleId,
@@ -106,8 +151,10 @@ double pubsub_utils_matchPublisher(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        bool matchProtocol,
         celix_properties_t **outTopicProperties,
-        long *outSerializerSvcId) {
+        long *outSerializerSvcId,
+        long *outProtocolSvcId) {
 
     celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
     const char *requested_admin         = NULL;
@@ -126,12 +173,23 @@ double pubsub_utils_matchPublisher(
         score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
     }
 
-//    printf("Score publisher service for psa type %s is %f\n", adminType, score);
-
     if (outSerializerSvcId != NULL) {
         *outSerializerSvcId = serializerSvcId;
     }
 
+    if (matchProtocol) {
+        const char *requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+        long protocolSvcId = getPSAProtocol(ctx, requested_protocol);
+
+        if (protocolSvcId < 0) {
+            score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+        }
+
+        if (outProtocolSvcId != NULL) {
+            *outProtocolSvcId = protocolSvcId;
+        }
+    }
+
     if (outTopicProperties != NULL) {
         *outTopicProperties = ep;
     } else if (ep != NULL) {
@@ -161,8 +219,10 @@ double pubsub_utils_matchSubscriber(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        bool matchProtocol,
         celix_properties_t **outTopicProperties,
-        long *outSerializerSvcId) {
+        long *outSerializerSvcId,
+        long *outProtocolSvcId) {
 
     pubsub_get_topic_properties_data_t data;
     data.isPublisher = false;
@@ -174,10 +234,14 @@ double pubsub_utils_matchSubscriber(
     const char *requested_admin         = NULL;
     const char *requested_qos            = NULL;
     const char *requested_serializer     = NULL;
+    const char *requested_protocol = NULL;
     if (ep != NULL) {
         requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
         requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
         requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+        if (matchProtocol) {
+            requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+        }
     }
 
     double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
@@ -191,6 +255,17 @@ double pubsub_utils_matchSubscriber(
         *outSerializerSvcId = serializerSvcId;
     }
 
+    if (matchProtocol) {
+        long protocolSvcId = getPSAProtocol(ctx, requested_protocol);
+        if (protocolSvcId < 0) {
+            score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no protocol, no match
+        }
+
+        if (outProtocolSvcId != NULL) {
+            *outProtocolSvcId = protocolSvcId;
+        }
+    }
+
     if (outTopicProperties != NULL) {
         *outTopicProperties = ep;
     } else if (ep != NULL) {
@@ -204,7 +279,9 @@ bool pubsub_utils_matchEndpoint(
         celix_bundle_context_t *ctx,
         const celix_properties_t *ep,
         const char *adminType,
-        long *outSerializerSvcId) {
+        bool matchProtocol,
+        long *outSerializerSvcId,
+        long *outProtocolSvcId) {
 
     bool psaMatch = false;
     const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
@@ -221,7 +298,21 @@ bool pubsub_utils_matchEndpoint(
     }
 
     bool match = psaMatch && serMatch;
-//    printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
+
+    if (matchProtocol) {
+        bool protMatch = false;
+        long protocolSvcId = -1L;
+        if (psaMatch) {
+            const char *configured_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+            protocolSvcId = getPSAProtocol(ctx, configured_protocol);
+            protMatch = protocolSvcId >= 0;
+        }
+        match = match && protMatch;
+
+        if (outProtocolSvcId != NULL) {
+            *outProtocolSvcId = protocolSvcId;
+        }
+    }
 
     if (outSerializerSvcId != NULL) {
         *outSerializerSvcId = serializerSvcId;
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a8388bb..5ec5140 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -263,6 +263,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
 
             entry->needsMatch = true;
             entry->selectedSerializerSvcId = -1L;
+            entry->selectedProtocolSvcId = -1L;
             entry->selectedPsaSvcId = -1L;
             if (entry->endpoint != NULL) {
                 celix_properties_destroy(entry->endpoint);
@@ -290,6 +291,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
 
             entry->needsMatch = true;
             entry->selectedSerializerSvcId = -1L;
+            entry->selectedProtocolSvcId = -1L;
             entry->selectedPsaSvcId = -1L;
             if (entry->endpoint != NULL) {
                 celix_properties_destroy(entry->endpoint);
@@ -458,6 +460,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
         entry = calloc(1, sizeof(*entry));
         entry->usageCount = 1;
         entry->selectedSerializerSvcId = -1L;
+        entry->selectedProtocolSvcId = -1L;
         entry->selectedPsaSvcId = -1L;
         entry->scope = scope; //taking ownership
         entry->topic = topic; //taking ownership
@@ -673,6 +676,7 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
                 entry->endpoint = NULL;
                 entry->selectedPsaSvcId = -1L;
                 entry->selectedSerializerSvcId = -1L;
+                entry->selectedProtocolSvcId = -1L;
             }
         }
     }
@@ -737,6 +741,7 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
                 entry->endpoint = NULL;
                 entry->selectedPsaSvcId = -1L;
                 entry->selectedSerializerSvcId = -1L;
+                entry->selectedProtocolSvcId = -1L;
             }
         }
     }
@@ -789,7 +794,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
 static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
     pstm_topic_receiver_or_sender_entry_t *entry = handle;
     pubsub_admin_service_t *psa = svc;
-    psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
+    psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint); //passes the protocol svc id stored on the entry next to the serializer svc id
 }
 
 static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
@@ -801,6 +806,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
             //new topic sender needed, requesting match with current psa
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;
+            long protocolSvcId = -1L;
             long selectedPsaSvcId = -1L;
             celix_properties_t *topicPropertiesForHighestMatch = NULL;
 
@@ -812,14 +818,16 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
                 pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
                 double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
                 long serSvcId = -1L;
+                long protSvcId = -1L;
                 celix_properties_t *topicProps = NULL;
-                psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId);
+                psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId, &protSvcId);
                 if (score > highestScore) {
                     if (topicPropertiesForHighestMatch != NULL) {
                         celix_properties_destroy(topicPropertiesForHighestMatch);
                     }
                     highestScore = score;
                     serializerSvcId = serSvcId;
+                    protocolSvcId = protSvcId;
                     selectedPsaSvcId = svcId;
                     topicPropertiesForHighestMatch = topicProps;
                 } else if (topicProps != NULL) {
@@ -831,6 +839,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
             if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
                 entry->selectedPsaSvcId = selectedPsaSvcId;
                 entry->selectedSerializerSvcId = serializerSvcId;
+                entry->selectedProtocolSvcId = protocolSvcId;
                 entry->topicProperties = topicPropertiesForHighestMatch;
                 bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
 
@@ -858,7 +867,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
 static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
     pstm_topic_receiver_or_sender_entry_t *entry = handle;
     pubsub_admin_service_t *psa = svc;
-    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
+    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint); //passes the protocol svc id stored on the entry next to the serializer svc id
 }
 
 static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
@@ -870,6 +879,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
 
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;
+            long protocolSvcId = -1L;
             long selectedPsaSvcId = -1L;
             celix_properties_t *highestMatchTopicProperties = NULL;
 
@@ -881,15 +891,17 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
                 pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
                 double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
                 long serSvcId = -1L;
+                long protSvcId = -1L;
                 celix_properties_t *topicProps = NULL;
 
-                psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId);
+                psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId, &protSvcId);
                 if (score > highestScore) {
                     if (highestMatchTopicProperties != NULL) {
                         celix_properties_destroy(highestMatchTopicProperties);
                     }
                     highestScore = score;
                     serializerSvcId = serSvcId;
+                    protocolSvcId = protSvcId;
                     selectedPsaSvcId = svcId;
                     highestMatchTopicProperties = topicProps;
                 } else if (topicProps != NULL) {
@@ -901,6 +913,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
             if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
                 entry->selectedPsaSvcId = selectedPsaSvcId;
                 entry->selectedSerializerSvcId = serializerSvcId;
+                entry->selectedProtocolSvcId = protocolSvcId;
                 entry->topicProperties = highestMatchTopicProperties;
 
                 bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
@@ -987,6 +1000,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
         fprintf(os, "   |- container name = %s\n", cn);
         fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
@@ -995,6 +1009,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         fprintf(os, "   |- topic          = %s\n", topic);
         fprintf(os, "   |- admin type     = %s\n", adminType);
         fprintf(os, "   |- serializer     = %s\n", serType);
+        fprintf(os, "   |- protocol       = %s\n", protType);
         if (manager->verbose) {
             fprintf(os, "   |- psa svc id     = %li\n", discovered->selectedPsaSvcId);
             fprintf(os, "   |- usage count    = %i\n", discovered->usageCount);
@@ -1017,14 +1032,17 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
         fprintf(os, "   |- scope       = %s\n", entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
+        fprintf(os, "   |- protocol    = %s\n", protType);
         if (manager->verbose) {
             fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
             fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+            fprintf(os, "   |- prot svc id = %li\n", entry->selectedProtocolSvcId);
             fprintf(os, "   |- usage count = %i\n", entry->usageCount);
         }
     }
@@ -1044,14 +1062,17 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
         fprintf(os, "   |- scope       = %s\n", entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
+        fprintf(os, "   |- protocol    = %s\n", protType);
         if (manager->verbose) {
             fprintf(os, "    |- psa svc id  = %li\n", entry->selectedPsaSvcId);
             fprintf(os, "    |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+            fprintf(os, "    |- prot svc id = %li\n", entry->selectedProtocolSvcId);
             fprintf(os, "    |- usage count = %i\n", entry->usageCount);
         }
     }
@@ -1069,9 +1090,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
                 const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
                 const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
                 const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+                const char *requestedProt = celix_properties_get(entry->topicProperties, PUBSUB_PROTOCOL_TYPE_KEY, "(None)");
                 fprintf(os, "    |- requested qos        = %s\n", requestedQos);
                 fprintf(os, "    |- requested config     = %s\n", requestedConfig);
                 fprintf(os, "    |- requested serializer = %s\n", requestedSer);
+                fprintf(os, "    |- requested protocol   = %s\n", requestedProt);
                 if (manager->verbose) {
                     fprintf(os, "    |- usage count          = %i\n", entry->usageCount);
                 }
@@ -1092,9 +1115,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
                 const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
                 const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
                 const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+                const char *requestedProt = celix_properties_get(entry->topicProperties, PUBSUB_PROTOCOL_TYPE_KEY, "(None)");
                 fprintf(os, "    |- requested qos        = %s\n", requestedQos);
                 fprintf(os, "    |- requested config     = %s\n", requestedConfig);
                 fprintf(os, "    |- requested serializer = %s\n", requestedSer);
+                fprintf(os, "    |- requested protocol   = %s\n", requestedProt);
                 if (manager->verbose) {
                     fprintf(os, "    |- usage count          = %i\n", entry->usageCount);
                 }
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 8e88641..817098e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -96,6 +96,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
     int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
     long selectedPsaSvcId;
     long selectedSerializerSvcId;
+    long selectedProtocolSvcId;
     long bndId;
     celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
 
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index ad46879..44aa35b 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -26,7 +26,7 @@
 #include "pubsub/api.h"
 #include "msg.h"
 
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
 static void sut_pubSet(void *handle, void *service);
 
 struct activator {
@@ -81,7 +81,7 @@ static void sut_pubSet(void *handle, void *service) {
 }
 
 
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void * voidMsg, bool *release) {
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void * voidMsg, celix_properties_t *metadata, bool *release) {
   struct activator *act =handle;
   msg_t *msg = voidMsg;
   msg_t send_msg = *msg;
@@ -90,7 +90,7 @@ static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId
     if (act->count == 0) {
       act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
     }
-    act->pubSvc->send(act->pubSvc->handle, act->msgId, &send_msg);
+    act->pubSvc->send(act->pubSvc->handle, act->msgId, &send_msg, metadata);
     act->count += 1;
   }
   pthread_mutex_unlock(&act->mutex);
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
index bd0da8c..80dc1a9 100644
--- a/bundles/pubsub/test/test/sut_activator.c
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -93,8 +93,8 @@ static void* sut_sendThread(void *data) {
             if (msgId == 0) {
                 act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
             }
-
-            act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+            celix_properties_t *metadata = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
+            act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata);
             if (msg.seqNr % 1000 == 0) {
                 printf("Send %i messages\n", msg.seqNr);
             }
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c
index 4140ae8..a9e6520 100644
--- a/bundles/pubsub/test/test/sut_endpoint_activator.c
+++ b/bundles/pubsub/test/test/sut_endpoint_activator.c
@@ -94,7 +94,8 @@ static void* sut_sendThread(void *data) {
 		        act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
 		    }
 
-			act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+            celix_properties_t *metadata = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
+            act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata);
             if (msg.seqNr % 1000 == 0) {
                 printf("Send %i messages\n", msg.seqNr);
             }
diff --git a/bundles/pubsub/test/test/tst_activator.c b/bundles/pubsub/test/test/tst_activator.c
index 3a8bcce..40900c9 100644
--- a/bundles/pubsub/test/test/tst_activator.c
+++ b/bundles/pubsub/test/test/tst_activator.c
@@ -28,7 +28,7 @@
 #include "msg.h"
 #include "receive_count_service.h"
 
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
 static size_t tst_count(void *handle);
 
 struct activator {
@@ -72,7 +72,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
 CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
 
 
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, bool *release  __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
     struct activator *act = handle;
 
     msg_t *msg = voidMsg;
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 8e501e4..ef3ce2f 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -29,7 +29,7 @@
 #include "receive_count_service.h"
 
 
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
 static size_t tst_count(void *handle);
 
 struct activator {
@@ -75,7 +75,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
 CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
 
 
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, bool *release  __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
     struct activator *act = handle;
 
     msg_t *msg = voidMsg;
diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h
index 5648a7c..7c0a9ea 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -56,6 +56,8 @@ const char* celix_properties_get(const celix_properties_t *properties, const cha
 
 void celix_properties_set(celix_properties_t *properties, const char *key, const char *value);
 
+void celix_properties_setWithoutCopy(celix_properties_t *properties, char *key, char *value); //stores the given value pointer as-is (no copy is made); does nothing when properties is NULL
+
 void celix_properties_unset(celix_properties_t *properties, const char *key);
 
 celix_properties_t* celix_properties_copy(const celix_properties_t *properties);
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index b28b5c1..67f164b 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -382,6 +382,21 @@ void celix_properties_set(celix_properties_t *properties, const char *key, const
     }
 }
 
+void celix_properties_setWithoutCopy(celix_properties_t *properties, char *key, char *value) { //sets key=value, storing the given pointers instead of copies
+    if (properties != NULL) { //a NULL properties is a no-op: key and value are left untouched
+        hash_map_entry_pt entry = hashMap_getEntry(properties, key);
+        if (entry != NULL) {
+            char *oldKey = hashMapEntry_getKey(entry);
+            char *oldVal = hashMapEntry_getValue(entry);
+            hashMap_put(properties, oldKey, value); //the already stored key is kept, only the value is replaced
+            if (oldKey != key) { free(key); } //the passed key is not stored in this branch; without this it leaks
+            if (oldVal != value) { free(oldVal); } //never free the value which was just stored
+        } else {
+            hashMap_put(properties, key, value); //new entry: the map now holds both pointers
+        }
+    }
+}
+
 void celix_properties_unset(celix_properties_t *properties, const char *key) {
     char* oldValue = hashMap_remove(properties, key);
     free(oldValue);