You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/23 15:20:36 UTC

[celix] branch master updated: Feature/proposal protocol footer (#253)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/master by this push:
     new 01dc53e  Feature/proposal protocol footer (#253)
01dc53e is described below

commit 01dc53ea7c4731f5709d0c9d6fae0e254749aa63
Author: rbulter <ro...@gmail.com>
AuthorDate: Tue Jun 23 17:20:25 2020 +0200

    Feature/proposal protocol footer (#253)
    
    * adds wire_v2 protocol with footer
    
    
    Co-authored-by: Pepijn Noltes <pe...@gmail.com>
---
 bundles/pubsub/CMakeLists.txt                      |   2 +-
 .../src/pubsub_psa_tcp_constants.h                 |   1 -
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      |  71 +++--
 .../src/pubsub_tcp_topic_receiver.c                |   6 +-
 .../src/pubsub_zmq_topic_receiver.c                |  10 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |  47 ++-
 .../gtest => pubsub_protocol}/CMakeLists.txt       |  17 +-
 .../pubsub_protocol_lib}/CMakeLists.txt            |  20 +-
 .../include/pubsub_wire_protocol_common.h          |  66 +++++
 .../src/pubsub_wire_protocol_common.c}             | 323 ++++++++-------------
 .../pubsub_protocol_wire_v1/CMakeLists.txt         |   5 +-
 .../pubsub_protocol_wire_v1/gtest/CMakeLists.txt   |   0
 .../gtest/src/PS_WP_tests.cc                       |  38 ++-
 .../pubsub_protocol_wire_v1/gtest/src/main.cc      |   0
 .../src/ps_wire_protocol_activator.c               |  11 +-
 .../src/pubsub_wire_protocol_impl.c                | 171 +++++++++++
 .../src/pubsub_wire_protocol_impl.h                |  11 +-
 .../pubsub_protocol_wire_v2}/CMakeLists.txt        |  27 +-
 .../pubsub_protocol_wire_v2}/gtest/CMakeLists.txt  |  12 +-
 .../gtest/src/PS_WP_v2_tests.cc                    | 249 ++++++++++++++++
 .../pubsub_protocol_wire_v2}/gtest/src/main.cc     |   0
 .../src/ps_wire_v2_protocol_activator.c}           |  37 +--
 .../src/pubsub_wire_v2_protocol_impl.c             | 205 +++++++++++++
 .../src/pubsub_wire_v2_protocol_impl.h             |  58 ++++
 .../src/pubsub_wire_protocol_common.c              |  72 -----
 .../src/pubsub_wire_protocol_common.h              |  44 ---
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    |  41 +++
 bundles/pubsub/test/CMakeLists.txt                 |  46 ++-
 libs/framework/src/celix_log.c                     |   4 +
 libs/utils/include/celix_byteswap.h                |  51 ++++
 30 files changed, 1205 insertions(+), 440 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 90d762a..e4e78ab 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -46,7 +46,7 @@ if (PUBSUB)
     add_subdirectory(pubsub_discovery)
     add_subdirectory(pubsub_serializer_json)
     add_subdirectory(pubsub_serializer_avrobin)
-    add_subdirectory(pubsub_protocol_wire_v1)
+    add_subdirectory(pubsub_protocol)
     add_subdirectory(keygen)
     add_subdirectory(mock)
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 6026212..302c9f6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -65,7 +65,6 @@
 
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT  5.0
-#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT  0.0
 
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_ADMIN_TYPE                   "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 91eb97a..3bb31cd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry {
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
     void *headerBuffer;
+    unsigned int footerSize;
     void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
@@ -91,7 +92,6 @@ typedef struct psa_tcp_connection_entry {
     struct msghdr msg;
     size_t msg_iovlen;        /* Number of elements in the vector.  */
     unsigned int retryCount;
-    unsigned int seqNr;
 } psa_tcp_connection_entry_t;
 
 //
@@ -331,6 +331,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         entry->headerBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
+        handle->protocol->getFooterSize(handle->protocol->handle, &size);
+        entry->footerSize = size;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
         entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -340,8 +342,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
             entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
             entry->msg_iovlen++;
         }
-        entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
-        entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
         entry->msg_iovlen++;
@@ -825,7 +827,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
 
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
         if (entry->buffer) free(entry->buffer);
             entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
@@ -836,14 +838,13 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
     if (nbytes > 0) {
         // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                           header_buffer,
-                                           entry->headerSize,
-                                           &entry->header) != CELIX_SUCCESS) {
+        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
             // Did not receive correct header
             // skip sync word and try to read next header
             nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
-            if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            if (!entry->headerError) {
+                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            }
             entry->headerError = true;
             entry->bufferReadSize = 0;
         } else {
@@ -899,21 +900,21 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
                         L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
                     }
                 }
-                // Check for end of message using, header of next message. Because of streaming protocol
-                // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+                // Check for the end of the message using the message footer, because this is a streaming protocol
                 if (nbytes > 0) {
-                    pubsub_protocol_message_t header;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
-                    if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                                 entry->footerBuffer,
-                                                 entry->headerSize,
-                                                 &header) == CELIX_SUCCESS) {
-                        // valid header for next buffer, this means that the message is valid
-                        validMsg = true;
+                    if (entry->footerSize > 0) {
+                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
+                        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
+                            // valid footer, this means that the message is valid
+                            validMsg = true;
+                        } else {
+                            // Did not receive correct footer
+                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                            entry->bufferReadSize = 0;
+                        }
                     } else {
-                        // Did not receive correct header
-                        L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
-                        entry->bufferReadSize = 0;
+                        // No Footer, then complete message is received
+                        validMsg = true;
                     }
                 }
             }
@@ -1037,11 +1038,11 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     payloadSize += msgIoVec[i].iov_len;
                 }
             }
-
-            message->header.seqNr = entry->seqNr;
+            message->header.convertEndianess = 0;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
             message->header.payloadOffset = 0;
+            message->header.isLastSegment = 1;
 
             void *metadataData = NULL;
             size_t metadataSize = 0;
@@ -1052,6 +1053,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             }
             message->header.metadataSize = metadataSize;
 
+            void *footerData = NULL;
+            size_t footerDataSize = 0;
+            if (entry->footerSize) {
+                handle->protocol->encodeFooter(handle->protocol->handle, message,
+                                                 &footerData,
+                                                 &footerDataSize);
+            }
+
             size_t msgSize = 0;
             struct msghdr msg;
             struct iovec msg_iov[IOV_MAX];
@@ -1085,6 +1094,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
             }
 
+            // Write optional footerData in vector buffer
+            if (footerData && footerDataSize) {
+                msg.msg_iovlen++;
+                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            }
+
             void *headerData = NULL;
             size_t headerSize = 0;
             // check if header is not part of the payload (=> headerBufferSize = 0)s
@@ -1128,7 +1145,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             } else if (msgSize) {
                 entry->retryCount = 0;
                 if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", entry->seqNr, msgSize, nbytes,  strerror(errno));
+                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
                 }
             }
             // Release data
@@ -1142,7 +1159,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             if (metadataData) {
                 free(metadataData);
             }
-            entry->seqNr++;
+            if (footerData) {
+                free(footerData);
+            }
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e9add2d..eb6afbd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -156,7 +156,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    bool isEndpoint = false;
     bool isServerEndPoint = false;
 
     /* Check if it's a static endpoint */
@@ -167,7 +166,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
         const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
         if (endPointType != NULL) {
-            isEndpoint = true;
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
                 staticClientEndPointUrls = staticConnectUrls;
@@ -208,9 +206,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
-        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, 
-                                                        (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT :
-                                                                        PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT);
+        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
         long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
                                                               PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
         long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 7e0fff3..088474a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -643,13 +643,16 @@ static void* psa_zmq_recvThread(void * data) {
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
         if (zmsg != NULL) {
             if (zmsg_size(zmsg) < 2) {
-                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
+                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) (+ footer)), got %i frames", (int)zmsg_size(zmsg));
             } else {
                 zframe_t *header = zmsg_pop(zmsg); // header
                 zframe_t *payload = NULL;
                 zframe_t *metadata = NULL;
+                zframe_t *footer = NULL;
 
                 pubsub_protocol_message_t message;
+                size_t footerSize = 0;
+                receiver->protocol->getFooterSize(receiver->protocol->handle, &footerSize);
                 receiver->protocol->decodeHeader(receiver->protocol->handle, zframe_data(header), zframe_size(header), &message);
                 if (message.header.payloadSize > 0) {
                     payload = zmsg_pop(zmsg);
@@ -664,6 +667,10 @@ static void* psa_zmq_recvThread(void * data) {
                 } else {
                     message.metadata.metadata = NULL;
                 }
+                if (footerSize > 0) {
+                    footer = zmsg_pop(zmsg); // footer
+                    receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message);
+                }
                 if (header != NULL && payload != NULL) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
@@ -673,6 +680,7 @@ static void* psa_zmq_recvThread(void * data) {
                 zframe_destroy(&header);
                 zframe_destroy(&payload);
                 zframe_destroy(&metadata);
+                zframe_destroy(&footer);
             }
             zmsg_destroy(&zmsg);
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 91fab55..413f1b3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -578,21 +578,29 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 if(metadataLength > 1000000) {
                     L_WARN("ERR LARGE METADATA DETECTED\n");
                 }
+                void *footerData = NULL;
+                size_t footerLength = 0;
+                entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength);
 
                 message.header.msgId = msgTypeId;
+                message.header.seqNr = entry->seqNr;
                 message.header.msgMajorVersion = 0;
                 message.header.msgMinorVersion = 0;
                 message.header.payloadSize = payloadLength;
                 message.header.metadataSize = metadataLength;
                 message.header.payloadPartSize = payloadLength;
                 message.header.payloadOffset = 0;
+                message.header.isLastSegment = 1;
+                message.header.convertEndianess = 0;
+
+                // increase seqNr
+                entry->seqNr++;
 
                 void *headerData = NULL;
                 size_t headerLength = 0;
 
                 entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
 
-
                 errno = 0;
                 bool sendOk;
 
@@ -601,6 +609,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     zmq_msg_t msg1; // Header
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
+                    zmq_msg_t msg4; // Footer
                     void *socket = zsock_resolve(sender->zmq.socket);
                     psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
                     freeMsgEntry->msgSer = entry->msgSer;
@@ -617,16 +626,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send Payload
                     if (rc > 0) {
-                        if(metadataLength > 0) {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL);
-                        } else {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
-                        }
-                        int flags = 0;
-                        if (metadataLength > 0) {
-                            flags = ZMQ_SNDMORE;
-                        }
-                        rc = zmq_msg_send(&msg2, socket, flags);
+                        int flag = ((metadataLength > 0)  || (footerLength > 0)) ? ZMQ_SNDMORE : 0;
+                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+                        rc = zmq_msg_send(&msg2, socket, flag);
                         if (rc == -1) {
                             L_WARN("Error sending payload msg. %s", strerror(errno));
                             zmq_msg_close(&msg2);
@@ -635,13 +637,25 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send MetaData
                     if (rc > 0 && metadataLength > 0) {
-                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry);
-                        rc = zmq_msg_send(&msg3, socket, 0);
+                        int flag = (footerLength > 0 ) ? ZMQ_SNDMORE : 0;
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg3, socket, flag);
                         if (rc == -1) {
                             L_WARN("Error sending metadata msg. %s", strerror(errno));
                             zmq_msg_close(&msg3);
                         }
                     }
+
+                    //send Footer
+                    if (rc > 0 && footerLength > 0) {
+                        zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg4, socket, 0);
+                        if (rc == -1) {
+                            L_WARN("Error sending footer msg. %s", strerror(errno));
+                            zmq_msg_close(&msg4);
+                        }
+                    }
+
                     celixThreadMutex_unlock(&sender->zmq.mutex);
 
                     sendOk = rc > 0;
@@ -653,6 +667,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataLength > 0) {
                         zmsg_addmem(msg, metadataData, metadataLength);
                     }
+                    if (footerLength > 0) {
+                        zmsg_addmem(msg, footerData, footerLength);
+                    }
                     celixThreadMutex_lock(&sender->zmq.mutex);
                     int rc = zmsg_send(&msg, sender->zmq.socket);
                     celixThreadMutex_unlock(&sender->zmq.mutex);
@@ -672,8 +689,10 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataData) {
                         free(metadataData);
                     }
+                    if (footerData) {
+                        free(footerData);
+                    }
                 }
-
                 pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
 
                 if (message.metadata.metadata) {
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/CMakeLists.txt
similarity index 61%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/CMakeLists.txt
index f86a18e..ad935df 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/CMakeLists.txt
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-#
+# 
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+# 
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,14 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set(SOURCES
-        src/main.cc
-        src/PS_WP_tests.cc
-    )
-add_executable(celix_pswp_tests ${SOURCES})
-#target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
 
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_subdirectory(pubsub_protocol_lib)
+add_subdirectory(pubsub_protocol_wire_v1)
+add_subdirectory(pubsub_protocol_wire_v2)
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
similarity index 61%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
index f86a18e..14fea70 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-#
+# 
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+# 
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,14 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set(SOURCES
-        src/main.cc
-        src/PS_WP_tests.cc
-    )
-add_executable(celix_pswp_tests ${SOURCES})
-#target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
-
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_library(celix_pubsub_protocol_lib STATIC
+        src/pubsub_wire_protocol_common.c
+)
+target_link_libraries(celix_pubsub_protocol_lib PUBLIC Celix::pubsub_spi)
+target_include_directories(celix_pubsub_protocol_lib PUBLIC include)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h
new file mode 100644
index 0000000..7dd1ef2
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_PROTOCOL_COMMON_H
+#define CELIX_PUBSUB_PROTOCOL_COMMON_H
+
+#include <stdint.h>
+
+#include "celix_errno.h"
+#include "pubsub_protocol.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NETSTRING_ERROR_TOO_LONG     -1
+#define NETSTRING_ERROR_NO_COLON     -2
+#define NETSTRING_ERROR_TOO_SHORT    -3
+#define NETSTRING_ERROR_NO_COMMA     -4
+#define NETSTRING_ERROR_LEADING_ZERO -5
+#define NETSTRING_ERROR_NO_LENGTH    -6
+
+static const unsigned int PROTOCOL_WIRE_V1_SYNC_HEADER = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_V1_ENVELOPE_VERSION = 1;
+
+static const unsigned int PROTOCOL_WIRE_V2_SYNC_HEADER = 0xABBADEAF;
+static const unsigned int PROTOCOL_WIRE_V2_SYNC_FOOTER = 0xDEAFABBA;
+static const unsigned int PROTOCOL_WIRE_V2_ENVELOPE_VERSION = 2;
+
+int pubsubProtocol_readChar(const unsigned char *data, int offset, uint8_t *val);
+int pubsubProtocol_readShort(const unsigned char *data, int offset, uint32_t convert, uint16_t *val);
+int pubsubProtocol_readInt(const unsigned char *data, int offset, uint32_t convert, uint32_t *val);
+int pubsubProtocol_readLong(const unsigned char *data, int offset, uint32_t convert, uint64_t *val);
+
+int pubsubProtocol_writeChar(unsigned char *data, int offset, uint8_t val);
+int pubsubProtocol_writeShort(unsigned char *data, int offset, uint32_t convert, uint16_t val);
+int pubsubProtocol_writeInt(unsigned char *data, int offset, uint32_t convert, uint32_t val);
+int pubsubProtocol_writeLong(unsigned char *data, int offset, uint32_t convert, uint64_t val);
+
+celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_decodePayload(void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeMetadata(void *data, size_t length, pubsub_protocol_message_t *message);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //CELIX_PUBSUB_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
similarity index 59%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
index 5e265d9..7055db9 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
@@ -17,114 +17,164 @@
  * under the License.
  */
 
-#include <stdio.h>
+#include "pubsub_wire_protocol_common.h"
+
 #include <stdlib.h>
-#include <math.h>
 #include <string.h>
+#include <math.h>
+#include <ctype.h>
+#include "celix_byteswap.h"
 
-#include "utils.h"
-#include "celix_properties.h"
-
-#include "pubsub_wire_protocol_impl.h"
-#include "pubsub_wire_protocol_common.h"
+/* Encodes `string` as a newly allocated, NUL-terminated netstring "<length>:<bytes>,".
+ * The result is stored in *netstringOut and must be released by the caller with free().
+ * Returns CELIX_SUCCESS, or CELIX_ENOMEM (leaving *netstringOut NULL) if allocation fails. */
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
+    celix_status_t status = CELIX_SUCCESS;
 
-#define NETSTRING_ERROR_TOO_LONG     -1
-#define NETSTRING_ERROR_NO_COLON     -2
-#define NETSTRING_ERROR_TOO_SHORT    -3
-#define NETSTRING_ERROR_NO_COMMA     -4
-#define NETSTRING_ERROR_LEADING_ZERO -5
-#define NETSTRING_ERROR_NO_LENGTH    -6
+    size_t str_len = strlen(string);
+    if (str_len == 0) {
+        /* The log10() sizing below yields zero digits for an empty string; emit "0:," directly. */
+        *netstringOut = calloc(1, 4);
+        if (*netstringOut == NULL) {
+            status = CELIX_ENOMEM;
+        } else {
+            memcpy(*netstringOut, "0:,", 4); /* three characters plus the terminating NUL */
+        }
+    } else {
+        size_t numlen = ceil(log10(str_len + 1)); /* number of decimal digits in str_len */
+        *netstringOut = calloc(1, numlen + str_len + 3); /* +3 for ':', ',' and the NUL */
+        if (*netstringOut == NULL) {
+            status = CELIX_ENOMEM;
+        } else {
+            sprintf(*netstringOut, "%zu:%s,", str_len, string);
+        }
+    }
 
-struct pubsub_protocol_wire_v1 {
-};
+    return status;
+}
 
-static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut);
+/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
+   to `netstring_start` a pointer to the beginning of the string in
+   the buffer, and to `netstring_length` the length of the
+   string. Does not allocate any memory. If it reads successfully,
+   then it returns 0. If there is an error, then the return value will
+   be negative. The error values are:
+   NETSTRING_ERROR_TOO_LONG      More than 999999999 bytes in a field
+   NETSTRING_ERROR_NO_COLON      No colon was found after the number
+   NETSTRING_ERROR_TOO_SHORT     Number of bytes greater than buffer length
+   NETSTRING_ERROR_NO_COMMA      No comma was found at the end
+   NETSTRING_ERROR_LEADING_ZERO  Leading zeros are not allowed
+   NETSTRING_ERROR_NO_LENGTH     Length not given at start of netstring
+   If you're sending messages with more than 999999999 bytes -- about
+   1 GB -- then you probably should not be doing so in the form of a
+   single netstring. This restriction is in place partially to protect
+   from malicious or erroneous input, and partly to be compatible with
+   D. J. Bernstein's reference implementation.
+   Example:
+      if (pubsubProtocol_parseNetstring((unsigned char *) "3:foo,", 6, &str, &len) < 0) explode_and_die();
+ */
 static int pubsubProtocol_parseNetstring(unsigned char *buffer, size_t buffer_length,
-                                  unsigned char **netstring_start, size_t *netstring_length);
+                                         unsigned char **netstring_start, size_t *netstring_length) {
+    int i;
+    size_t len = 0;
 
-celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
-    celix_status_t status = CELIX_SUCCESS;
+    /* Write default values for outputs */
+    *netstring_start = NULL; *netstring_length = 0;
 
-    *protocol = calloc(1, sizeof(**protocol));
+    /* Make sure buffer is big enough. Minimum size is 3. */
+    if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
 
-    if (!*protocol) {
-        status = CELIX_ENOMEM;
-    }
-    else {
-        //
+    /* No leading zeros allowed! */
+    if (buffer[0] == '0' && isdigit(buffer[1]))
+        return NETSTRING_ERROR_LEADING_ZERO;
+
+    /* The netstring must start with a number */
+    if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
+
+    /* Read the number of bytes */
+    for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
+        /* Error if more than 9 digits */
+        if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+        /* Accumulate each digit, assuming ASCII. */
+        len = len*10 + (buffer[i] - '0');
     }
 
-    return status;
-}
+    /* Check buffer length once and for all. Specifically, we make sure
+       that the buffer is longer than the number we've read, the length
+       of the string itself, and the colon and comma. */
+    if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
 
-celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
-    celix_status_t status = CELIX_SUCCESS;
+    /* Read the colon */
+    if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
 
-    free(protocol);
+    /* Test for the trailing comma, and set the return values */
+    if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
+    *netstring_start = &buffer[i]; *netstring_length = len;
 
-    return status;
+    return 0;
 }
 
-celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) {
-    *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
+/* Copies the single byte at data[offset] into *val and returns the offset just past it. */
+int pubsubProtocol_readChar(const unsigned char *data, int offset, uint8_t *val) {
+    memcpy(val, data + offset, sizeof(uint8_t));
+    return offset + sizeof(uint8_t); /* one byte consumed, mirroring pubsubProtocol_writeChar */
+}
 
-    return CELIX_SUCCESS;
+int pubsubProtocol_readShort(const unsigned char *data, int offset, uint32_t convert, uint16_t *val) {
+    /* Pull the 16-bit field out of the buffer, then byte-swap it only if the caller asked. */
+    uint16_t raw;
+    memcpy(&raw, &data[offset], sizeof(raw));
+    *val = convert ? bswap_16(raw) : raw;
+    return offset + sizeof(uint16_t);
 }
 
-celix_status_t pubsubProtocol_getHeaderBufferSize(void* handle, size_t *length) {
-    return pubsubProtocol_getHeaderSize(handle, length);
+int pubsubProtocol_readInt(const unsigned char *data, int offset, uint32_t convert, uint32_t *val) {
+    /* Pull the 32-bit field out of the buffer, then byte-swap it only if the caller asked. */
+    uint32_t raw;
+    memcpy(&raw, &data[offset], sizeof(raw));
+    *val = convert ? bswap_32(raw) : raw;
+    return offset + sizeof(uint32_t);
 }
 
-celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle,  size_t *length) {
-    *length = sizeof(int);
-    return CELIX_SUCCESS;
+int pubsubProtocol_readLong(const unsigned char *data, int offset, uint32_t convert, uint64_t *val) {
+    /* Pull the 64-bit field out of the buffer, then byte-swap it only if the caller asked. */
+    uint64_t raw;
+    memcpy(&raw, &data[offset], sizeof(raw));
+    *val = convert ? bswap_64(raw) : raw;
+    return offset + sizeof(uint64_t);
 }
 
-celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
-    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
-    return CELIX_SUCCESS;
+int pubsubProtocol_writeChar(unsigned char *data, int offset, uint8_t val) {
+    data[offset] = val; /* a single byte has no byte order to convert */
+    return offset + sizeof(uint8_t);
 }
 
-celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) {
-    *isSupported = false;
-    return CELIX_SUCCESS;
+int pubsubProtocol_writeShort(unsigned char *data, int offset, uint32_t convert, uint16_t val) {
+    if (convert) { val = bswap_16(val); } /* swap byte order only on request */
+    memcpy(&data[offset], &val, sizeof(val));
+    return offset + sizeof(uint16_t);
 }
-celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
-    celix_status_t status = CELIX_SUCCESS;
-    // Get HeaderSize
-    size_t headerSize = 0;
-    pubsubProtocol_getHeaderSize(handle, &headerSize);
 
-    if (*outBuffer == NULL) {
-        *outBuffer = calloc(1, headerSize);
-        *outLength = headerSize;
-    }
-    if (*outBuffer == NULL) {
-        status = CELIX_ENOMEM;
-    } else {
-        int idx = 0;
-        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
-        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
-        idx = writeInt(*outBuffer, idx, message->header.msgId);
-        idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
-        idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
-        idx = writeInt(*outBuffer, idx, message->header.payloadSize);
-        idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-
-        *outLength = idx;
-    }
+int pubsubProtocol_writeInt(unsigned char *data, int offset, uint32_t convert, uint32_t val) {
+    if (convert) { val = bswap_32(val); } /* swap byte order only on request */
+    memcpy(&data[offset], &val, sizeof(val));
+    return offset + sizeof(uint32_t);
+}
 
-    return status;
+int pubsubProtocol_writeLong(unsigned char *data, int offset, uint32_t convert, uint64_t val) {
+    if (convert) { val = bswap_64(val); } /* swap byte order only on request */
+    memcpy(&data[offset], &val, sizeof(val));
+    return offset + sizeof(uint64_t);
 }
 
-celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
     *outBuffer = message->payload.payload;
     *outLength = message->payload.length;
 
     return CELIX_SUCCESS;
 }
 
-celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+celix_status_t pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
     celix_status_t status = CELIX_SUCCESS;
 
     unsigned char *line = calloc(1, 4);
@@ -167,7 +217,7 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
         }
     }
     int size = celix_properties_size(message->metadata.metadata);
-    writeInt((unsigned char *) line, 0, size);
+    pubsubProtocol_writeInt((unsigned char *) line, 0, true, size);
 
     *outBuffer = line;
     *outLength = idx;
@@ -175,52 +225,18 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
     return status;
 }
 
-celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    int idx = 0;
-    size_t headerSize = 0;
-    pubsubProtocol_getHeaderSize(handle, &headerSize);
-    if (length == headerSize) {
-        unsigned int sync;
-        idx = readInt(data, idx, &sync);
-        if (sync != PROTOCOL_WIRE_SYNC) {
-            status = CELIX_ILLEGAL_ARGUMENT;
-        } else {
-            unsigned int envelopeVersion;
-            idx = readInt(data, idx, &envelopeVersion);
-            if (envelopeVersion != PROTOCOL_WIRE_ENVELOPE_VERSION) {
-                status = CELIX_ILLEGAL_ARGUMENT;
-            } else {
-                idx = readInt(data, idx, &message->header.msgId);
-                idx = readShort(data, idx, &message->header.msgMajorVersion);
-                idx = readShort(data, idx, &message->header.msgMinorVersion);
-                idx = readInt(data, idx, &message->header.payloadSize);
-                readInt(data, idx, &message->header.metadataSize);
-                // Set message segmentation parameters to defaults
-                message->header.seqNr           = 0;
-                message->header.payloadPartSize = message->header.payloadSize;
-                message->header.payloadOffset   = 0;
-            }
-        }
-    } else {
-        status = CELIX_ILLEGAL_ARGUMENT;
-    }
-    return status;
-}
-
-celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message){
+celix_status_t pubsubProtocol_decodePayload(void *data, size_t length, pubsub_protocol_message_t *message){
     message->payload.payload = data;
     message->payload.length = length;
 
     return CELIX_SUCCESS;
 }
 
-celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+celix_status_t pubsubProtocol_decodeMetadata(void *data, size_t length, pubsub_protocol_message_t *message) {
     celix_status_t status = CELIX_SUCCESS;
 
     uint32_t nOfElements;
-    size_t idx = readInt(data, 0, &nOfElements);
+    size_t idx = pubsubProtocol_readInt(data, 0, true, &nOfElements);
     unsigned char *netstring = data + idx;
     int netstringLen = length - idx;
 
@@ -251,93 +267,4 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le
     }
 
     return status;
-}
-
-static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    size_t str_len = strlen(string);
-    if (str_len == 0) {
-        // 0:,
-        *netstringOut = calloc(1, 4);
-        if (*netstringOut == NULL) {
-            status = CELIX_ENOMEM;
-        } else {
-            *netstringOut[0] = '0';
-            *netstringOut[1] = ':';
-            *netstringOut[2] = ',';
-            *netstringOut[3] = '\0';
-        }
-    } else {
-        size_t numlen = ceil(log10(str_len + 1));
-        *netstringOut = calloc(1, numlen + str_len + 3);
-        if (*netstringOut == NULL) {
-            status = CELIX_ENOMEM;
-        } else {
-            sprintf(*netstringOut, "%zu:%s,", str_len, string);
-        }
-    }
-
-    return status;
-}
-
-/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
-   to `netstring_start` a pointer to the beginning of the string in
-   the buffer, and to `netstring_length` the length of the
-   string. Does not allocate any memory. If it reads successfully,
-   then it returns 0. If there is an error, then the return value will
-   be negative. The error values are:
-   NETSTRING_ERROR_TOO_LONG      More than 999999999 bytes in a field
-   NETSTRING_ERROR_NO_COLON      No colon was found after the number
-   NETSTRING_ERROR_TOO_SHORT     Number of bytes greater than buffer length
-   NETSTRING_ERROR_NO_COMMA      No comma was found at the end
-   NETSTRING_ERROR_LEADING_ZERO  Leading zeros are not allowed
-   NETSTRING_ERROR_NO_LENGTH     Length not given at start of netstring
-   If you're sending messages with more than 999999999 bytes -- about
-   2 GB -- then you probably should not be doing so in the form of a
-   single netstring. This restriction is in place partially to protect
-   from malicious or erroneous input, and partly to be compatible with
-   D. J. Bernstein's reference implementation.
-   Example:
-      if (netstring_read("3:foo,", 6, &str, &len) < 0) explode_and_die();
- */
-static int pubsubProtocol_parseNetstring(unsigned char *buffer, size_t buffer_length,
-                   unsigned char **netstring_start, size_t *netstring_length) {
-    int i;
-    size_t len = 0;
-
-    /* Write default values for outputs */
-    *netstring_start = NULL; *netstring_length = 0;
-
-    /* Make sure buffer is big enough. Minimum size is 3. */
-    if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
-
-    /* No leading zeros allowed! */
-    if (buffer[0] == '0' && isdigit(buffer[1]))
-        return NETSTRING_ERROR_LEADING_ZERO;
-
-    /* The netstring must start with a number */
-    if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
-
-    /* Read the number of bytes */
-    for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
-        /* Error if more than 9 digits */
-        if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
-        /* Accumulate each digit, assuming ASCII. */
-        len = len*10 + (buffer[i] - '0');
-    }
-
-    /* Check buffer length once and for all. Specifically, we make sure
-       that the buffer is longer than the number we've read, the length
-       of the string itself, and the colon and comma. */
-    if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
-
-    /* Read the colon */
-    if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
-
-    /* Test for the trailing comma, and set the return values */
-    if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
-    *netstring_start = &buffer[i]; *netstring_length = len;
-
-    return 0;
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
similarity index 90%
copy from bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
index 793ade8..d8a1ef8 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
@@ -17,11 +17,10 @@
 
 add_library(celix_wire_protocol_v1_impl STATIC
     src/pubsub_wire_protocol_impl.c
-    src/pubsub_wire_protocol_common.c
 )
 target_include_directories(celix_wire_protocol_v1_impl PRIVATE src)
-target_link_libraries(celix_wire_protocol_v1_impl PRIVATE Celix::pubsub_spi )
-
+target_link_libraries(celix_wire_protocol_v1_impl PUBLIC Celix::pubsub_spi)
+target_link_libraries(celix_wire_protocol_v1_impl PUBLIC celix_pubsub_protocol_lib)
 
 add_celix_bundle(celix_pubsub_protocol_wire_v1
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
similarity index 100%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
similarity index 85%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
index 5b9f8a0..c39604a 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
@@ -171,7 +171,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeMetadata_Test) { // NOLINT(c
 
     void *data = nullptr;
     size_t length = 0;
-    celix_status_t status = pubsubProtocol_encodeMetadata(nullptr, &message, &data, &length);
+    celix_status_t status = pubsubProtocol_v1_encodeMetadata(nullptr, &message, &data, &length);
 
     unsigned char exp[12];
     uint32_t s = htonl(1);
@@ -199,7 +199,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_Test) { // NOLINT(c
     memcpy(exp + 4, "1:a,1:b,", 8);
 
     pubsub_protocol_message_t message;
-    celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, exp, 12, &message);
+    celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, exp, 12, &message);
 
     ASSERT_EQ(status, CELIX_SUCCESS);
     ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -221,7 +221,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_EmptyKey_Test) { //
     memcpy(exp + 4, "0:,1:b,", 7);
 
     pubsub_protocol_message_t message;
-    celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, exp, 11, &message);
+    celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, exp, 11, &message);
 
     ASSERT_EQ(status, CELIX_SUCCESS);
     ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -242,7 +242,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
     memcpy(exp + 4, "4:a,:l,1:b,", 11);
 
     pubsub_protocol_message_t message;
-    celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, &exp, 15, &message);
+    celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, &exp, 15, &message);
 
     ASSERT_EQ(status, CELIX_SUCCESS);
     ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -253,3 +253,33 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
     pubsubProtocol_destroy(wireprotocol);
 }
 
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    pubsub_protocol_message_t message; // left uninitialised; the v1 encodeFooter declares its message parameter unused
+
+    void *footerData = nullptr;
+    size_t footerLength = 0;
+    celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+    ASSERT_EQ(status, CELIX_SUCCESS);
+    ASSERT_EQ(0, footerLength); // wire v1 produces a zero-length footer ...
+    ASSERT_EQ(nullptr, footerData); // ... and no footer buffer
+    pubsubProtocol_destroy(wireprotocol);
+    free(footerData); // footerData was asserted to be nullptr above, so this releases nothing
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    unsigned char exp[4];
+    uint32_t s = 0xBAABABBA; // NOTE(review): value looks arbitrary here -- the v1 decodeFooter never inspects its data
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message); // v1 declares every parameter unused and returns success
+    ASSERT_EQ(CELIX_SUCCESS, status);
+    pubsubProtocol_destroy(wireprotocol);
+}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/main.cc
similarity index 100%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/main.cc
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
similarity index 83%
copy from bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
index 359ea8f..3aff467 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
         act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
         act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
         act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+        act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize;
         act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
         
         act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
-        act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
-        act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+        act->protocolSvc.encodePayload = pubsubProtocol_v1_encodePayload;
+        act->protocolSvc.encodeMetadata = pubsubProtocol_v1_encodeMetadata;
+        act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter;
 
         act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
-        act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
-        act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+        act->protocolSvc.decodePayload = pubsubProtocol_v1_decodePayload;
+        act->protocolSvc.decodeMetadata = pubsubProtocol_v1_decodeMetadata;
+        act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter;
 
         act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
     }
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
new file mode 100644
index 0000000..6bd79d0
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <math.h>
+#include <string.h>
+
+#include "utils.h"
+#include "celix_properties.h"
+
+#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+#define BYTESWAP_SYNC true
+
+struct pubsub_protocol_wire_v1 {
+};
+
+/* Allocates a zero-initialised wire-v1 protocol handle and stores it in *protocol.
+ * Returns CELIX_SUCCESS, or CELIX_ENOMEM (with *protocol == NULL) if allocation fails.
+ * Release the handle with pubsubProtocol_destroy(). */
+celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
+    /* struct pubsub_protocol_wire_v1 is declared without members in this file, so its
+     * size can be 0 (GNU C). calloc() may return NULL for a zero-size request, which
+     * would be misreported as out-of-memory, so always request at least one byte. */
+    size_t size = sizeof(**protocol) > 0 ? sizeof(**protocol) : 1;
+    *protocol = calloc(1, size);
+    if (*protocol == NULL) {
+        return CELIX_ENOMEM;
+    }
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
+    /* The handle owns no other resources; free(NULL) is a no-op, so NULL is accepted too. */
+    free(protocol);
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getHeaderSize(void* handle __attribute__((unused)), size_t *length) {
+    *length = 5 * sizeof(int) + 2 * sizeof(short); // five int fields + two short fields, as written by pubsubProtocol_encodeHeader (24 with 4-byte int, 2-byte short)
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getHeaderBufferSize(void* handle, size_t *length) {
+    return pubsubProtocol_getHeaderSize(handle, length); // for wire v1 the buffer size is reported as exactly the header size
+}
+
+celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle __attribute__((unused)),  size_t *length) {
+    *length = sizeof(int); // size reported for the sync word that pubsubProtocol_getSyncHeader emits via pubsubProtocol_writeInt
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle __attribute__((unused)), void *syncHeader) {
+    pubsubProtocol_writeInt(syncHeader, 0, BYTESWAP_SYNC, PROTOCOL_WIRE_V1_SYNC_HEADER); // same byte-order flag as encodeHeader/decodeHeader; writes 4 bytes at syncHeader
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getFooterSize(void* handle __attribute__((unused)),  size_t *length) {
+    *length = 0; // wire v1 has no footer (pubsubProtocol_encodeFooter hands back a NULL buffer)
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle __attribute__((unused)), bool *isSupported) {
+    *isSupported = false; // wire v1 reports no segmentation; pubsubProtocol_decodeHeader fills the segmentation fields with defaults
+    return CELIX_SUCCESS;
+}
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    size_t headerSize = 0;
+    pubsubProtocol_getHeaderSize(handle, &headerSize);
+
+    // A buffer supplied by the caller is reused; otherwise a zeroed one is allocated here.
+    if (*outBuffer == NULL) {
+        *outBuffer = calloc(1, headerSize);
+        *outLength = headerSize;
+        if (*outBuffer == NULL) {
+            return CELIX_ENOMEM;
+        }
+    }
+
+    // The write order below is the wire layout that pubsubProtocol_decodeHeader reads back.
+    unsigned char *buf = *outBuffer;
+    int pos = 0;
+    pos = pubsubProtocol_writeInt(buf, pos, BYTESWAP_SYNC, PROTOCOL_WIRE_V1_SYNC_HEADER);
+    pos = pubsubProtocol_writeInt(buf, pos, true, PROTOCOL_WIRE_V1_ENVELOPE_VERSION);
+    pos = pubsubProtocol_writeInt(buf, pos, true, message->header.msgId);
+    pos = pubsubProtocol_writeShort(buf, pos, true, message->header.msgMajorVersion);
+    pos = pubsubProtocol_writeShort(buf, pos, true, message->header.msgMinorVersion);
+    pos = pubsubProtocol_writeInt(buf, pos, true, message->header.payloadSize);
+    pos = pubsubProtocol_writeInt(buf, pos, true, message->header.metadataSize);
+    *outLength = pos;
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_v1_encodePayload(void *handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    return pubsubProtocol_encodePayload(message, outBuffer, outLength); // handle is ignored; forwards to the handle-free common function
+}
+
+celix_status_t pubsubProtocol_v1_encodeMetadata(void *handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    return pubsubProtocol_encodeMetadata(message, outBuffer, outLength); // handle is ignored; forwards to the handle-free common function
+}
+
+celix_status_t pubsubProtocol_encodeFooter(void *handle __attribute__((unused)), pubsub_protocol_message_t *message __attribute__((unused)), void **outBuffer, size_t *outLength) {
+    *outBuffer = NULL; // no footer bytes are produced for wire v1
+    return pubsubProtocol_getFooterSize(handle,  outLength); // sets *outLength to 0; handle is forwarded even though it is marked unused
+}
+
+celix_status_t pubsubProtocol_v1_decodePayload(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message){
+    return pubsubProtocol_decodePayload(data, length, message); // handle is ignored; forwards to the handle-free common function
+}
+
+celix_status_t pubsubProtocol_v1_decodeMetadata(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+    return pubsubProtocol_decodeMetadata(data, length, message); // handle is ignored; forwards to the handle-free common function
+}
+
+celix_status_t pubsubProtocol_decodeFooter(void* handle __attribute__((unused)), void *data __attribute__((unused)), size_t length __attribute__((unused)), pubsub_protocol_message_t *message __attribute__((unused))) {
+    // Wire v1 carries no footer, so nothing is validated or extracted here.
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_decodeHeader(void *handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    size_t headerSize = 0;
+    pubsubProtocol_getHeaderSize(handle, &headerSize);
+    if (length != headerSize) {
+        // Only a buffer of exactly the header size is accepted.
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    int pos = 0;
+    unsigned int sync;
+    pos = pubsubProtocol_readInt(data, pos, BYTESWAP_SYNC, &sync);
+    if (sync != PROTOCOL_WIRE_V1_SYNC_HEADER) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    unsigned int envelopeVersion;
+    pos = pubsubProtocol_readInt(data, pos, true, &envelopeVersion);
+    if (envelopeVersion != PROTOCOL_WIRE_V1_ENVELOPE_VERSION) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    // Remaining fields, in the order pubsubProtocol_encodeHeader writes them.
+    pos = pubsubProtocol_readInt(data, pos, true, &message->header.msgId);
+    pos = pubsubProtocol_readShort(data, pos, true, &message->header.msgMajorVersion);
+    pos = pubsubProtocol_readShort(data, pos, true, &message->header.msgMinorVersion);
+    pos = pubsubProtocol_readInt(data, pos, true, &message->header.payloadSize);
+    pubsubProtocol_readInt(data, pos, true, &message->header.metadataSize);
+    // Set message segmentation parameters to defaults
+    message->header.seqNr = 0;
+    message->header.payloadPartSize = message->header.payloadSize;
+    message->header.payloadOffset = 0;
+    return CELIX_SUCCESS;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
similarity index 69%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
index 06c4dcd..9b7521f 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_getFooterSize(void* handle,  size_t *length);
 celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported);
 
 celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
-celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
-celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_v1_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_v1_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
 celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
-celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
-celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_v1_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_v1_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 
 #ifdef __cplusplus
 }
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
similarity index 53%
rename from bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
index 793ade8..61324e2 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
@@ -15,28 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_library(celix_wire_protocol_v1_impl STATIC
-    src/pubsub_wire_protocol_impl.c
-    src/pubsub_wire_protocol_common.c
+add_library(celix_wire_protocol_v2_impl STATIC
+        src/pubsub_wire_v2_protocol_impl.c
 )
-target_include_directories(celix_wire_protocol_v1_impl PRIVATE src)
-target_link_libraries(celix_wire_protocol_v1_impl PRIVATE Celix::pubsub_spi )
+target_include_directories(celix_wire_protocol_v2_impl PRIVATE src)
+target_link_libraries(celix_wire_protocol_v2_impl PUBLIC Celix::pubsub_spi)
+target_link_libraries(celix_wire_protocol_v2_impl PUBLIC celix_pubsub_protocol_lib)
 
-
-add_celix_bundle(celix_pubsub_protocol_wire_v1
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
+add_celix_bundle(celix_pubsub_protocol_wire_v2
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v2"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
-        src/ps_wire_protocol_activator.c
+        src/ps_wire_v2_protocol_activator.c
 )
-target_include_directories(celix_pubsub_protocol_wire_v1 PRIVATE src)
-target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE celix_wire_protocol_v1_impl)
+target_include_directories(celix_pubsub_protocol_wire_v2 PRIVATE src)
+target_link_libraries(celix_pubsub_protocol_wire_v2 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_protocol_wire_v2 PRIVATE celix_wire_protocol_v2_impl)
 
-install_celix_bundle(celix_pubsub_protocol_wire_v1 EXPORT celix COMPONENT pubsub)
+install_celix_bundle(celix_pubsub_protocol_wire_v2 EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
+add_library(Celix::pubsub_protocol_wire_v2 ALIAS celix_pubsub_protocol_wire_v2)
 
 if (ENABLE_TESTING)
     add_subdirectory(gtest)
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
similarity index 69%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
index f86a18e..4d557a0 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
@@ -17,12 +17,12 @@
 
 set(SOURCES
         src/main.cc
-        src/PS_WP_tests.cc
+        src/PS_WP_v2_tests.cc
     )
-add_executable(celix_pswp_tests ${SOURCES})
+add_executable(celix_pswp_v2_tests ${SOURCES})
 #target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
+target_include_directories(celix_pswp_v2_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
+target_link_libraries(celix_pswp_v2_tests PRIVATE celix_wire_protocol_v2_impl GTest::gtest Celix::pubsub_spi)
 
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_test(NAME celix_pswp_v2_tests COMMAND celix_pswp_v2_tests)
+setup_target_for_coverage(celix_pswp_v2_tests SCAN_DIR ..)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc
new file mode 100644
index 0000000..080b6f0
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc
@@ -0,0 +1,249 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <sstream>
+#include <arpa/inet.h>
+#include <iostream>
+
+#include <pubsub_wire_protocol_common.h>
+
+#include "gtest/gtest.h"
+
+#include "pubsub_wire_v2_protocol_impl.h"
+#include "celix_byteswap.h"
+#include <cstring>
+
+class WireProtocolV2Test : public ::testing::Test {
+public:
+    WireProtocolV2Test() = default;
+    ~WireProtocolV2Test() override = default;
+
+};
+
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_EncodeHeader_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    pubsub_protocol_message_t message;
+    message.header.msgId = 1;
+    message.header.seqNr = 4;
+    message.header.msgMajorVersion = 0;
+    message.header.msgMinorVersion = 0;
+    message.header.payloadSize = 2;
+    message.header.metadataSize = 3;
+    message.header.payloadPartSize = 4;
+    message.header.payloadOffset = 2;
+    message.header.isLastSegment = 1;
+    message.header.convertEndianess = 1;
+
+    void *headerData = nullptr;
+    size_t headerLength = 0;
+    celix_status_t status = pubsubProtocol_wire_v2_encodeHeader(nullptr, &message, &headerData, &headerLength);
+
+    unsigned char exp[40];
+    uint32_t s = bswap_32(0xABBADEAF);
+    memcpy(exp, &s, sizeof(uint32_t));
+    uint32_t e = 0x02000000; //envelope version
+    memcpy(exp+4, &e, sizeof(uint32_t));
+    uint32_t m = 0x01000000; //msg id
+    memcpy(exp+8, &m, sizeof(uint32_t));
+    uint32_t seq = 0x04000000; //seqnr
+    memcpy(exp+12, &seq, sizeof(uint32_t));
+    uint32_t v = 0x00000000;
+    memcpy(exp+16, &v, sizeof(uint32_t));
+    uint32_t ps = 0x02000000;
+    memcpy(exp+20, &ps, sizeof(uint32_t));
+    uint32_t ms = 0x03000000;
+    memcpy(exp+24, &ms, sizeof(uint32_t));
+    uint32_t pps = 0x04000000;
+    memcpy(exp+28, &pps, sizeof(uint32_t));
+    uint32_t ppo = 0x02000000;
+    memcpy(exp+32, &ppo, sizeof(uint32_t));
+    uint32_t ils = 0x01000000;
+    memcpy(exp+36, &ils, sizeof(uint32_t));
+
+    ASSERT_EQ(status, CELIX_SUCCESS);
+    ASSERT_EQ(40, headerLength);
+    for (int i = 0; i < 40; i++) {
+        ASSERT_EQ(((unsigned char*) headerData)[i], exp[i]);
+    }
+
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+    free(headerData);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    unsigned char exp[40];
+    uint32_t s = bswap_32(0xABBADEAF); //sync
+    memcpy(exp, &s, sizeof(uint32_t));
+    uint32_t e = 0x02000000; //envelope version
+    memcpy(exp+4, &e, sizeof(uint32_t));
+    uint32_t m = 0x01000000; //msg id
+    memcpy(exp+8, &m, sizeof(uint32_t));
+    uint32_t seq = 0x08000000; //seqnr
+    memcpy(exp+12, &seq, sizeof(uint32_t));
+    uint32_t v = 0x00000000;
+    memcpy(exp+16, &v, sizeof(uint32_t));
+    uint32_t ps = 0x02000000;
+    memcpy(exp+20, &ps, sizeof(uint32_t));
+    uint32_t ms = 0x03000000;
+    memcpy(exp+24, &ms, sizeof(uint32_t));
+    uint32_t pps = 0x04000000;
+    memcpy(exp+28, &pps, sizeof(uint32_t));
+    uint32_t ppo = 0x02000000;
+    memcpy(exp+32, &ppo, sizeof(uint32_t));
+    uint32_t ils = 0x01000000;
+    memcpy(exp+36, &ils, sizeof(uint32_t));
+
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+    ASSERT_EQ(CELIX_SUCCESS, status);
+    ASSERT_EQ(1, message.header.msgId);
+    ASSERT_EQ(8, message.header.seqNr);
+    ASSERT_EQ(0, message.header.msgMajorVersion);
+    ASSERT_EQ(0, message.header.msgMinorVersion);
+    ASSERT_EQ(2, message.header.payloadSize);
+    ASSERT_EQ(3, message.header.metadataSize);
+    ASSERT_EQ(4, message.header.payloadPartSize);
+    ASSERT_EQ(2, message.header.payloadOffset);
+    ASSERT_EQ(1, message.header.isLastSegment);
+    ASSERT_EQ(1, message.header.convertEndianess);
+
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    unsigned char exp[40];
+    uint32_t s = 0xBAABABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    uint32_t e = 0x01000000;
+    memcpy(exp+4, &e, sizeof(uint32_t));
+    uint32_t m = 0x01000000;
+    memcpy(exp+8, &m, sizeof(uint32_t));
+    uint32_t seq = 0x08000000;
+    memcpy(exp+12, &seq, sizeof(uint32_t));
+    uint32_t v = 0x00000000;
+    memcpy(exp+16, &v, sizeof(uint32_t));
+    uint32_t ps = 0x02000000;
+    memcpy(exp+20, &ps, sizeof(uint32_t));
+    uint32_t ms = 0x03000000;
+    memcpy(exp+24, &ms, sizeof(uint32_t));
+
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+    ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_IncorrectVersion_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    unsigned char exp[40];
+    uint32_t s = 0xABBADEAF;
+    memcpy(exp, &s, sizeof(uint32_t));
+    uint32_t e = 0x02000000;
+    memcpy(exp+4, &e, sizeof(uint32_t));
+    uint32_t m = 0x01000000;
+    memcpy(exp+8, &m, sizeof(uint32_t));
+    uint32_t seq = 0x08000000;
+    memcpy(exp+12, &seq, sizeof(uint32_t));
+    uint32_t v = 0x00000000;
+    memcpy(exp+16, &v, sizeof(uint32_t));
+    uint32_t ps = 0x02000000;
+    memcpy(exp+20, &ps, sizeof(uint32_t));
+    uint32_t ms = 0x03000000;
+    memcpy(exp+24, &ms, sizeof(uint32_t));
+
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+    ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    pubsub_protocol_message_t message;
+    message.header.convertEndianess = 0;
+
+    void *footerData = nullptr;
+    size_t footerLength = 0;
+    celix_status_t status = pubsubProtocol_wire_v2_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+    unsigned char exp[4];
+    uint32_t s = 0xDEAFABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    ASSERT_EQ(status, CELIX_SUCCESS);
+    ASSERT_EQ(4, footerLength);
+    for (int i = 0; i < 4; i++) {
+        if (((unsigned char*) footerData)[i] != exp[i]) {
+            std::cerr << "error at index " << std::to_string(i) << std::endl;
+        }
+        ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]);
+    }
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+    free(footerData);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    unsigned char exp[4];
+    uint32_t s = 0xDEAFABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+    message.header.convertEndianess = 0;
+    celix_status_t status = pubsubProtocol_wire_v2_decodeFooter(nullptr, exp, 4, &message);
+
+    ASSERT_EQ(CELIX_SUCCESS, status);
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v2_t *wireprotocol;
+    pubsubProtocol_wire_v2_create(&wireprotocol);
+
+    unsigned char exp[4];
+    uint32_t s = 0xABBABAAB;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_wire_v2_decodeFooter(nullptr, exp, 4, &message);
+    ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+    pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/main.cc
similarity index 100%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/main.cc
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
similarity index 57%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
index 359ea8f..80e4433 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
@@ -21,10 +21,10 @@
 #include <pubsub_constants.h>
 
 #include "celix_api.h"
-#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_v2_protocol_impl.h"
 
 typedef struct ps_wp_activator {
-    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsub_protocol_wire_v2_t *wireprotocol;
 
     pubsub_protocol_service_t protocolSvc;
     long wireProtocolSvcId;
@@ -33,25 +33,28 @@ typedef struct ps_wp_activator {
 static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
     act->wireProtocolSvcId = -1L;
 
-    celix_status_t status = pubsubProtocol_create(&(act->wireprotocol));
+    celix_status_t status = pubsubProtocol_wire_v2_create(&(act->wireprotocol));
     if (status == CELIX_SUCCESS) {
         /* Set serializertype */
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_PROTOCOL_TYPE);
+        celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_V2_PROTOCOL_TYPE);
 
-        act->protocolSvc.getHeaderSize = pubsubProtocol_getHeaderSize;
-        act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
-        act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
-        act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
-        act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
-        
-        act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
-        act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
-        act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+        act->protocolSvc.getHeaderSize = pubsubProtocol_wire_v2_getHeaderSize;
+        act->protocolSvc.getHeaderBufferSize = pubsubProtocol_wire_v2_getHeaderBufferSize;
+        act->protocolSvc.getSyncHeaderSize = pubsubProtocol_wire_v2_getSyncHeaderSize;
+        act->protocolSvc.getSyncHeader = pubsubProtocol_wire_v2_getSyncHeader;
+        act->protocolSvc.getFooterSize = pubsubProtocol_wire_v2_getFooterSize;
+        act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_wire_v2_isMessageSegmentationSupported;
 
-        act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
-        act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
-        act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+        act->protocolSvc.encodeHeader = pubsubProtocol_wire_v2_encodeHeader;
+        act->protocolSvc.encodePayload = pubsubProtocol_wire_v2_encodePayload;
+        act->protocolSvc.encodeMetadata = pubsubProtocol_wire_v2_encodeMetadata;
+        act->protocolSvc.encodeFooter = pubsubProtocol_wire_v2_encodeFooter;
+
+        act->protocolSvc.decodeHeader = pubsubProtocol_wire_v2_decodeHeader;
+        act->protocolSvc.decodePayload = pubsubProtocol_wire_v2_decodePayload;
+        act->protocolSvc.decodeMetadata = pubsubProtocol_wire_v2_decodeMetadata;
+        act->protocolSvc.decodeFooter = pubsubProtocol_wire_v2_decodeFooter;
 
         act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
     }
@@ -61,7 +64,7 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
 static int ps_wp_stop(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
     celix_bundleContext_unregisterService(ctx, act->wireProtocolSvcId);
     act->wireProtocolSvcId = -1L;
-    pubsubProtocol_destroy(act->wireprotocol);
+    pubsubProtocol_wire_v2_destroy(act->wireprotocol);
     return CELIX_SUCCESS;
 }
 
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
new file mode 100644
index 0000000..a05e095
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "celix_properties.h"
+
+#include "pubsub_wire_v2_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+struct pubsub_protocol_wire_v2 {
+};
+celix_status_t pubsubProtocol_wire_v2_create(pubsub_protocol_wire_v2_t **protocol) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    *protocol = calloc(1, sizeof(**protocol));
+
+    if (!*protocol) {
+        status = CELIX_ENOMEM;
+    }
+    else {}
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protocol) {
+    celix_status_t status = CELIX_SUCCESS;
+    free(protocol);
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void* handle, size_t *length) {
+    *length = sizeof(int) * 9 + sizeof(short) * 2; // 9 ints (sync, envelope version, 7 header fields) + 2 shorts (major/minor version) = 40 with 4-byte int and 2-byte short
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderBufferSize(void* handle, size_t *length) {
+    return pubsubProtocol_wire_v2_getHeaderSize(handle, length);
+}
+
+celix_status_t pubsubProtocol_wire_v2_getSyncHeaderSize(void* handle,  size_t *length) {
+    *length = sizeof(int);
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getSyncHeader(void* handle, void *syncHeader) {
+    pubsubProtocol_writeInt(syncHeader, 0, false, PROTOCOL_WIRE_V2_SYNC_HEADER);
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getFooterSize(void* handle,  size_t *length) {
+    *length = sizeof(int);;
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_isMessageSegmentationSupported(void* handle, bool *isSupported) {
+    *isSupported = true;
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    celix_status_t status = CELIX_SUCCESS;
+    // Get HeaderSize
+    size_t headerSize = 0;
+    pubsubProtocol_wire_v2_getHeaderSize(handle, &headerSize);
+
+    if (*outBuffer == NULL) {
+        *outBuffer = malloc(headerSize);
+        *outLength = headerSize;
+    }
+    if (*outBuffer == NULL) {
+        status = CELIX_ENOMEM;
+    } else {
+        int idx = 0;
+        unsigned int convert = message->header.convertEndianess;
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_SYNC_HEADER);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_ENVELOPE_VERSION);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.msgId);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.seqNr);
+        idx = pubsubProtocol_writeShort(*outBuffer, idx, convert, message->header.msgMajorVersion);
+        idx = pubsubProtocol_writeShort(*outBuffer, idx, convert, message->header.msgMinorVersion);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadSize);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataSize);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadPartSize);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadOffset);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.isLastSegment);
+        *outLength = idx;
+    }
+
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    celix_status_t status = CELIX_SUCCESS;
+    // Get FooterSize
+    size_t footerSize = 0;
+    pubsubProtocol_wire_v2_getFooterSize(handle, &footerSize);
+
+    if (*outBuffer == NULL) {
+        *outBuffer = malloc(footerSize);
+        *outLength = footerSize;
+    }
+    if (*outBuffer == NULL) {
+        status = CELIX_ENOMEM;
+    } else {
+        int idx = 0;
+        unsigned int convert = message->header.convertEndianess;
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_SYNC_FOOTER);
+        *outLength = idx;
+    }
+
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    int idx = 0;
+    size_t headerSize = 0;
+    pubsubProtocol_wire_v2_getHeaderSize(handle, &headerSize);
+    if (length == headerSize) {
+        unsigned int sync = 0;
+        unsigned int sync_endianess = 0;
+        idx = pubsubProtocol_readInt(data, idx, false, &sync);
+        pubsubProtocol_readInt(data, 0, true, &sync_endianess);
+        message->header.convertEndianess = (sync_endianess == PROTOCOL_WIRE_V2_SYNC_HEADER) ? true : false;
+        if ((sync != PROTOCOL_WIRE_V2_SYNC_HEADER) && (sync_endianess != PROTOCOL_WIRE_V2_SYNC_HEADER)) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        } else {
+            unsigned int envelopeVersion;
+            unsigned int convert = message->header.convertEndianess;
+            idx = pubsubProtocol_readInt(data, idx, convert, &envelopeVersion);
+            if (envelopeVersion != PROTOCOL_WIRE_V2_ENVELOPE_VERSION) {
+                fprintf(stderr, "found sync %x and converted sync %x\n", sync, sync_endianess);
+                fprintf(stderr, "wrong envelop version\n");
+                fprintf(stderr, "Got %i, need %i\n", envelopeVersion, PROTOCOL_WIRE_V2_ENVELOPE_VERSION);
+                status = CELIX_ILLEGAL_ARGUMENT;
+            } else {
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.msgId);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.seqNr);
+                idx = pubsubProtocol_readShort(data, idx, convert, &message->header.msgMajorVersion);
+                idx = pubsubProtocol_readShort(data, idx, convert, &message->header.msgMinorVersion);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadSize);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataSize);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadPartSize);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadOffset);
+                pubsubProtocol_readInt(data, idx, convert, &message->header.isLastSegment);
+            }
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    int idx = 0;
+    size_t footerSize = 0;
+    pubsubProtocol_wire_v2_getFooterSize(handle, &footerSize);
+    if (length == footerSize) {
+        unsigned int footerSync;
+        unsigned int convert = message->header.convertEndianess;
+        idx = pubsubProtocol_readInt(data, idx, convert, &footerSync);
+        if (footerSync != PROTOCOL_WIRE_V2_SYNC_FOOTER) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodePayload(void* handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    return pubsubProtocol_encodePayload(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeMetadata(void* handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    return pubsubProtocol_encodeMetadata(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodePayload(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+    return pubsubProtocol_decodePayload(data, length, message);
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeMetadata(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+    return pubsubProtocol_decodeMetadata(data, length, message);
+}
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h
new file mode 100644
index 0000000..295a7a2
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_WIRE_V2_H_
+#define PUBSUB_PROTOCOL_WIRE_V2_H_
+
+#include "pubsub_protocol.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define PUBSUB_WIRE_V2_PROTOCOL_TYPE "envelope-v2"
+
+typedef struct pubsub_protocol_wire_v2 pubsub_protocol_wire_v2_t;
+
+celix_status_t pubsubProtocol_wire_v2_create(pubsub_protocol_wire_v2_t **protocol);
+celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protocol);
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getHeaderBufferSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getSyncHeaderSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_wire_v2_getFooterSize(void* handle,  size_t *length);
+celix_status_t pubsubProtocol_wire_v2_isMessageSegmentationSupported(void* handle, bool *isSupported);
+
+celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_wire_v2_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+celix_status_t pubsubProtocol_wire_v2_encodePayload(void* handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_encodeMetadata(void* handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_wire_v2_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PUBSUB_PROTOCOL_WIRE_V2_H_ */
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
deleted file mode 100644
index 2f07375..0000000
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "pubsub_wire_protocol_common.h"
-
-#include <string.h>
-#if defined(__APPLE__)
-    #include <machine/endian.h>
-#else
-    #include <endian.h>
-    #include <arpa/inet.h>
-#endif
-
-int readShort(const unsigned char *data, int offset, uint16_t *val) {
-    memcpy(val, data + offset, sizeof(uint16_t));
-    *val = ntohs(*val);
-    return offset + (int)sizeof(uint16_t);
-}
-
-int readInt(const unsigned char *data, int offset, uint32_t *val) {
-    memcpy(val, data + offset, sizeof(uint32_t));
-    *val = ntohl(*val);
-    return offset + (int)sizeof(uint32_t);
-}
-
-int readLong(const unsigned char *data, int offset, uint64_t *val) {
-    memcpy(val, data + offset, sizeof(uint64_t));
-#if defined(__APPLE__)
-    *val = ntohll(*val);
-#else
-    *val = be64toh(*val);
-#endif
-    return offset + (int)sizeof(uint64_t);
-}
-
-int writeShort(unsigned char *data, int offset, uint16_t val) {
-    uint16_t nVal = htons(val);
-    memcpy(data + offset, &nVal, sizeof(uint16_t));
-    return offset + (int)sizeof(uint16_t);
-}
-
-int writeInt(unsigned char *data, int offset, uint32_t val) {
-    uint32_t nVal = htonl(val);
-    memcpy(data + offset, &nVal, sizeof(uint32_t));
-    return offset + (int)sizeof(uint32_t);
-}
-
-int writeLong(unsigned char *data, int offset, uint64_t val) {
-#if defined(__APPLE__)
-    uint64_t nVal = htonll(val);
-#else
-    uint64_t nVal = htobe64(val);
-#endif
-    memcpy(data + offset, &nVal, sizeof(uint64_t));
-    return offset + (int)sizeof(uint64_t);
-}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
deleted file mode 100644
index f5befd2..0000000
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
-#define CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
-
-#include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
-static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
-
-int readShort(const unsigned char *data, int offset, uint16_t *val);
-int readInt(const unsigned char *data, int offset, uint32_t *val);
-int readLong(const unsigned char *data, int offset, uint64_t *val);
-
-int writeShort(unsigned char *data, int offset, uint16_t val);
-int writeInt(unsigned char *data, int offset, uint32_t val);
-int writeLong(unsigned char *data, int offset, uint64_t val);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif //CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index ab8b883..ad1a387 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -42,11 +42,17 @@ struct pubsub_protocol_header {
     uint32_t payloadSize;
     uint32_t metadataSize;
 
+    /** Optional convertEndianess attribute. During encoding, this attribute indicates that the header needs to be converted for endianness;
+     *  after header decoding, this attribute indicates that the payload needs to be converted for endianness.
+     *  Note: this attribute is transmitted using the wire protocol, the sync word is used to determine endianess conversion */
+    uint32_t convertEndianess;
+
     /** Optional message segmentation attributes, these attributes are only used/written by the protocol admin.
      *  When message segmentation is supported by the protocol admin */
     uint32_t seqNr;
     uint32_t payloadPartSize;
     uint32_t payloadOffset;
+    uint32_t isLastSegment;
 };
 
 typedef struct pubsub_protocol_payload pubsub_protocol_payload_t;
@@ -113,6 +119,18 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+  /**
+    * Returns the size of the footer.
+    * Is used by the receiver to configure the expected size of the footer.
+    * The receiver reads the footer to know if the complete message including payload is received.
+    *
+    * @param handle handle for service
+    * @param length output param for footer size
+    * @return status code indicating failure or success
+    */
+    celix_status_t (*getFooterSize)(void *handle, size_t *length);
+
   /**
     * Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol.
     *
@@ -158,6 +176,18 @@ typedef struct pubsub_protocol_service {
     celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
     /**
+     * Encodes the footer
+     *
+     * @param handle handle for service
+     * @param message message to use footer from
+     * @param outBuffer byte array containing the encoded footer
+     * @param outLength length of the byte array
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+
+  /**
      * Decodes the given data into message.header.
      *
      * @param handle handle for service
@@ -191,6 +221,17 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+    /**
+     * Decodes the given data into the footer of the message.
+     *
+     * @param handle handle for service
+     * @param data incoming byte array to decode
+     * @param length length of the byte array
+     * @param message pointer to message to be filled in with decoded footer
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 } pubsub_protocol_service_t;
 
 #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index ef73910..4c03808 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -141,7 +141,7 @@ if (BUILD_PUBSUB_PSA_TCP)
             Celix::pubsub_serializer_json
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v1
+            Celix::pubsub_protocol_wire_v2
             pubsub_sut
             pubsub_tst
             )
@@ -225,6 +225,26 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     add_test(NAME pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_tests SCAN_DIR ..)
 
+    add_celix_container(pubsub_zmq_wire_v2_tests
+        USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+        LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+        BUNDLES
+        Celix::pubsub_serializer_json
+        Celix::pubsub_topology_manager
+        Celix::pubsub_admin_zmq
+        Celix::pubsub_protocol_wire_v2
+        pubsub_sut
+        pubsub_tst
+        )
+
+    target_link_libraries(pubsub_zmq_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi ZMQ::lib CZMQ::lib)
+    target_include_directories(pubsub_zmq_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+    add_test(NAME pubsub_zmq_wire_v2_tests COMMAND pubsub_zmq_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_wire_v2_tests,CONTAINER_LOC>)
+    setup_target_for_coverage(pubsub_zmq_wire_v2_tests SCAN_DIR ..)
+
 
     add_celix_container(pubsub_zmq_zerocopy_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -249,4 +269,28 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     #TODO fix issues with ZeroCopy and reanble test again
     add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_zerocopy_tests SCAN_DIR ..)
+
+    add_celix_container(pubsub_zmq_zerocopy_wire_v2_tests
+        USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+        LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+        PSA_ZMQ_ZEROCOPY_ENABLED=true
+        BUNDLES
+        Celix::pubsub_serializer_json
+        Celix::pubsub_topology_manager
+        Celix::pubsub_admin_zmq
+        Celix::pubsub_protocol_wire_v2
+        Celix::shell
+        Celix::shell_tui
+        pubsub_sut
+        pubsub_tst
+        )
+    target_link_libraries(pubsub_zmq_zerocopy_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi ZMQ::lib CZMQ::lib)
+    target_include_directories(pubsub_zmq_zerocopy_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+
+    #TODO fix issues with ZeroCopy and re-enable test again
+    add_test(NAME pubsub_zmq_zerocopy_wire_v2_tests COMMAND pubsub_zmq_zerocopy_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_wire_v2_tests,CONTAINER_LOC>)
+    setup_target_for_coverage(pubsub_zmq_zerocopy_wire_v2_tests SCAN_DIR ..)
 endif ()
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..012283d 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -27,6 +27,10 @@
 #include "celix_threads.h"
 #include "celix_array_list.h"
 
+#ifdef NO_MEMSTREAM_AVAILABLE
+#include "memstream/open_memstream.h"
+#endif
+
 #define LOG_NAME        "celix_framework"
 
 struct celix_framework_logger {
diff --git a/libs/utils/include/celix_byteswap.h b/libs/utils/include/celix_byteswap.h
new file mode 100644
index 0000000..47c413a
--- /dev/null
+++ b/libs/utils/include/celix_byteswap.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_BYTESWAP_H
+#define CELIX_BYTESWAP_H
+
+/** Define byteswap compatibility functions for Apple OSX */
+#if defined(__APPLE__)
+/* Swap bytes in 16 bit value: high byte moves down, low byte moves up (x is evaluated twice).  */
+#define bswap_16(x) \
+     ((((x) & 0xff00) >>  8) | (((x) & 0x00ff) << 8))
+#define __bswap_16 bswap_16
+
+/* Swap bytes in 32 bit value.  */
+#define bswap_32(x) \
+     ((((x) & 0xff000000) >> 24) | (((x) & 0x00ff0000) >>  8) | 	\
+      (((x) & 0x0000ff00) <<  8) | (((x) & 0x000000ff) << 24))
+#define __bswap_32 bswap_32
+
+/* Swap bytes in 64 bit value.  */
+# define bswap_64(x) \
+       ((((x) & 0xff00000000000000ull) >> 56)	|  \
+        (((x) & 0x00ff000000000000ull) >> 40)	|	 \
+        (((x) & 0x0000ff0000000000ull) >> 24)	|	 \
+        (((x) & 0x000000ff00000000ull) >> 8)	|	 \
+        (((x) & 0x00000000ff000000ull) << 8)	|	 \
+        (((x) & 0x0000000000ff0000ull) << 24)	|	 \
+        (((x) & 0x000000000000ff00ull) << 40)	|	 \
+        (((x) & 0x00000000000000ffull) << 56))
+#define __bswap_64 bswap_64
+#else
+#include <byteswap.h>
+#endif
+
+#endif /* CELIX_BYTESWAP_H */
\ No newline at end of file