You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/08 19:31:40 UTC
[celix] 01/02: Add Footer
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9a94bd6ca9c245eea19e707dc87a582de71e9372
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 20:16:33 2020 +0200
Add Footer
---
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 45 ++++++++++++-----
.../src/ps_wire_protocol_activator.c | 3 ++
.../src/pubsub_wire_protocol_common.h | 1 +
.../src/pubsub_wire_protocol_impl.c | 58 ++++++++++++++++++++--
.../src/pubsub_wire_protocol_impl.h | 3 ++
.../pubsub/pubsub_spi/include/pubsub_protocol.h | 35 +++++++++++++
libs/framework/src/celix_log.c | 1 +
7 files changed, 130 insertions(+), 16 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 91eb97a..3d647f8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry {
unsigned int headerSize;
unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
void *headerBuffer;
+ unsigned int footerSize;
void *footerBuffer;
unsigned int bufferSize;
void *buffer;
@@ -331,6 +332,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->headerBufferSize = size;
handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
entry->syncSize = size;
+ handle->protocol->getFooterSize(handle->protocol->handle, &size);
+ entry->footerSize = size;
entry->bufferSize = handle->bufferSize;
entry->connected = false;
entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -340,8 +343,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
entry->msg_iovlen++;
}
- entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
- entry->buffer = calloc(sizeof(char), entry->bufferSize);
+ if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+ if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
entry->msg_iovlen++;
@@ -825,7 +828,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
// Message buffer is to small, reallocate to make it bigger
if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
- handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+ handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
if (entry->buffer) free(entry->buffer);
entry->buffer = malloc((size_t) handle->bufferSize);
entry->bufferSize = handle->bufferSize;
@@ -899,20 +902,19 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
}
}
- // Check for end of message using, header of next message. Because of streaming protocol
- // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+ // Check for the end of the message using the message footer, because this is a streaming protocol
if (nbytes > 0) {
- pubsub_protocol_message_t header;
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
- if (handle->protocol->decodeHeader(handle->protocol->handle,
+ validMsg = true;
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->footerSize, 0);
+ if (handle->protocol->decodeFooter(handle->protocol->handle,
entry->footerBuffer,
- entry->headerSize,
- &header) == CELIX_SUCCESS) {
- // valid header for next buffer, this means that the message is valid
+ entry->footerSize,
+ &entry->header) == CELIX_SUCCESS) {
+ // valid footer, this means that the message is valid
validMsg = true;
} else {
// Did not receive correct header
- L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
entry->bufferReadSize = 0;
}
}
@@ -1052,6 +1054,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
}
message->header.metadataSize = metadataSize;
+ void *footerData = NULL;
+ size_t footerDataSize = 0;
+ if (entry->footerSize) {
+ handle->protocol->encodeFooter(handle->protocol->handle, message,
+ &footerData,
+ &footerDataSize);
+ }
+
size_t msgSize = 0;
struct msghdr msg;
struct iovec msg_iov[IOV_MAX];
@@ -1085,6 +1095,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
}
+ // Write optional footerData in vector buffer
+ if (footerData && footerDataSize) {
+ msg.msg_iovlen++;
+ msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ }
+
void *headerData = NULL;
size_t headerSize = 0;
// check if header is not part of the payload (=> headerBufferSize = 0)s
@@ -1142,6 +1160,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (metadataData) {
free(metadataData);
}
+ if (footerData) {
+ free(footerData);
+ }
entry->seqNr++;
}
}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
index 359ea8f..78ae2a2 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+ act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize;
act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+ act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter;
act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+ act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter;
act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index f5befd2..ec946a8 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -27,6 +27,7 @@ extern "C" {
#endif
static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
int readShort(const unsigned char *data, int offset, uint16_t *val);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 5e265d9..97d7fdd 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -59,15 +59,12 @@ celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
celix_status_t status = CELIX_SUCCESS;
-
free(protocol);
-
return status;
}
celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) {
*length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
-
return CELIX_SUCCESS;
}
@@ -85,6 +82,11 @@ celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
return CELIX_SUCCESS;
}
+celix_status_t pubsubProtocol_getFooterSize(void* handle, size_t *length) {
+ *length = sizeof(int); // footer size reported to callers; handle is not used here
+ return CELIX_SUCCESS;
+}
+
celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) {
*isSupported = false;
return CELIX_SUCCESS;
@@ -95,6 +97,10 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
size_t headerSize = 0;
pubsubProtocol_getHeaderSize(handle, &headerSize);
+ // Get FooterSize
+ size_t footerSize = 0;
+ pubsubProtocol_getFooterSize(handle, &footerSize);
+
if (*outBuffer == NULL) {
*outBuffer = calloc(1, headerSize);
*outLength = headerSize;
@@ -110,7 +116,7 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
idx = writeInt(*outBuffer, idx, message->header.payloadSize);
idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-
+ idx = writeInt(*outBuffer, idx, footerSize);
*outLength = idx;
}
@@ -175,6 +181,27 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
return status;
}
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+ // Encodes the footer sync word into *outBuffer; start by looking up the footer size
+ size_t footerSize = 0;
+ pubsubProtocol_getFooterSize(handle, &footerSize);
+
+ if (*outBuffer == NULL) { // no caller-supplied buffer: allocate a zeroed one of footer size
+ *outBuffer = calloc(1, footerSize);
+ *outLength = footerSize;
+ }
+ if (*outBuffer == NULL) { // still NULL means the allocation above failed
+ status = CELIX_ENOMEM;
+ } else {
+ int idx = 0;
+ idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_FOOTER);
+ *outLength = idx; // reported length is the value returned by writeInt
+ }
+
+ return status;
+}
+
celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
celix_status_t status = CELIX_SUCCESS;
@@ -253,6 +280,29 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le
return status;
}
+/**
+ * Validates a received message footer.
+ *
+ * @param handle  handle for service (only forwarded to pubsubProtocol_getFooterSize)
+ * @param data    incoming byte array holding the footer
+ * @param length  length of the byte array; must equal the protocol footer size
+ * @param message not modified by this implementation
+ * @return CELIX_SUCCESS when the buffer has the footer size and contains
+ *         PROTOCOL_WIRE_SYNC_FOOTER, otherwise CELIX_ILLEGAL_ARGUMENT
+ */
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+    if (length == footerSize) {
+        unsigned int footerSync = 0;
+        readInt(data, 0, &footerSync);
+        // Only the footer sync word is valid here. The footer must NOT also be compared
+        // against PROTOCOL_WIRE_SYNC (the header sync word): the two constants differ,
+        // so requiring both would reject every footer.
+        if (footerSync != PROTOCOL_WIRE_SYNC_FOOTER) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+    } else {
+        // A buffer of any other length cannot be a footer of this protocol.
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    return status;
+}
+
static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
celix_status_t status = CELIX_SUCCESS;
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
index 06c4dcd..1693d09 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_getFooterSize(void* handle, size_t *length);
celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported);
celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
#ifdef __cplusplus
}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index 2ed8f81..17ca6dd 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -112,6 +112,18 @@ typedef struct pubsub_protocol_service {
* @return status code indicating failure or success
*/
celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+ /**
+ * Returns the size of the footer.
+ * Is used by the receiver to configure the expected size of the footer.
+ * The receiver reads the footer to know if the complete message including payload is received.
+ *
+ * @param handle handle for service
+ * @param length output param for footer size
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*getFooterSize)(void *handle, size_t *length);
+
/**
* Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol.
*
@@ -157,6 +169,18 @@ typedef struct pubsub_protocol_service {
celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
/**
+ * Encodes the footer
+ *
+ * @param handle handle for service
+ * @param message message to use footer from
+ * @param outBuffer byte array containing the encoded footer
+ * @param outLength length of the byte array
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+
+ /**
* Decodes the given data into message.header.
*
* @param handle handle for service
@@ -190,6 +214,17 @@ typedef struct pubsub_protocol_service {
* @return status code indicating failure or success
*/
celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+ /**
+ * Decodes the given data as the message footer.
+ *
+ * @param handle handle for service
+ * @param data incoming byte array to decode
+ * @param length length of the byte array
+ * @param message pointer to message to be filled in with decoded footer
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
} pubsub_protocol_service_t;
#endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..b2c074e 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -26,6 +26,7 @@
#include "celix_log.h"
#include "celix_threads.h"
#include "celix_array_list.h"
+#include "memstream/open_memstream.h"
#define LOG_NAME "celix_framework"