You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/08 19:31:41 UTC

[celix] 02/02: Add Footer to ZMQ

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 864ad47c009bde48bb3747fef9b6a858b7b1edd0
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 21:30:46 2020 +0200

    Add Footer to ZMQ
---
 .../src/pubsub_zmq_topic_receiver.c                |  7 ++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 35 ++++++++++-----
 .../gtest/src/PS_WP_tests.cc                       | 52 ++++++++++++++++++++++
 .../src/pubsub_wire_protocol_common.h              |  2 +-
 .../src/pubsub_wire_protocol_impl.c                | 18 ++------
 5 files changed, 85 insertions(+), 29 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..bac104d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -628,8 +628,8 @@ static void* psa_zmq_recvThread(void * data) {
 
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
         if (zmsg != NULL) {
-            if (zmsg_size(zmsg) < 2) {
-                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
+            if (zmsg_size(zmsg) < 3) {
+                L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) + footer), got %i frames", (int)zmsg_size(zmsg));
             } else {
                 zframe_t *header = zmsg_pop(zmsg); // header
                 zframe_t *payload = NULL;
@@ -650,6 +650,8 @@ static void* psa_zmq_recvThread(void * data) {
                 } else {
                     message.metadata.metadata = NULL;
                 }
+                zframe_t *footer = zmsg_pop(zmsg); // footer
+                receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message);
                 if (header != NULL && payload != NULL) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
@@ -659,6 +661,7 @@ static void* psa_zmq_recvThread(void * data) {
                 zframe_destroy(&header);
                 zframe_destroy(&payload);
                 zframe_destroy(&metadata);
+                zframe_destroy(&footer);
             }
             zmsg_destroy(&zmsg);
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 91fab55..fe7e171 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -578,6 +578,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 if(metadataLength > 1000000) {
                     L_WARN("ERR LARGE METADATA DETECTED\n");
                 }
+                void *footerData = NULL;
+                size_t footerLength = 0;
+                entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength);
 
                 message.header.msgId = msgTypeId;
                 message.header.msgMajorVersion = 0;
@@ -601,6 +604,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     zmq_msg_t msg1; // Header
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
+                    zmq_msg_t msg4; // Footer
                     void *socket = zsock_resolve(sender->zmq.socket);
                     psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
                     freeMsgEntry->msgSer = entry->msgSer;
@@ -617,16 +621,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send Payload
                     if (rc > 0) {
-                        if(metadataLength > 0) {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL);
-                        } else {
-                            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
-                        }
-                        int flags = 0;
-                        if (metadataLength > 0) {
-                            flags = ZMQ_SNDMORE;
-                        }
-                        rc = zmq_msg_send(&msg2, socket, flags);
+                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+                        rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
                         if (rc == -1) {
                             L_WARN("Error sending payload msg. %s", strerror(errno));
                             zmq_msg_close(&msg2);
@@ -635,13 +631,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
                     //send MetaData
                     if (rc > 0 && metadataLength > 0) {
-                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry);
-                        rc = zmq_msg_send(&msg3, socket, 0);
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg3, socket, ZMQ_SNDMORE);
                         if (rc == -1) {
                             L_WARN("Error sending metadata msg. %s", strerror(errno));
                             zmq_msg_close(&msg3);
                         }
                     }
+
+                    //send Footer
+                    if (rc > 0) {
+                        zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL);
+                        rc = zmq_msg_send(&msg4, socket, 0);
+                        if (rc == -1) {
+                            L_WARN("Error sending footer msg. %s", strerror(errno));
+                            zmq_msg_close(&msg4);
+                        }
+                    }
+
                     celixThreadMutex_unlock(&sender->zmq.mutex);
 
                     sendOk = rc > 0;
@@ -653,6 +660,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataLength > 0) {
                         zmsg_addmem(msg, metadataData, metadataLength);
                     }
+                    zmsg_addmem(msg, footerData, footerLength);
                     celixThreadMutex_lock(&sender->zmq.mutex);
                     int rc = zmsg_send(&msg, sender->zmq.socket);
                     celixThreadMutex_unlock(&sender->zmq.mutex);
@@ -672,6 +680,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                     if (metadataData) {
                         free(metadataData);
                     }
+                    if (footerData) {
+                        free(footerData);
+                    }
                 }
 
                 pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
index 5b9f8a0..5444d47 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
@@ -253,3 +253,55 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
     pubsubProtocol_destroy(wireprotocol);
 }
 
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    pubsub_protocol_message_t message;
+
+    void *footerData = nullptr;
+    size_t footerLength = 0;
+    celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+    unsigned char exp[4];
+    uint32_t s = 0xBAABABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    ASSERT_EQ(status, CELIX_SUCCESS);
+    ASSERT_EQ(4, footerLength);
+    for (int i = 0; i < 4; i++) {
+        ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]);
+    }
+
+    pubsubProtocol_destroy(wireprotocol);
+    free(footerData);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    unsigned char exp[4];
+    uint32_t s = 0xBAABABBA;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+
+    ASSERT_EQ(CELIX_SUCCESS, status);
+    pubsubProtocol_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_WireProtocolV1Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+    pubsub_protocol_wire_v1_t *wireprotocol;
+    pubsubProtocol_create(&wireprotocol);
+
+    unsigned char exp[24];
+    uint32_t s = 0xABBABAAB;
+    memcpy(exp, &s, sizeof(uint32_t));
+    pubsub_protocol_message_t message;
+
+    celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+    ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+    pubsubProtocol_destroy(wireprotocol);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index ec946a8..c9bc70e 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -26,7 +26,7 @@
 extern "C" {
 #endif
 
-static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_HEADER = 0xABBABAAB;
 static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
 static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
 
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 97d7fdd..3da2094 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -78,7 +78,7 @@ celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle,  size_t *length) {
 }
 
 celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
-    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
+    writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC_HEADER);
     return CELIX_SUCCESS;
 }
 
@@ -97,10 +97,6 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
     size_t headerSize = 0;
     pubsubProtocol_getHeaderSize(handle, &headerSize);
 
-    // Get HeaderSize
-    size_t footerSize = 0;
-    pubsubProtocol_getFooterSize(handle, &footerSize);
-
     if (*outBuffer == NULL) {
         *outBuffer = calloc(1, headerSize);
         *outLength = headerSize;
@@ -109,14 +105,13 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
         status = CELIX_ENOMEM;
     } else {
         int idx = 0;
-        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
+        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_HEADER);
         idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
         idx = writeInt(*outBuffer, idx, message->header.msgId);
         idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
         idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
         idx = writeInt(*outBuffer, idx, message->header.payloadSize);
         idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-        idx = writeInt(*outBuffer, idx, footerSize);
         *outLength = idx;
     }
 
@@ -211,7 +206,7 @@ celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t leng
     if (length == headerSize) {
         unsigned int sync;
         idx = readInt(data, idx, &sync);
-        if (sync != PROTOCOL_WIRE_SYNC) {
+        if (sync != PROTOCOL_WIRE_SYNC_HEADER) {
             status = CELIX_ILLEGAL_ARGUMENT;
         } else {
             unsigned int envelopeVersion;
@@ -288,13 +283,8 @@ celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t leng
     pubsubProtocol_getFooterSize(handle, &footerSize);
     if (length == footerSize) {
         unsigned int footerSync;
-        unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER;
-        unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC;
         idx = readInt(data, idx, &footerSync);
-        if (footerSync != footerSyncValue) {
-            status = CELIX_ILLEGAL_ARGUMENT;
-        }
-        if (footerSync != headerSyncValue) {
+        if (footerSync != PROTOCOL_WIRE_SYNC_FOOTER) {
             status = CELIX_ILLEGAL_ARGUMENT;
         }
     } else {