You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/08 19:31:40 UTC

[celix] 01/02: Add Footer

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/proposal_protocol_footer
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9a94bd6ca9c245eea19e707dc87a582de71e9372
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 8 20:16:33 2020 +0200

    Add Footer
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 45 ++++++++++++-----
 .../src/ps_wire_protocol_activator.c               |  3 ++
 .../src/pubsub_wire_protocol_common.h              |  1 +
 .../src/pubsub_wire_protocol_impl.c                | 58 ++++++++++++++++++++--
 .../src/pubsub_wire_protocol_impl.h                |  3 ++
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    | 35 +++++++++++++
 libs/framework/src/celix_log.c                     |  1 +
 7 files changed, 130 insertions(+), 16 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 91eb97a..3d647f8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry {
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
     void *headerBuffer;
+    unsigned int footerSize;
     void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
@@ -331,6 +332,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         entry->headerBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
+        handle->protocol->getFooterSize(handle->protocol->handle, &size);
+        entry->footerSize = size;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
         entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -340,8 +343,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
             entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
             entry->msg_iovlen++;
         }
-        entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
-        entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
         entry->msg_iovlen++;
@@ -825,7 +828,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
 
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
         if (entry->buffer) free(entry->buffer);
             entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
@@ -899,20 +902,19 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
                         L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
                     }
                 }
-                // Check for end of message using, header of next message. Because of streaming protocol
-                // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+                // Check for end of message using, footer of message. Because of streaming protocol
                 if (nbytes > 0) {
-                    pubsub_protocol_message_t header;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
-                    if (handle->protocol->decodeHeader(handle->protocol->handle,
+                    validMsg = true;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->footerSize, 0);
+                    if (handle->protocol->decodeFooter(handle->protocol->handle,
                                                  entry->footerBuffer,
-                                                 entry->headerSize,
-                                                 &header) == CELIX_SUCCESS) {
-                        // valid header for next buffer, this means that the message is valid
+                                                 entry->footerSize,
+                                                 &entry->header) == CELIX_SUCCESS) {
+                        // valid footer, this means that the message is valid
                         validMsg = true;
                     } else {
                         // Did not receive correct header
-                        L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                        L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
                         entry->bufferReadSize = 0;
                     }
                 }
@@ -1052,6 +1054,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             }
             message->header.metadataSize = metadataSize;
 
+            void *footerData = NULL;
+            size_t footerDataSize = 0;
+            if (entry->footerSize) {
+                handle->protocol->encodeFooter(handle->protocol->handle, message,
+                                                 &footerData,
+                                                 &footerDataSize);
+            }
+
             size_t msgSize = 0;
             struct msghdr msg;
             struct iovec msg_iov[IOV_MAX];
@@ -1085,6 +1095,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
             }
 
+            // Write optional footerData in vector buffer
+            if (footerData && footerDataSize) {
+                msg.msg_iovlen++;
+                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            }
+
             void *headerData = NULL;
             size_t headerSize = 0;
             // check if header is not part of the payload (=> headerBufferSize = 0)s
@@ -1142,6 +1160,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             if (metadataData) {
                 free(metadataData);
             }
+            if (footerData) {
+                free(footerData);
+            }
             entry->seqNr++;
         }
     }
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
index 359ea8f..78ae2a2 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
         act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
         act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
         act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+        act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize;
         act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
         
         act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
         act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
         act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+        act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter;
 
         act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
         act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
         act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+        act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter;
 
         act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
     }
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index f5befd2..ec946a8 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -27,6 +27,7 @@ extern "C" {
 #endif
 
 static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA;
 static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
 
 int readShort(const unsigned char *data, int offset, uint16_t *val);
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 5e265d9..97d7fdd 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -59,15 +59,12 @@ celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
 
 celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
     celix_status_t status = CELIX_SUCCESS;
-
     free(protocol);
-
     return status;
 }
 
 celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) {
     *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
-
     return CELIX_SUCCESS;
 }
 
@@ -85,6 +82,11 @@ celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
     return CELIX_SUCCESS;
 }
 
+celix_status_t pubsubProtocol_getFooterSize(void* handle,  size_t *length) {
+    *length = sizeof(int);
+    return CELIX_SUCCESS;
+}
+
 celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) {
     *isSupported = false;
     return CELIX_SUCCESS;
@@ -95,6 +97,10 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
     size_t headerSize = 0;
     pubsubProtocol_getHeaderSize(handle, &headerSize);
 
+    // Get HeaderSize
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+
     if (*outBuffer == NULL) {
         *outBuffer = calloc(1, headerSize);
         *outLength = headerSize;
@@ -110,7 +116,7 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message
         idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
         idx = writeInt(*outBuffer, idx, message->header.payloadSize);
         idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-
+        idx = writeInt(*outBuffer, idx, footerSize);
         *outLength = idx;
     }
 
@@ -175,6 +181,27 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
     return status;
 }
 
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+    celix_status_t status = CELIX_SUCCESS;
+    // Get HeaderSize
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+
+    if (*outBuffer == NULL) {
+        *outBuffer = calloc(1, footerSize);
+        *outLength = footerSize;
+    }
+    if (*outBuffer == NULL) {
+        status = CELIX_ENOMEM;
+    } else {
+        int idx = 0;
+        idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_FOOTER);
+        *outLength = idx;
+    }
+
+    return status;
+}
+
 celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
     celix_status_t status = CELIX_SUCCESS;
 
@@ -253,6 +280,29 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le
     return status;
 }
 
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    int idx = 0;
+    size_t footerSize = 0;
+    pubsubProtocol_getFooterSize(handle, &footerSize);
+    if (length == footerSize) {
+        unsigned int footerSync;
+        unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER;
+        unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC;
+        idx = readInt(data, idx, &footerSync);
+        if (footerSync != footerSyncValue) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+        if (footerSync != headerSyncValue) {
+            status = CELIX_ILLEGAL_ARGUMENT;
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    return status;
+}
+
 static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
     celix_status_t status = CELIX_SUCCESS;
 
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
index 06c4dcd..1693d09 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length);
 celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_getFooterSize(void* handle,  size_t *length);
 celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported);
 
 celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
 celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 
 #ifdef __cplusplus
 }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index 2ed8f81..17ca6dd 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -112,6 +112,18 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+  /**
+    * Returns the size of the footer.
+    * Is used by the receiver to configure the expected size of the footer.
+    * The receiver reads the footer to know if the complete message including paylaod is received.
+    *
+    * @param handle handle for service
+    * @param length output param for footer size
+    * @return status code indicating failure or success
+    */
+    celix_status_t (*getFooterSize)(void *handle, size_t *length);
+
   /**
     * Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol.
     *
@@ -157,6 +169,18 @@ typedef struct pubsub_protocol_service {
     celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
 
     /**
+     * Encodes the footer
+     *
+     * @param handle handle for service
+     * @param message message to use footer from
+     * @param outBuffer byte array containing the encoded footer
+     * @param outLength length of the byte array
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+
+  /**
      * Decodes the given data into message.header.
      *
      * @param handle handle for service
@@ -190,6 +214,17 @@ typedef struct pubsub_protocol_service {
      * @return status code indicating failure or success
      */
     celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+    /**
+     * Decodes the given data into message.header.
+     *
+     * @param handle handle for service
+     * @param data incoming byte array to decode
+     * @param length length of the byte array
+     * @param message pointer to message to be filled in with decoded footer
+     * @return status code indicating failure or success
+     */
+    celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 } pubsub_protocol_service_t;
 
 #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..b2c074e 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -26,6 +26,7 @@
 #include "celix_log.h"
 #include "celix_threads.h"
 #include "celix_array_list.h"
+#include "memstream/open_memstream.h"
 
 #define LOG_NAME        "celix_framework"