You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/08 19:31:39 UTC

[celix] branch feature/proposal_protocol_footer created (now 864ad47)

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a change to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 864ad47  Add Footer to ZMQ

This branch includes the following new commits:

     new 9a94bd6  Add Footer
     new 864ad47  Add Footer to ZMQ

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 01/02: Add Footer

Posted by rb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9a94bd6ca9c245eea19e707dc87a582de71e9372
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 20:16:33 2020 +0200

    Add Footer
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 45 ++++++++++++-----
 .../src/ps_wire_protocol_activator.c               |  3 ++
 .../src/pubsub_wire_protocol_common.h              |  1 +
 .../src/pubsub_wire_protocol_impl.c                | 58 ++++++++++++++++++++--
 .../src/pubsub_wire_protocol_impl.h                |  3 ++
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    | 35 +++++++++++++
 libs/framework/src/celix_log.c                     |  1 +
 7 files changed, 130 insertions(+), 16 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 91eb97a..3d647f8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry {
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
     void *headerBuffer;
+    unsigned int footerSize;
     void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
@@ -331,6 +332,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         entry->headerBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
+        handle->protocol->getFooterSize(handle->protocol->handle, &size);
+        entry->footerSize = size;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
         entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -340,8 +343,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
             entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
             entry->msg_iovlen++;
         }
-        entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
-        entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
         entry->msg_iovlen++;
@@ -825,7 +828,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
 
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
         if (entry->buffer) free(entry->buffer);
             entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
@@ -899,20 +902,19 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
                         L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
                     }
                 }
-                // Check for end of message using, header of next message. Because of streaming protocol
-                // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+                // Check for end of message using the footer of the message, because TCP is a streaming protocol
                 if (nbytes > 0) {
-                    pubsub_protocol_message_t header;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
-                    if (handle->protocol->decodeHeader(handle->protocol->handle,
+                    validMsg = true;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->footerSize, 0);
+                    if (handle->protocol->decodeFooter(handle->protocol->handle,
                                                  entry->footerBuffer,
-                                                 entry->headerSize,
-                                                 &header) == CELIX_SUCCESS) {
-                        // valid header for next buffer, this means that the message is valid
+                                                 entry->footerSize,
+                                                 &entry->header) == CELIX_SUCCESS) {
+                        // valid footer, this means that the message is valid
                         validMsg = true;
                     } else {
                         // Did not receive correct header
-                        L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                        L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
                         entry->bufferReadSize = 0;
                     }
                 }
@@ -1052,6 +1054,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             }
             message->header.metadataSize = metadataSize;
 
+            void *footerData = NULL;
+            size_t footerDataSize = 0;
+            if (entry->footerSize) {
+                handle->protocol->encodeFooter(handle->protocol->handle, message,
+                                                 &footerData,
+                                                 &footerDataSize);
+            }
+
             size_t msgSize = 0;
             struct msghdr msg;
             struct iovec msg_iov[IOV_MAX];
@@ -1085,6 +1095,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
             }
 
+            // Write optional footerData in vector buffer
+            if (footerData && footerDataSize) {
+                msg.msg_iovlen++;
+                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            }
+
             void *headerData = NULL;
             size_t headerSize = 0;
             // check if header is not part of the payload (=> headerBufferSize = 0)s
@@ -1142,6 +1160,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             if (metadataData) {
                 free(metadataData);
             }
+            if (footerData) {
+                free(footerData);
+            }
             entry->seqNr++;
         }
     }
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
index 359ea8f..78ae2a2 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
         act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
         act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
         act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+        act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize;
         act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
         
         act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
         act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
         act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+        act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter;
 
         act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
         act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
         act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+        act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter;
 
         act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
     }
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index f5befd2..ec946a8 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -27,6 +27,7 @@ extern "C" {
 #endif
 
 static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
 static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
 
 int readShort(const unsigned char *data, int offset, uint16_t *val);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 5e265d9..97d7fdd 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -59,15 +59,12 @@ celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
 
 celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
     celix_status_t status = CELIX_SUCCESS;
-
     free(protocol);
-
     return status;
 }
 
 celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) {
     *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
-
     return CELIX_SUCCESS;
 }
 
@@ -85,6 +82,11 @@ celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
     return CELIX_SUCCESS;
 }
 
+celix_status_t pubsubProtocol_getFooterSize(void* handle,  size_t *length) {
+    *length = sizeof(int);
+    return CELIX_SUCCESS;
+}
+
 celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) {
     *isSupported = false;
     return CELIX_SUCCESS;
@@ -95,6 +97,10 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
     size_t headerSize = 0;
     pubsubProtocol_getHeaderSize(handle, &headerSize);
 
+    // Get HeaderSize
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+
     if (*outBuffer == NULL) {
         *outBuffer = calloc(1, headerSize);
         *outLength = headerSize;
@@ -110,7 +116,7 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
         idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
         idx = writeInt(*outBuffer, idx, message->header.payloadSize);
         idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-
+        idx = writeInt(*outBuffer, idx, footerSize);
         *outLength = idx;
     }
 
@@ -175,6 +181,27 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
     return status;
 }
 
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    celix_status_t status = CELIX_SUCCESS;
+    // Get FooterSize
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+
+    if (*outBuffer == NULL) {
+        *outBuffer = calloc(1, footerSize);
+        *outLength = footerSize;
+    }
+    if (*outBuffer == NULL) {
+        status = CELIX_ENOMEM;
+    } else {
+        int idx = 0;
+        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_FOOTER);
+        *outLength = idx;
+    }
+
+    return status;
+}
+
 celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
     celix_status_t status = CELIX_SUCCESS;
 
@@ -253,6 +280,29 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le
     return status;
 }
 
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    int idx = 0;
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+    if (length == footerSize) {
+        unsigned int footerSync;
+        unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER;
+        unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC;
+        idx = readInt(data, idx, &footerSync);
+        if (footerSync != footerSyncValue) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+        if (footerSync != headerSyncValue) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    return status;
+}
+
 static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
     celix_status_t status = CELIX_SUCCESS;
 
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
index 06c4dcd..1693d09 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_getFooterSize(void* handle,  size_t *length);
 celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported);
 
 celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
 celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 
 #ifdef __cplusplus
 }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index 2ed8f81..17ca6dd 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -112,6 +112,18 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+  /**
+    * Returns the size of the footer.
+    * Is used by the receiver to configure the expected size of the footer.
+    * The receiver reads the footer to know if the complete message including payload is received.
+    *
+    * @param handle handle for service
+    * @param length output param for footer size
+    * @return status code indicating failure or success
+    */
+    celix_status_t (*getFooterSize)(void *handle, size_t *length);
+
   /**
     * Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol.
     *
@@ -157,6 +169,18 @@ typedef struct pubsub_protocol_service {
     celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
     /**
+     * Encodes the footer
+     *
+     * @param handle handle for service
+     * @param message message to use footer from
+     * @param outBuffer byte array containing the encoded footer
+     * @param outLength length of the byte array
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+
+  /**
      * Decodes the given data into message.header.
      *
      * @param handle handle for service
@@ -190,6 +214,17 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+    /**
+     * Decodes the given data as the message footer.
+     *
+     * @param handle handle for service
+     * @param data incoming byte array to decode
+     * @param length length of the byte array
+     * @param message pointer to message to be filled in with decoded footer
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 } pubsub_protocol_service_t;
 
 #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..b2c074e 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -26,6 +26,7 @@
 #include "celix_log.h"
 #include "celix_threads.h"
 #include "celix_array_list.h"
+#include "memstream/open_memstream.h"
 
 #define LOG_NAME        "celix_framework"
 


[celix] 02/02: Add Footer to ZMQ

Posted by rb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 864ad47c009bde48bb3747fef9b6a858b7b1edd0
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 21:30:46 2020 +0200

    Add Footer to ZMQ
---
 .../src/pubsub_zmq_topic_receiver.c                |  7 ++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 35 ++++++++++-----
 .../gtest/src/PS_WP_tests.cc                       | 52 ++++++++++++++++++++++
 .../src/pubsub_wire_protocol_common.h              |  2 +-
 .../src/pubsub_wire_protocol_impl.c                | 18 ++------
 5 files changed, 85 insertions(+), 29 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..bac104d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -628,8 +628,8 @@ static void* psa_zmq_recvThread(void * data) {
 
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
         if (zmsg != NULL) {
-            if (zmsg_size(zmsg) < 2) {
-                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
+            if (zmsg_size(zmsg) < 3) {
+                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) + footer), got %i frames", (int)zmsg_size(zmsg));
             } else {
                 zframe_t *header = zmsg_pop(zmsg); // header
                 zframe_t *payload = NULL;
@@ -650,6 +650,8 @@ static void* psa_zmq_recvThread(void * data) {
                 } else {
                     message.metadata.metadata = NULL;
                 }
+                zframe_t *footer = zmsg_pop(zmsg); // footer
+                receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message);
                 if (header != NULL && payload != NULL) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
@@ -659,6 +661,7 @@ static void* psa_zmq_recvThread(void * data) {
                 zframe_destroy(&header);
                 zframe_destroy(&payload);
                 zframe_destroy(&metadata);
+                zframe_destroy(&footer);
             }
             zmsg_destroy(&zmsg);
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 91fab55..fe7e171 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -578,6 +578,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 if(metadataLength > 1000000) {
                     L_WARN("ERR LARGE METADATA DETECTED\n");
                 }
+                void *footerData = NULL;
+                size_t footerLength = 0;
+                entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength);
 
                 message.header.msgId = msgTypeId;
                 message.header.msgMajorVersion = 0;
@@ -601,6 +604,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     zmq_msg_t msg1; // Header
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
+                    zmq_msg_t msg4; // Footer
                     void *socket = zsock_resolve(sender->zmq.socket);
                     psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
                     freeMsgEntry->msgSer = entry->msgSer;
@@ -617,16 +621,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send Payload
                     if (rc > 0) {
-                        if(metadataLength > 0) {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL);
-                        } else {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
-                        }
-                        int flags = 0;
-                        if (metadataLength > 0) {
-                            flags = ZMQ_SNDMORE;
-                        }
-                        rc = zmq_msg_send(&msg2, socket, flags);
+                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+                        rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
                         if (rc == -1) {
                             L_WARN("Error sending payload msg. %s", strerror(errno));
                             zmq_msg_close(&msg2);
@@ -635,13 +631,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send MetaData
                     if (rc > 0 && metadataLength > 0) {
-                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry);
-                        rc = zmq_msg_send(&msg3, socket, 0);
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg3, socket, ZMQ_SNDMORE);
                         if (rc == -1) {
                             L_WARN("Error sending metadata msg. %s", strerror(errno));
                             zmq_msg_close(&msg3);
                         }
                     }
+
+                    //send Footer
+                    if (rc > 0) {
+                        zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg4, socket, 0);
+                        if (rc == -1) {
+                            L_WARN("Error sending footer msg. %s", strerror(errno));
+                            zmq_msg_close(&msg4);
+                        }
+                    }
+
                     celixThreadMutex_unlock(&sender->zmq.mutex);
 
                     sendOk = rc > 0;
@@ -653,6 +660,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataLength > 0) {
                         zmsg_addmem(msg, metadataData, metadataLength);
                     }
+                    zmsg_addmem(msg, footerData, footerLength);
                     celixThreadMutex_lock(&sender->zmq.mutex);
                     int rc = zmsg_send(&msg, sender->zmq.socket);
                     celixThreadMutex_unlock(&sender->zmq.mutex);
@@ -672,6 +680,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataData) {
                         free(metadataData);
                     }
+                    if (footerData) {
+                        free(footerData);
+                    }
                 }
 
                 pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
index 5b9f8a0..5444d47 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
@@ -253,3 +253,55 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
     pubsubProtocol_destroy(wireprotocol);
 }
 
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    pubsub_protocol_message_t message;
+
+    void *footerData = nullptr;
+    size_t footerLength = 0;
+    celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+    unsigned char exp[4];
+    uint32_t s = 0xBAABABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    ASSERT_EQ(status, CELIX_SUCCESS);
+    ASSERT_EQ(4, footerLength);
+    for (int i = 0; i < 4; i++) {
+        ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]);
+    }
+
+    pubsubProtocol_destroy(wireprotocol);
+    free(footerData);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    unsigned char exp[4];
+    uint32_t s = 0xBAABABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+
+    ASSERT_EQ(CELIX_SUCCESS, status);
+    pubsubProtocol_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_WireProtocolV1Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    unsigned char exp[24];
+    uint32_t s = 0xABBABAAB;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+    ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+    pubsubProtocol_destroy(wireprotocol);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index ec946a8..c9bc70e 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -26,7 +26,7 @@
 extern "C" {
 #endif
 
-static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_HEADER = 0xABBABAAB;
 static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
 static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
 
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 97d7fdd..3da2094 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -78,7 +78,7 @@ celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle,  size_t *length) {
 }
 
 celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
-    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
+    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC_HEADER);
     return CELIX_SUCCESS;
 }
 
@@ -97,10 +97,6 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
     size_t headerSize = 0;
     pubsubProtocol_getHeaderSize(handle, &headerSize);
 
-    // Get HeaderSize
-    size_t footerSize = 0;
-    pubsubProtocol_getFooterSize(handle, &footerSize);
-
     if (*outBuffer == NULL) {
         *outBuffer = calloc(1, headerSize);
         *outLength = headerSize;
@@ -109,14 +105,13 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
         status = CELIX_ENOMEM;
     } else {
         int idx = 0;
-        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
+        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_HEADER);
         idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
         idx = writeInt(*outBuffer, idx, message->header.msgId);
         idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
         idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
         idx = writeInt(*outBuffer, idx, message->header.payloadSize);
         idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-        idx = writeInt(*outBuffer, idx, footerSize);
         *outLength = idx;
     }
 
@@ -211,7 +206,7 @@ celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t leng
     if (length == headerSize) {
         unsigned int sync;
         idx = readInt(data, idx, &sync);
-        if (sync != PROTOCOL_WIRE_SYNC) {
+        if (sync != PROTOCOL_WIRE_SYNC_HEADER) {
             status = CELIX_ILLEGAL_ARGUMENT;
         } else {
             unsigned int envelopeVersion;
@@ -288,13 +283,8 @@ celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t leng
     pubsubProtocol_getFooterSize(handle, &footerSize);
     if (length == footerSize) {
         unsigned int footerSync;
-        unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER;
-        unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC;
         idx = readInt(data, idx, &footerSync);
-        if (footerSync != footerSyncValue) {
-            status = CELIX_ILLEGAL_ARGUMENT;
-        }
-        if (footerSync != headerSyncValue) {
+        if (footerSync != PROTOCOL_WIRE_SYNC_FOOTER) {
             status = CELIX_ILLEGAL_ARGUMENT;
         }
     } else {