You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/08 19:31:41 UTC
[celix] 02/02: Add Footer to ZMQ
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 864ad47c009bde48bb3747fef9b6a858b7b1edd0
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 21:30:46 2020 +0200
Add Footer to ZMQ
---
.../src/pubsub_zmq_topic_receiver.c | 7 ++-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 35 ++++++++++-----
.../gtest/src/PS_WP_tests.cc | 52 ++++++++++++++++++++++
.../src/pubsub_wire_protocol_common.h | 2 +-
.../src/pubsub_wire_protocol_impl.c | 18 ++------
5 files changed, 85 insertions(+), 29 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..bac104d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -628,8 +628,8 @@ static void* psa_zmq_recvThread(void * data) {
zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
if (zmsg != NULL) {
- if (zmsg_size(zmsg) < 2) {
- L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
+ if (zmsg_size(zmsg) < 3) {
+ L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) + footer), got %i frames", (int)zmsg_size(zmsg));
} else {
zframe_t *header = zmsg_pop(zmsg); // header
zframe_t *payload = NULL;
@@ -650,6 +650,8 @@ static void* psa_zmq_recvThread(void * data) {
} else {
message.metadata.metadata = NULL;
}
+ zframe_t *footer = zmsg_pop(zmsg); // footer
+ receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message);
if (header != NULL && payload != NULL) {
struct timespec receiveTime;
clock_gettime(CLOCK_REALTIME, &receiveTime);
@@ -659,6 +661,7 @@ static void* psa_zmq_recvThread(void * data) {
zframe_destroy(&header);
zframe_destroy(&payload);
zframe_destroy(&metadata);
+ zframe_destroy(&footer);
}
zmsg_destroy(&zmsg);
} else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 91fab55..fe7e171 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -578,6 +578,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if(metadataLength > 1000000) {
L_WARN("ERR LARGE METADATA DETECTED\n");
}
+ void *footerData = NULL;
+ size_t footerLength = 0;
+ entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength);
message.header.msgId = msgTypeId;
message.header.msgMajorVersion = 0;
@@ -601,6 +604,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
zmq_msg_t msg1; // Header
zmq_msg_t msg2; // Payload
zmq_msg_t msg3; // Metadata
+ zmq_msg_t msg4; // Footer
void *socket = zsock_resolve(sender->zmq.socket);
psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
freeMsgEntry->msgSer = entry->msgSer;
@@ -617,16 +621,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
//send Payload
if (rc > 0) {
- if(metadataLength > 0) {
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL);
- } else {
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
- }
- int flags = 0;
- if (metadataLength > 0) {
- flags = ZMQ_SNDMORE;
- }
- rc = zmq_msg_send(&msg2, socket, flags);
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
if (rc == -1) {
L_WARN("Error sending payload msg. %s", strerror(errno));
zmq_msg_close(&msg2);
@@ -635,13 +631,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
//send MetaData
if (rc > 0 && metadataLength > 0) {
- zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry);
- rc = zmq_msg_send(&msg3, socket, 0);
+ zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL);
+ rc = zmq_msg_send(&msg3, socket, ZMQ_SNDMORE);
if (rc == -1) {
L_WARN("Error sending metadata msg. %s", strerror(errno));
zmq_msg_close(&msg3);
}
}
+
+ //send Footer
+ if (rc > 0) {
+ zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL);
+ rc = zmq_msg_send(&msg4, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending footer msg. %s", strerror(errno));
+ zmq_msg_close(&msg4);
+ }
+ }
+
celixThreadMutex_unlock(&sender->zmq.mutex);
sendOk = rc > 0;
@@ -653,6 +660,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if (metadataLength > 0) {
zmsg_addmem(msg, metadataData, metadataLength);
}
+ zmsg_addmem(msg, footerData, footerLength);
celixThreadMutex_lock(&sender->zmq.mutex);
int rc = zmsg_send(&msg, sender->zmq.socket);
celixThreadMutex_unlock(&sender->zmq.mutex);
@@ -672,6 +680,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if (metadataData) {
free(metadataData);
}
+ if (footerData) {
+ free(footerData);
+ }
}
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
index 5b9f8a0..5444d47 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
@@ -253,3 +253,55 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
pubsubProtocol_destroy(wireprotocol);
}
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsubProtocol_create(&wireprotocol);
+
+ pubsub_protocol_message_t message;
+
+ void *footerData = nullptr;
+ size_t footerLength = 0;
+ celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+ unsigned char exp[4];
+ uint32_t s = 0xBAABABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ ASSERT_EQ(status, CELIX_SUCCESS);
+ ASSERT_EQ(4, footerLength);
+ for (int i = 0; i < 4; i++) {
+ ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]);
+ }
+
+ pubsubProtocol_destroy(wireprotocol);
+ free(footerData);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsubProtocol_create(&wireprotocol);
+
+ unsigned char exp[4];
+ uint32_t s = 0xBAABABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+
+ ASSERT_EQ(CELIX_SUCCESS, status);
+ pubsubProtocol_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_WireProtocolV1Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsubProtocol_create(&wireprotocol);
+
+ unsigned char exp[24];
+ uint32_t s = 0xABBABAAB;
+ memcpy(exp, &s, sizeof(uint32_t));
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+ ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+ pubsubProtocol_destroy(wireprotocol);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index ec946a8..c9bc70e 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -26,7 +26,7 @@
extern "C" {
#endif
-static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_HEADER = 0xABBABAAB;
static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 97d7fdd..3da2094 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -78,7 +78,7 @@ celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle, size_t *length) {
}
celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
- writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
+ writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC_HEADER);
return CELIX_SUCCESS;
}
@@ -97,10 +97,6 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
size_t headerSize = 0;
pubsubProtocol_getHeaderSize(handle, &headerSize);
- // Get HeaderSize
- size_t footerSize = 0;
- pubsubProtocol_getFooterSize(handle, &footerSize);
-
if (*outBuffer == NULL) {
*outBuffer = calloc(1, headerSize);
*outLength = headerSize;
@@ -109,14 +105,13 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
status = CELIX_ENOMEM;
} else {
int idx = 0;
- idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
+ idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_HEADER);
idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
idx = writeInt(*outBuffer, idx, message->header.msgId);
idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
idx = writeInt(*outBuffer, idx, message->header.payloadSize);
idx = writeInt(*outBuffer, idx, message->header.metadataSize);
- idx = writeInt(*outBuffer, idx, footerSize);
*outLength = idx;
}
@@ -211,7 +206,7 @@ celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t leng
if (length == headerSize) {
unsigned int sync;
idx = readInt(data, idx, &sync);
- if (sync != PROTOCOL_WIRE_SYNC) {
+ if (sync != PROTOCOL_WIRE_SYNC_HEADER) {
status = CELIX_ILLEGAL_ARGUMENT;
} else {
unsigned int envelopeVersion;
@@ -288,13 +283,8 @@ celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t leng
pubsubProtocol_getFooterSize(handle, &footerSize);
if (length == footerSize) {
unsigned int footerSync;
- unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER;
- unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC;
idx = readInt(data, idx, &footerSync);
- if (footerSync != footerSyncValue) {
- status = CELIX_ILLEGAL_ARGUMENT;
- }
- if (footerSync != headerSyncValue) {
+ if (footerSync != PROTOCOL_WIRE_SYNC_FOOTER) {
status = CELIX_ILLEGAL_ARGUMENT;
}
} else {