You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/23 15:20:36 UTC
[celix] branch master updated: Feature/proposal protocol footer
(#253)
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push:
new 01dc53e Feature/proposal protocol footer (#253)
01dc53e is described below
commit 01dc53ea7c4731f5709d0c9d6fae0e254749aa63
Author: rbulter <ro...@gmail.com>
AuthorDate: Tue Jun 23 17:20:25 2020 +0200
Feature/proposal protocol footer (#253)
* adds wire_v2 protocol with footer
Co-authored-by: Pepijn Noltes <pe...@gmail.com>
---
bundles/pubsub/CMakeLists.txt | 2 +-
.../src/pubsub_psa_tcp_constants.h | 1 -
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 71 +++--
.../src/pubsub_tcp_topic_receiver.c | 6 +-
.../src/pubsub_zmq_topic_receiver.c | 10 +-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 47 ++-
.../gtest => pubsub_protocol}/CMakeLists.txt | 17 +-
.../pubsub_protocol_lib}/CMakeLists.txt | 20 +-
.../include/pubsub_wire_protocol_common.h | 66 +++++
.../src/pubsub_wire_protocol_common.c} | 323 ++++++++-------------
.../pubsub_protocol_wire_v1/CMakeLists.txt | 5 +-
.../pubsub_protocol_wire_v1/gtest/CMakeLists.txt | 0
.../gtest/src/PS_WP_tests.cc | 38 ++-
.../pubsub_protocol_wire_v1/gtest/src/main.cc | 0
.../src/ps_wire_protocol_activator.c | 11 +-
.../src/pubsub_wire_protocol_impl.c | 171 +++++++++++
.../src/pubsub_wire_protocol_impl.h | 11 +-
.../pubsub_protocol_wire_v2}/CMakeLists.txt | 27 +-
.../pubsub_protocol_wire_v2}/gtest/CMakeLists.txt | 12 +-
.../gtest/src/PS_WP_v2_tests.cc | 249 ++++++++++++++++
.../pubsub_protocol_wire_v2}/gtest/src/main.cc | 0
.../src/ps_wire_v2_protocol_activator.c} | 37 +--
.../src/pubsub_wire_v2_protocol_impl.c | 205 +++++++++++++
.../src/pubsub_wire_v2_protocol_impl.h | 58 ++++
.../src/pubsub_wire_protocol_common.c | 72 -----
.../src/pubsub_wire_protocol_common.h | 44 ---
.../pubsub/pubsub_spi/include/pubsub_protocol.h | 41 +++
bundles/pubsub/test/CMakeLists.txt | 46 ++-
libs/framework/src/celix_log.c | 4 +
libs/utils/include/celix_byteswap.h | 51 ++++
30 files changed, 1205 insertions(+), 440 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 90d762a..e4e78ab 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -46,7 +46,7 @@ if (PUBSUB)
add_subdirectory(pubsub_discovery)
add_subdirectory(pubsub_serializer_json)
add_subdirectory(pubsub_serializer_avrobin)
- add_subdirectory(pubsub_protocol_wire_v1)
+ add_subdirectory(pubsub_protocol)
add_subdirectory(keygen)
add_subdirectory(mock)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 6026212..302c9f6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -65,7 +65,6 @@
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0
-#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT 0.0
#define PUBSUB_TCP_PSA_IP_KEY "PSA_IP"
#define PUBSUB_TCP_ADMIN_TYPE "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 91eb97a..3bb31cd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry {
unsigned int headerSize;
unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
void *headerBuffer;
+ unsigned int footerSize;
void *footerBuffer;
unsigned int bufferSize;
void *buffer;
@@ -91,7 +92,6 @@ typedef struct psa_tcp_connection_entry {
struct msghdr msg;
size_t msg_iovlen; /* Number of elements in the vector. */
unsigned int retryCount;
- unsigned int seqNr;
} psa_tcp_connection_entry_t;
//
@@ -331,6 +331,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->headerBufferSize = size;
handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
entry->syncSize = size;
+ handle->protocol->getFooterSize(handle->protocol->handle, &size);
+ entry->footerSize = size;
entry->bufferSize = handle->bufferSize;
entry->connected = false;
entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -340,8 +342,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
entry->msg_iovlen++;
}
- entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
- entry->buffer = calloc(sizeof(char), entry->bufferSize);
+ if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+ if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
entry->msg_iovlen++;
@@ -825,7 +827,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
// Message buffer is to small, reallocate to make it bigger
if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
- handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+ handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
if (entry->buffer) free(entry->buffer);
entry->buffer = malloc((size_t) handle->bufferSize);
entry->bufferSize = handle->bufferSize;
@@ -836,14 +838,13 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
if (nbytes > 0) {
// Check header message buffer
- if (handle->protocol->decodeHeader(handle->protocol->handle,
- header_buffer,
- entry->headerSize,
- &entry->header) != CELIX_SUCCESS) {
+ if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
// Did not receive correct header
// skip sync word and try to read next header
nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
- if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+ if (!entry->headerError) {
+ L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+ }
entry->headerError = true;
entry->bufferReadSize = 0;
} else {
@@ -899,21 +900,21 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
}
}
- // Check for end of message using, header of next message. Because of streaming protocol
- // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+ // Check for end of message using, footer of message. Because of streaming protocol
if (nbytes > 0) {
- pubsub_protocol_message_t header;
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
- if (handle->protocol->decodeHeader(handle->protocol->handle,
- entry->footerBuffer,
- entry->headerSize,
- &header) == CELIX_SUCCESS) {
- // valid header for next buffer, this means that the message is valid
- validMsg = true;
+ if (entry->footerSize > 0) {
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
+ if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
+ // valid footer, this means that the message is valid
+ validMsg = true;
+ } else {
+ // Did not receive correct header
+ L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ entry->bufferReadSize = 0;
+ }
} else {
- // Did not receive correct header
- L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
- entry->bufferReadSize = 0;
+ // No Footer, then complete message is received
+ validMsg = true;
}
}
}
@@ -1037,11 +1038,11 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
payloadSize += msgIoVec[i].iov_len;
}
}
-
- message->header.seqNr = entry->seqNr;
+ message->header.convertEndianess = 0;
message->header.payloadSize = payloadSize;
message->header.payloadPartSize = payloadSize;
message->header.payloadOffset = 0;
+ message->header.isLastSegment = 1;
void *metadataData = NULL;
size_t metadataSize = 0;
@@ -1052,6 +1053,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
}
message->header.metadataSize = metadataSize;
+ void *footerData = NULL;
+ size_t footerDataSize = 0;
+ if (entry->footerSize) {
+ handle->protocol->encodeFooter(handle->protocol->handle, message,
+ &footerData,
+ &footerDataSize);
+ }
+
size_t msgSize = 0;
struct msghdr msg;
struct iovec msg_iov[IOV_MAX];
@@ -1085,6 +1094,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
}
+ // Write optional footerData in vector buffer
+ if (footerData && footerDataSize) {
+ msg.msg_iovlen++;
+ msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ }
+
void *headerData = NULL;
size_t headerSize = 0;
// check if header is not part of the payload (=> headerBufferSize = 0)s
@@ -1128,7 +1145,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
} else if (msgSize) {
entry->retryCount = 0;
if (nbytes != msgSize) {
- L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", entry->seqNr, msgSize, nbytes, strerror(errno));
+ L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
}
}
// Release data
@@ -1142,7 +1159,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (metadataData) {
free(metadataData);
}
- entry->seqNr++;
+ if (footerData) {
+ free(footerData);
+ }
}
}
celixThreadRwlock_unlock(&handle->dbLock);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e9add2d..eb6afbd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -156,7 +156,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- bool isEndpoint = false;
bool isServerEndPoint = false;
/* Check if it's a static endpoint */
@@ -167,7 +166,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
if (endPointType != NULL) {
- isEndpoint = true;
if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
staticClientEndPointUrls = staticConnectUrls;
@@ -208,9 +206,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
- double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
- (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT :
- PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT);
+ double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 7e0fff3..088474a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -643,13 +643,16 @@ static void* psa_zmq_recvThread(void * data) {
zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
if (zmsg != NULL) {
if (zmsg_size(zmsg) < 2) {
- L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
+ L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) (+ footer)), got %i frames", (int)zmsg_size(zmsg));
} else {
zframe_t *header = zmsg_pop(zmsg); // header
zframe_t *payload = NULL;
zframe_t *metadata = NULL;
+ zframe_t *footer = NULL;
pubsub_protocol_message_t message;
+ size_t footerSize = 0;
+ receiver->protocol->getFooterSize(receiver->protocol->handle, &footerSize);
receiver->protocol->decodeHeader(receiver->protocol->handle, zframe_data(header), zframe_size(header), &message);
if (message.header.payloadSize > 0) {
payload = zmsg_pop(zmsg);
@@ -664,6 +667,10 @@ static void* psa_zmq_recvThread(void * data) {
} else {
message.metadata.metadata = NULL;
}
+ if (footerSize > 0) {
+ footer = zmsg_pop(zmsg); // footer
+ receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message);
+ }
if (header != NULL && payload != NULL) {
struct timespec receiveTime;
clock_gettime(CLOCK_REALTIME, &receiveTime);
@@ -673,6 +680,7 @@ static void* psa_zmq_recvThread(void * data) {
zframe_destroy(&header);
zframe_destroy(&payload);
zframe_destroy(&metadata);
+ zframe_destroy(&footer);
}
zmsg_destroy(&zmsg);
} else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 91fab55..413f1b3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -578,21 +578,29 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if(metadataLength > 1000000) {
L_WARN("ERR LARGE METADATA DETECTED\n");
}
+ void *footerData = NULL;
+ size_t footerLength = 0;
+ entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength);
message.header.msgId = msgTypeId;
+ message.header.seqNr = entry->seqNr;
message.header.msgMajorVersion = 0;
message.header.msgMinorVersion = 0;
message.header.payloadSize = payloadLength;
message.header.metadataSize = metadataLength;
message.header.payloadPartSize = payloadLength;
message.header.payloadOffset = 0;
+ message.header.isLastSegment = 1;
+ message.header.convertEndianess = 0;
+
+ // increase seqNr
+ entry->seqNr++;
void *headerData = NULL;
size_t headerLength = 0;
entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
-
errno = 0;
bool sendOk;
@@ -601,6 +609,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
zmq_msg_t msg1; // Header
zmq_msg_t msg2; // Payload
zmq_msg_t msg3; // Metadata
+ zmq_msg_t msg4; // Footer
void *socket = zsock_resolve(sender->zmq.socket);
psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
freeMsgEntry->msgSer = entry->msgSer;
@@ -617,16 +626,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
//send Payload
if (rc > 0) {
- if(metadataLength > 0) {
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL);
- } else {
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
- }
- int flags = 0;
- if (metadataLength > 0) {
- flags = ZMQ_SNDMORE;
- }
- rc = zmq_msg_send(&msg2, socket, flags);
+ int flag = ((metadataLength > 0) || (footerLength > 0)) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ rc = zmq_msg_send(&msg2, socket, flag);
if (rc == -1) {
L_WARN("Error sending payload msg. %s", strerror(errno));
zmq_msg_close(&msg2);
@@ -635,13 +637,25 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
//send MetaData
if (rc > 0 && metadataLength > 0) {
- zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry);
- rc = zmq_msg_send(&msg3, socket, 0);
+ int flag = (footerLength > 0 ) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL);
+ rc = zmq_msg_send(&msg3, socket, flag);
if (rc == -1) {
L_WARN("Error sending metadata msg. %s", strerror(errno));
zmq_msg_close(&msg3);
}
}
+
+ //send Footer
+ if (rc > 0 && footerLength > 0) {
+ zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL);
+ rc = zmq_msg_send(&msg4, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending footer msg. %s", strerror(errno));
+ zmq_msg_close(&msg4);
+ }
+ }
+
celixThreadMutex_unlock(&sender->zmq.mutex);
sendOk = rc > 0;
@@ -653,6 +667,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if (metadataLength > 0) {
zmsg_addmem(msg, metadataData, metadataLength);
}
+ if (footerLength > 0) {
+ zmsg_addmem(msg, footerData, footerLength);
+ }
celixThreadMutex_lock(&sender->zmq.mutex);
int rc = zmsg_send(&msg, sender->zmq.socket);
celixThreadMutex_unlock(&sender->zmq.mutex);
@@ -672,8 +689,10 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
if (metadataData) {
free(metadataData);
}
+ if (footerData) {
+ free(footerData);
+ }
}
-
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
if (message.metadata.metadata) {
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/CMakeLists.txt
similarity index 61%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/CMakeLists.txt
index f86a18e..ad935df 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/CMakeLists.txt
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,14 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-set(SOURCES
- src/main.cc
- src/PS_WP_tests.cc
- )
-add_executable(celix_pswp_tests ${SOURCES})
-#target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_subdirectory(pubsub_protocol_lib)
+add_subdirectory(pubsub_protocol_wire_v1)
+add_subdirectory(pubsub_protocol_wire_v2)
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
similarity index 61%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
index f86a18e..14fea70 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/CMakeLists.txt
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,14 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-set(SOURCES
- src/main.cc
- src/PS_WP_tests.cc
- )
-add_executable(celix_pswp_tests ${SOURCES})
-#target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
-
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_library(celix_pubsub_protocol_lib STATIC
+ src/pubsub_wire_protocol_common.c
+)
+target_link_libraries(celix_pubsub_protocol_lib PUBLIC Celix::pubsub_spi)
+target_include_directories(celix_pubsub_protocol_lib PUBLIC include)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h
new file mode 100644
index 0000000..7dd1ef2
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/include/pubsub_wire_protocol_common.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_PROTOCOL_COMMON_H
+#define CELIX_PUBSUB_PROTOCOL_COMMON_H
+
+#include <stdint.h>
+
+#include "celix_errno.h"
+#include "pubsub_protocol.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NETSTRING_ERROR_TOO_LONG -1
+#define NETSTRING_ERROR_NO_COLON -2
+#define NETSTRING_ERROR_TOO_SHORT -3
+#define NETSTRING_ERROR_NO_COMMA -4
+#define NETSTRING_ERROR_LEADING_ZERO -5
+#define NETSTRING_ERROR_NO_LENGTH -6
+
+static const unsigned int PROTOCOL_WIRE_V1_SYNC_HEADER = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_V1_ENVELOPE_VERSION = 1;
+
+static const unsigned int PROTOCOL_WIRE_V2_SYNC_HEADER = 0xABBADEAF;
+static const unsigned int PROTOCOL_WIRE_V2_SYNC_FOOTER = 0xDEAFABBA;
+static const unsigned int PROTOCOL_WIRE_V2_ENVELOPE_VERSION = 2;
+
+int pubsubProtocol_readChar(const unsigned char *data, int offset, uint8_t *val);
+int pubsubProtocol_readShort(const unsigned char *data, int offset, uint32_t convert, uint16_t *val);
+int pubsubProtocol_readInt(const unsigned char *data, int offset, uint32_t convert, uint32_t *val);
+int pubsubProtocol_readLong(const unsigned char *data, int offset, uint32_t convert, uint64_t *val);
+
+int pubsubProtocol_writeChar(unsigned char *data, int offset, uint8_t val);
+int pubsubProtocol_writeShort(unsigned char *data, int offset, uint32_t convert, uint16_t val);
+int pubsubProtocol_writeInt(unsigned char *data, int offset, uint32_t convert, uint32_t val);
+int pubsubProtocol_writeLong(unsigned char *data, int offset, uint32_t convert, uint64_t val);
+
+celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_decodePayload(void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeMetadata(void *data, size_t length, pubsub_protocol_message_t *message);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //CELIX_PUBSUB_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
similarity index 59%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
index 5e265d9..7055db9 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
@@ -17,114 +17,164 @@
* under the License.
*/
-#include <stdio.h>
+#include "pubsub_wire_protocol_common.h"
+
#include <stdlib.h>
-#include <math.h>
#include <string.h>
+#include <math.h>
+#include <ctype.h>
+#include "celix_byteswap.h"
-#include "utils.h"
-#include "celix_properties.h"
-
-#include "pubsub_wire_protocol_impl.h"
-#include "pubsub_wire_protocol_common.h"
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
+ celix_status_t status = CELIX_SUCCESS;
-#define NETSTRING_ERROR_TOO_LONG -1
-#define NETSTRING_ERROR_NO_COLON -2
-#define NETSTRING_ERROR_TOO_SHORT -3
-#define NETSTRING_ERROR_NO_COMMA -4
-#define NETSTRING_ERROR_LEADING_ZERO -5
-#define NETSTRING_ERROR_NO_LENGTH -6
+ size_t str_len = strlen(string);
+ if (str_len == 0) {
+ // 0:,
+ *netstringOut = calloc(1, 4);
+ if (*netstringOut == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ *netstringOut[0] = '0';
+ *netstringOut[1] = ':';
+ *netstringOut[2] = ',';
+ *netstringOut[3] = '\0';
+ }
+ } else {
+ size_t numlen = ceil(log10(str_len + 1));
+ *netstringOut = calloc(1, numlen + str_len + 3);
+ if (*netstringOut == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ sprintf(*netstringOut, "%zu:%s,", str_len, string);
+ }
+ }
-struct pubsub_protocol_wire_v1 {
-};
+ return status;
+}
-static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut);
+/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
+ to `netstring_start` a pointer to the beginning of the string in
+ the buffer, and to `netstring_length` the length of the
+ string. Does not allocate any memory. If it reads successfully,
+ then it returns 0. If there is an error, then the return value will
+ be negative. The error values are:
+ NETSTRING_ERROR_TOO_LONG More than 999999999 bytes in a field
+ NETSTRING_ERROR_NO_COLON No colon was found after the number
+ NETSTRING_ERROR_TOO_SHORT Number of bytes greater than buffer length
+ NETSTRING_ERROR_NO_COMMA No comma was found at the end
+ NETSTRING_ERROR_LEADING_ZERO Leading zeros are not allowed
+ NETSTRING_ERROR_NO_LENGTH Length not given at start of netstring
+ If you're sending messages with more than 999999999 bytes -- about
+ 2 GB -- then you probably should not be doing so in the form of a
+ single netstring. This restriction is in place partially to protect
+ from malicious or erroneous input, and partly to be compatible with
+ D. J. Bernstein's reference implementation.
+ Example:
+ if (netstring_read("3:foo,", 6, &str, &len) < 0) explode_and_die();
+ */
static int pubsubProtocol_parseNetstring(unsigned char *buffer, size_t buffer_length,
- unsigned char **netstring_start, size_t *netstring_length);
+ unsigned char **netstring_start, size_t *netstring_length) {
+ int i;
+ size_t len = 0;
-celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
- celix_status_t status = CELIX_SUCCESS;
+ /* Write default values for outputs */
+ *netstring_start = NULL; *netstring_length = 0;
- *protocol = calloc(1, sizeof(**protocol));
+ /* Make sure buffer is big enough. Minimum size is 3. */
+ if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
- if (!*protocol) {
- status = CELIX_ENOMEM;
- }
- else {
- //
+ /* No leading zeros allowed! */
+ if (buffer[0] == '0' && isdigit(buffer[1]))
+ return NETSTRING_ERROR_LEADING_ZERO;
+
+ /* The netstring must start with a number */
+ if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
+
+ /* Read the number of bytes */
+ for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
+ /* Error if more than 9 digits */
+ if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+ /* Accumulate each digit, assuming ASCII. */
+ len = len*10 + (buffer[i] - '0');
}
- return status;
-}
+ /* Check buffer length once and for all. Specifically, we make sure
+ that the buffer is longer than the number we've read, the length
+ of the string itself, and the colon and comma. */
+ if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
-celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
- celix_status_t status = CELIX_SUCCESS;
+ /* Read the colon */
+ if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
- free(protocol);
+ /* Test for the trailing comma, and set the return values */
+ if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
+ *netstring_start = &buffer[i]; *netstring_length = len;
- return status;
+ return 0;
}
-celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) {
- *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
+int pubsubProtocol_readChar(const unsigned char *data, int offset, uint8_t *val) {
+ memcpy(val, data + offset, sizeof(uint8_t));
+ *val = *val;
+ return offset + sizeof(uint16_t);
+}
- return CELIX_SUCCESS;
+int pubsubProtocol_readShort(const unsigned char *data, int offset, uint32_t convert, uint16_t *val) {
+ memcpy(val, data + offset, sizeof(uint16_t));
+ if (convert) {
+ *val = bswap_16(*val);
+ }
+ return offset + sizeof(uint16_t);
}
-celix_status_t pubsubProtocol_getHeaderBufferSize(void* handle, size_t *length) {
- return pubsubProtocol_getHeaderSize(handle, length);
+int pubsubProtocol_readInt(const unsigned char *data, int offset, uint32_t convert, uint32_t *val) {
+ memcpy(val, data + offset, sizeof(uint32_t));
+ if (convert) {
+ *val = bswap_32(*val);
+ }
+ return offset + sizeof(uint32_t);
}
-celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle, size_t *length) {
- *length = sizeof(int);
- return CELIX_SUCCESS;
+int pubsubProtocol_readLong(const unsigned char *data, int offset, uint32_t convert, uint64_t *val) {
+ memcpy(val, data + offset, sizeof(uint64_t));
+ if (convert) {
+ *val = bswap_64(*val);
+ }
+ return offset + sizeof(uint64_t);
}
-celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
- writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
- return CELIX_SUCCESS;
+int pubsubProtocol_writeChar(unsigned char *data, int offset, uint8_t val) {
+ memcpy(data + offset, &val, sizeof(uint8_t));
+ return offset + sizeof(uint8_t);
}
-celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) {
- *isSupported = false;
- return CELIX_SUCCESS;
+int pubsubProtocol_writeShort(unsigned char *data, int offset, uint32_t convert, uint16_t val) {
+ uint16_t nVal = (convert) ? bswap_16(val) : val;
+ memcpy(data + offset, &nVal, sizeof(uint16_t));
+ return offset + sizeof(uint16_t);
}
-celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
- celix_status_t status = CELIX_SUCCESS;
- // Get HeaderSize
- size_t headerSize = 0;
- pubsubProtocol_getHeaderSize(handle, &headerSize);
- if (*outBuffer == NULL) {
- *outBuffer = calloc(1, headerSize);
- *outLength = headerSize;
- }
- if (*outBuffer == NULL) {
- status = CELIX_ENOMEM;
- } else {
- int idx = 0;
- idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
- idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
- idx = writeInt(*outBuffer, idx, message->header.msgId);
- idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
- idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
- idx = writeInt(*outBuffer, idx, message->header.payloadSize);
- idx = writeInt(*outBuffer, idx, message->header.metadataSize);
-
- *outLength = idx;
- }
+int pubsubProtocol_writeInt(unsigned char *data, int offset, uint32_t convert, uint32_t val) {
+ uint32_t nVal = (convert) ? bswap_32(val) : val;
+ memcpy(data + offset, &nVal, sizeof(uint32_t));
+ return offset + sizeof(uint32_t);
+}
- return status;
+int pubsubProtocol_writeLong(unsigned char *data, int offset, uint32_t convert, uint64_t val) {
+ uint64_t nVal = (convert) ? bswap_64(val) : val;
+ memcpy(data + offset, &nVal, sizeof(uint64_t));
+ return offset + sizeof(uint64_t);
}
-celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
*outBuffer = message->payload.payload;
*outLength = message->payload.length;
return CELIX_SUCCESS;
}
-celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+celix_status_t pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
celix_status_t status = CELIX_SUCCESS;
unsigned char *line = calloc(1, 4);
@@ -167,7 +217,7 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
}
}
int size = celix_properties_size(message->metadata.metadata);
- writeInt((unsigned char *) line, 0, size);
+ pubsubProtocol_writeInt((unsigned char *) line, 0, true, size);
*outBuffer = line;
*outLength = idx;
@@ -175,52 +225,18 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa
return status;
}
-celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
- celix_status_t status = CELIX_SUCCESS;
-
- int idx = 0;
- size_t headerSize = 0;
- pubsubProtocol_getHeaderSize(handle, &headerSize);
- if (length == headerSize) {
- unsigned int sync;
- idx = readInt(data, idx, &sync);
- if (sync != PROTOCOL_WIRE_SYNC) {
- status = CELIX_ILLEGAL_ARGUMENT;
- } else {
- unsigned int envelopeVersion;
- idx = readInt(data, idx, &envelopeVersion);
- if (envelopeVersion != PROTOCOL_WIRE_ENVELOPE_VERSION) {
- status = CELIX_ILLEGAL_ARGUMENT;
- } else {
- idx = readInt(data, idx, &message->header.msgId);
- idx = readShort(data, idx, &message->header.msgMajorVersion);
- idx = readShort(data, idx, &message->header.msgMinorVersion);
- idx = readInt(data, idx, &message->header.payloadSize);
- readInt(data, idx, &message->header.metadataSize);
- // Set message segmentation parameters to defaults
- message->header.seqNr = 0;
- message->header.payloadPartSize = message->header.payloadSize;
- message->header.payloadOffset = 0;
- }
- }
- } else {
- status = CELIX_ILLEGAL_ARGUMENT;
- }
- return status;
-}
-
-celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message){
+celix_status_t pubsubProtocol_decodePayload(void *data, size_t length, pubsub_protocol_message_t *message){
message->payload.payload = data;
message->payload.length = length;
return CELIX_SUCCESS;
}
-celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+celix_status_t pubsubProtocol_decodeMetadata(void *data, size_t length, pubsub_protocol_message_t *message) {
celix_status_t status = CELIX_SUCCESS;
uint32_t nOfElements;
- size_t idx = readInt(data, 0, &nOfElements);
+ size_t idx = pubsubProtocol_readInt(data, 0, true, &nOfElements);
unsigned char *netstring = data + idx;
int netstringLen = length - idx;
@@ -251,93 +267,4 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le
}
return status;
-}
-
-static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
- celix_status_t status = CELIX_SUCCESS;
-
- size_t str_len = strlen(string);
- if (str_len == 0) {
- // 0:,
- *netstringOut = calloc(1, 4);
- if (*netstringOut == NULL) {
- status = CELIX_ENOMEM;
- } else {
- *netstringOut[0] = '0';
- *netstringOut[1] = ':';
- *netstringOut[2] = ',';
- *netstringOut[3] = '\0';
- }
- } else {
- size_t numlen = ceil(log10(str_len + 1));
- *netstringOut = calloc(1, numlen + str_len + 3);
- if (*netstringOut == NULL) {
- status = CELIX_ENOMEM;
- } else {
- sprintf(*netstringOut, "%zu:%s,", str_len, string);
- }
- }
-
- return status;
-}
-
-/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
- to `netstring_start` a pointer to the beginning of the string in
- the buffer, and to `netstring_length` the length of the
- string. Does not allocate any memory. If it reads successfully,
- then it returns 0. If there is an error, then the return value will
- be negative. The error values are:
- NETSTRING_ERROR_TOO_LONG More than 999999999 bytes in a field
- NETSTRING_ERROR_NO_COLON No colon was found after the number
- NETSTRING_ERROR_TOO_SHORT Number of bytes greater than buffer length
- NETSTRING_ERROR_NO_COMMA No comma was found at the end
- NETSTRING_ERROR_LEADING_ZERO Leading zeros are not allowed
- NETSTRING_ERROR_NO_LENGTH Length not given at start of netstring
- If you're sending messages with more than 999999999 bytes -- about
- 2 GB -- then you probably should not be doing so in the form of a
- single netstring. This restriction is in place partially to protect
- from malicious or erroneous input, and partly to be compatible with
- D. J. Bernstein's reference implementation.
- Example:
- if (netstring_read("3:foo,", 6, &str, &len) < 0) explode_and_die();
- */
-static int pubsubProtocol_parseNetstring(unsigned char *buffer, size_t buffer_length,
- unsigned char **netstring_start, size_t *netstring_length) {
- int i;
- size_t len = 0;
-
- /* Write default values for outputs */
- *netstring_start = NULL; *netstring_length = 0;
-
- /* Make sure buffer is big enough. Minimum size is 3. */
- if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
-
- /* No leading zeros allowed! */
- if (buffer[0] == '0' && isdigit(buffer[1]))
- return NETSTRING_ERROR_LEADING_ZERO;
-
- /* The netstring must start with a number */
- if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
-
- /* Read the number of bytes */
- for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
- /* Error if more than 9 digits */
- if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
- /* Accumulate each digit, assuming ASCII. */
- len = len*10 + (buffer[i] - '0');
- }
-
- /* Check buffer length once and for all. Specifically, we make sure
- that the buffer is longer than the number we've read, the length
- of the string itself, and the colon and comma. */
- if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
-
- /* Read the colon */
- if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
-
- /* Test for the trailing comma, and set the return values */
- if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
- *netstring_start = &buffer[i]; *netstring_length = len;
-
- return 0;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
similarity index 90%
copy from bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
index 793ade8..d8a1ef8 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
@@ -17,11 +17,10 @@
add_library(celix_wire_protocol_v1_impl STATIC
src/pubsub_wire_protocol_impl.c
- src/pubsub_wire_protocol_common.c
)
target_include_directories(celix_wire_protocol_v1_impl PRIVATE src)
-target_link_libraries(celix_wire_protocol_v1_impl PRIVATE Celix::pubsub_spi )
-
+target_link_libraries(celix_wire_protocol_v1_impl PUBLIC Celix::pubsub_spi)
+target_link_libraries(celix_wire_protocol_v1_impl PUBLIC celix_pubsub_protocol_lib)
add_celix_bundle(celix_pubsub_protocol_wire_v1
BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
similarity index 100%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
similarity index 85%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
index 5b9f8a0..c39604a 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc
@@ -171,7 +171,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeMetadata_Test) { // NOLINT(c
void *data = nullptr;
size_t length = 0;
- celix_status_t status = pubsubProtocol_encodeMetadata(nullptr, &message, &data, &length);
+ celix_status_t status = pubsubProtocol_v1_encodeMetadata(nullptr, &message, &data, &length);
unsigned char exp[12];
uint32_t s = htonl(1);
@@ -199,7 +199,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_Test) { // NOLINT(c
memcpy(exp + 4, "1:a,1:b,", 8);
pubsub_protocol_message_t message;
- celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, exp, 12, &message);
+ celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, exp, 12, &message);
ASSERT_EQ(status, CELIX_SUCCESS);
ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -221,7 +221,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_EmptyKey_Test) { //
memcpy(exp + 4, "0:,1:b,", 7);
pubsub_protocol_message_t message;
- celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, exp, 11, &message);
+ celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, exp, 11, &message);
ASSERT_EQ(status, CELIX_SUCCESS);
ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -242,7 +242,7 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
memcpy(exp + 4, "4:a,:l,1:b,", 11);
pubsub_protocol_message_t message;
- celix_status_t status = pubsubProtocol_decodeMetadata(nullptr, &exp, 15, &message);
+ celix_status_t status = pubsubProtocol_v1_decodeMetadata(nullptr, &exp, 15, &message);
ASSERT_EQ(status, CELIX_SUCCESS);
ASSERT_EQ(1, celix_properties_size(message.metadata.metadata));
@@ -253,3 +253,33 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test)
pubsubProtocol_destroy(wireprotocol);
}
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsubProtocol_create(&wireprotocol);
+
+ pubsub_protocol_message_t message;
+
+ void *footerData = nullptr;
+ size_t footerLength = 0;
+ celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+ ASSERT_EQ(status, CELIX_SUCCESS);
+ ASSERT_EQ(0, footerLength);
+ ASSERT_EQ(nullptr, footerData);
+ pubsubProtocol_destroy(wireprotocol);
+ free(footerData);
+}
+
+TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsubProtocol_create(&wireprotocol);
+
+ unsigned char exp[4];
+ uint32_t s = 0xBAABABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message);
+ ASSERT_EQ(CELIX_SUCCESS, status);
+ pubsubProtocol_destroy(wireprotocol);
+}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/main.cc
similarity index 100%
copy from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/gtest/src/main.cc
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
similarity index 83%
copy from bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
copy to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
index 359ea8f..3aff467 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+ act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize;
act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
- act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
- act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+ act->protocolSvc.encodePayload = pubsubProtocol_v1_encodePayload;
+ act->protocolSvc.encodeMetadata = pubsubProtocol_v1_encodeMetadata;
+ act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter;
act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
- act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
- act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+ act->protocolSvc.decodePayload = pubsubProtocol_v1_decodePayload;
+ act->protocolSvc.decodeMetadata = pubsubProtocol_v1_decodeMetadata;
+ act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter;
act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
}
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
new file mode 100644
index 0000000..6bd79d0
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <math.h>
+#include <string.h>
+
+#include "utils.h"
+#include "celix_properties.h"
+
+#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+#define BYTESWAP_SYNC true
+
+struct pubsub_protocol_wire_v1 {
+};
+
+celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *protocol = calloc(1, sizeof(**protocol));
+
+ if (!*protocol) {
+ status = CELIX_ENOMEM;
+ }
+ else {
+ //
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+ free(protocol);
+ return status;
+}
+
+celix_status_t pubsubProtocol_getHeaderSize(void* handle __attribute__((unused)), size_t *length) {
+ *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getHeaderBufferSize(void* handle, size_t *length) {
+ return pubsubProtocol_getHeaderSize(handle, length);
+}
+
+celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle __attribute__((unused)), size_t *length) {
+ *length = sizeof(int);
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle __attribute__((unused)), void *syncHeader) {
+ pubsubProtocol_writeInt(syncHeader, 0, true, PROTOCOL_WIRE_V1_SYNC_HEADER);
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_getFooterSize(void* handle __attribute__((unused)), size_t *length) {
+ *length = 0;
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle __attribute__((unused)), bool *isSupported) {
+ *isSupported = false;
+ return CELIX_SUCCESS;
+}
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+ // Get HeaderSize
+ size_t headerSize = 0;
+ pubsubProtocol_getHeaderSize(handle, &headerSize);
+
+ if (*outBuffer == NULL) {
+ *outBuffer = calloc(1, headerSize);
+ *outLength = headerSize;
+ }
+ if (*outBuffer == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ int idx = 0;
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, BYTESWAP_SYNC, PROTOCOL_WIRE_V1_SYNC_HEADER);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, true, PROTOCOL_WIRE_V1_ENVELOPE_VERSION);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, true, message->header.msgId);
+ idx = pubsubProtocol_writeShort(*outBuffer, idx, true, message->header.msgMajorVersion);
+ idx = pubsubProtocol_writeShort(*outBuffer, idx, true, message->header.msgMinorVersion);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, true, message->header.payloadSize);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, true, message->header.metadataSize);
+
+ *outLength = idx;
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_v1_encodePayload(void *handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ return pubsubProtocol_encodePayload(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_v1_encodeMetadata(void *handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ return pubsubProtocol_encodeMetadata(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_encodeFooter(void *handle __attribute__((unused)), pubsub_protocol_message_t *message __attribute__((unused)), void **outBuffer, size_t *outLength) {
+ *outBuffer = NULL;
+ return pubsubProtocol_getFooterSize(handle, outLength);
+}
+
+celix_status_t pubsubProtocol_v1_decodePayload(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message){
+ return pubsubProtocol_decodePayload(data, length, message);
+}
+
+celix_status_t pubsubProtocol_v1_decodeMetadata(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+ return pubsubProtocol_decodeMetadata(data, length, message);
+}
+
+celix_status_t pubsubProtocol_decodeFooter(void* handle __attribute__((unused)), void *data __attribute__((unused)), size_t length __attribute__((unused)), pubsub_protocol_message_t *message __attribute__((unused))) {
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+
+celix_status_t pubsubProtocol_decodeHeader(void *handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ int idx = 0;
+ size_t headerSize = 0;
+ pubsubProtocol_getHeaderSize(handle, &headerSize);
+ if (length == headerSize) {
+ unsigned int sync;
+ idx = pubsubProtocol_readInt(data, idx, BYTESWAP_SYNC, &sync);
+ if (sync != PROTOCOL_WIRE_V1_SYNC_HEADER) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ unsigned int envelopeVersion;
+ idx = pubsubProtocol_readInt(data, idx, true, &envelopeVersion);
+ if (envelopeVersion != PROTOCOL_WIRE_V1_ENVELOPE_VERSION) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ idx = pubsubProtocol_readInt(data, idx, true, &message->header.msgId);
+ idx = pubsubProtocol_readShort(data, idx, true, &message->header.msgMajorVersion);
+ idx = pubsubProtocol_readShort(data, idx, true, &message->header.msgMinorVersion);
+ idx = pubsubProtocol_readInt(data, idx, true, &message->header.payloadSize);
+ pubsubProtocol_readInt(data, idx, true, &message->header.metadataSize);
+ // Set message segmentation parameters to defaults
+ message->header.seqNr = 0;
+ message->header.payloadPartSize = message->header.payloadSize;
+ message->header.payloadOffset = 0;
+ }
+ }
+ } else {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ }
+ return status;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
similarity index 69%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
index 06c4dcd..9b7521f 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length);
celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_getFooterSize(void* handle, size_t *length);
celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported);
celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
-celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
-celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_v1_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_v1_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
-celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
-celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_v1_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_v1_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
#ifdef __cplusplus
}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
similarity index 53%
rename from bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
index 793ade8..61324e2 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
@@ -15,28 +15,27 @@
# specific language governing permissions and limitations
# under the License.
-add_library(celix_wire_protocol_v1_impl STATIC
- src/pubsub_wire_protocol_impl.c
- src/pubsub_wire_protocol_common.c
+add_library(celix_wire_protocol_v2_impl STATIC
+ src/pubsub_wire_v2_protocol_impl.c
)
-target_include_directories(celix_wire_protocol_v1_impl PRIVATE src)
-target_link_libraries(celix_wire_protocol_v1_impl PRIVATE Celix::pubsub_spi )
+target_include_directories(celix_wire_protocol_v2_impl PRIVATE src)
+target_link_libraries(celix_wire_protocol_v2_impl PUBLIC Celix::pubsub_spi)
+target_link_libraries(celix_wire_protocol_v2_impl PUBLIC celix_pubsub_protocol_lib)
-
-add_celix_bundle(celix_pubsub_protocol_wire_v1
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
+add_celix_bundle(celix_pubsub_protocol_wire_v2
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v2"
VERSION "1.0.0"
GROUP "Celix/PubSub"
SOURCES
- src/ps_wire_protocol_activator.c
+ src/ps_wire_v2_protocol_activator.c
)
-target_include_directories(celix_pubsub_protocol_wire_v1 PRIVATE src)
-target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE celix_wire_protocol_v1_impl)
+target_include_directories(celix_pubsub_protocol_wire_v2 PRIVATE src)
+target_link_libraries(celix_pubsub_protocol_wire_v2 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_protocol_wire_v2 PRIVATE celix_wire_protocol_v2_impl)
-install_celix_bundle(celix_pubsub_protocol_wire_v1 EXPORT celix COMPONENT pubsub)
+install_celix_bundle(celix_pubsub_protocol_wire_v2 EXPORT celix COMPONENT pubsub)
-add_library(Celix::pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
+add_library(Celix::pubsub_protocol_wire_v2 ALIAS celix_pubsub_protocol_wire_v2)
if (ENABLE_TESTING)
add_subdirectory(gtest)
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
similarity index 69%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
index f86a18e..4d557a0 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/CMakeLists.txt
@@ -17,12 +17,12 @@
set(SOURCES
src/main.cc
- src/PS_WP_tests.cc
+ src/PS_WP_v2_tests.cc
)
-add_executable(celix_pswp_tests ${SOURCES})
+add_executable(celix_pswp_v2_tests ${SOURCES})
#target_include_directories(celix_cxx_pswp_tests SYSTEM PRIVATE gtest)
-target_include_directories(celix_pswp_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
-target_link_libraries(celix_pswp_tests PRIVATE celix_wire_protocol_v1_impl GTest::gtest Celix::pubsub_spi)
+target_include_directories(celix_pswp_v2_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
+target_link_libraries(celix_pswp_v2_tests PRIVATE celix_wire_protocol_v2_impl GTest::gtest Celix::pubsub_spi)
-add_test(NAME celix_pswp_tests COMMAND celix_pswp_tests)
-setup_target_for_coverage(celix_pswp_tests SCAN_DIR ..)
\ No newline at end of file
+add_test(NAME celix_pswp_v2_tests COMMAND celix_pswp_v2_tests)
+setup_target_for_coverage(celix_pswp_v2_tests SCAN_DIR ..)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc
new file mode 100644
index 0000000..080b6f0
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/PS_WP_v2_tests.cc
@@ -0,0 +1,249 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <sstream>
+#include <arpa/inet.h>
+#include <iostream>
+
+#include <pubsub_wire_protocol_common.h>
+
+#include "gtest/gtest.h"
+
+#include "pubsub_wire_v2_protocol_impl.h"
+#include "celix_byteswap.h"
+#include <cstring>
+
+class WireProtocolV2Test : public ::testing::Test {
+public:
+ WireProtocolV2Test() = default;
+ ~WireProtocolV2Test() override = default;
+
+};
+
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_EncodeHeader_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ pubsub_protocol_message_t message;
+ message.header.msgId = 1;
+ message.header.seqNr = 4;
+ message.header.msgMajorVersion = 0;
+ message.header.msgMinorVersion = 0;
+ message.header.payloadSize = 2;
+ message.header.metadataSize = 3;
+ message.header.payloadPartSize = 4;
+ message.header.payloadOffset = 2;
+ message.header.isLastSegment = 1;
+ message.header.convertEndianess = 1;
+
+ void *headerData = nullptr;
+ size_t headerLength = 0;
+ celix_status_t status = pubsubProtocol_wire_v2_encodeHeader(nullptr, &message, &headerData, &headerLength);
+
+ unsigned char exp[40];
+ uint32_t s = bswap_32(0xABBADEAF);
+ memcpy(exp, &s, sizeof(uint32_t));
+ uint32_t e = 0x02000000; //envelope version
+ memcpy(exp+4, &e, sizeof(uint32_t));
+ uint32_t m = 0x01000000; //msg id
+ memcpy(exp+8, &m, sizeof(uint32_t));
+ uint32_t seq = 0x04000000; //seqnr
+ memcpy(exp+12, &seq, sizeof(uint32_t));
+ uint32_t v = 0x00000000;
+ memcpy(exp+16, &v, sizeof(uint32_t));
+ uint32_t ps = 0x02000000;
+ memcpy(exp+20, &ps, sizeof(uint32_t));
+ uint32_t ms = 0x03000000;
+ memcpy(exp+24, &ms, sizeof(uint32_t));
+ uint32_t pps = 0x04000000;
+ memcpy(exp+28, &pps, sizeof(uint32_t));
+ uint32_t ppo = 0x02000000;
+ memcpy(exp+32, &ppo, sizeof(uint32_t));
+ uint32_t ils = 0x01000000;
+ memcpy(exp+36, &ils, sizeof(uint32_t));
+
+ ASSERT_EQ(status, CELIX_SUCCESS);
+ ASSERT_EQ(40, headerLength);
+ for (int i = 0; i < 40; i++) {
+ ASSERT_EQ(((unsigned char*) headerData)[i], exp[i]);
+ }
+
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+ free(headerData);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ unsigned char exp[40];
+ uint32_t s = bswap_32(0xABBADEAF); //sync
+ memcpy(exp, &s, sizeof(uint32_t));
+ uint32_t e = 0x02000000; //envelope version
+ memcpy(exp+4, &e, sizeof(uint32_t));
+ uint32_t m = 0x01000000; //msg id
+ memcpy(exp+8, &m, sizeof(uint32_t));
+ uint32_t seq = 0x08000000; //seqnr
+ memcpy(exp+12, &seq, sizeof(uint32_t));
+ uint32_t v = 0x00000000;
+ memcpy(exp+16, &v, sizeof(uint32_t));
+ uint32_t ps = 0x02000000;
+ memcpy(exp+20, &ps, sizeof(uint32_t));
+ uint32_t ms = 0x03000000;
+ memcpy(exp+24, &ms, sizeof(uint32_t));
+ uint32_t pps = 0x04000000;
+ memcpy(exp+28, &pps, sizeof(uint32_t));
+ uint32_t ppo = 0x02000000;
+ memcpy(exp+32, &ppo, sizeof(uint32_t));
+ uint32_t ils = 0x01000000;
+ memcpy(exp+36, &ils, sizeof(uint32_t));
+
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+ ASSERT_EQ(CELIX_SUCCESS, status);
+ ASSERT_EQ(1, message.header.msgId);
+ ASSERT_EQ(8, message.header.seqNr);
+ ASSERT_EQ(0, message.header.msgMajorVersion);
+ ASSERT_EQ(0, message.header.msgMinorVersion);
+ ASSERT_EQ(2, message.header.payloadSize);
+ ASSERT_EQ(3, message.header.metadataSize);
+ ASSERT_EQ(4, message.header.payloadPartSize);
+ ASSERT_EQ(2, message.header.payloadOffset);
+ ASSERT_EQ(1, message.header.isLastSegment);
+ ASSERT_EQ(1, message.header.convertEndianess);
+
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ unsigned char exp[40];
+ uint32_t s = 0xBAABABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ uint32_t e = 0x01000000;
+ memcpy(exp+4, &e, sizeof(uint32_t));
+ uint32_t m = 0x01000000;
+ memcpy(exp+8, &m, sizeof(uint32_t));
+ uint32_t seq = 0x08000000;
+ memcpy(exp+12, &seq, sizeof(uint32_t));
+ uint32_t v = 0x00000000;
+ memcpy(exp+16, &v, sizeof(uint32_t));
+ uint32_t ps = 0x02000000;
+ memcpy(exp+20, &ps, sizeof(uint32_t));
+ uint32_t ms = 0x03000000;
+ memcpy(exp+24, &ms, sizeof(uint32_t));
+
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+ ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeHeader_IncorrectVersion_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ unsigned char exp[40];
+ uint32_t s = 0xABBADEAF;
+ memcpy(exp, &s, sizeof(uint32_t));
+ uint32_t e = 0x02000000;
+ memcpy(exp+4, &e, sizeof(uint32_t));
+ uint32_t m = 0x01000000;
+ memcpy(exp+8, &m, sizeof(uint32_t));
+ uint32_t seq = 0x08000000;
+ memcpy(exp+12, &seq, sizeof(uint32_t));
+ uint32_t v = 0x00000000;
+ memcpy(exp+16, &v, sizeof(uint32_t));
+ uint32_t ps = 0x02000000;
+ memcpy(exp+20, &ps, sizeof(uint32_t));
+ uint32_t ms = 0x03000000;
+ memcpy(exp+24, &ms, sizeof(uint32_t));
+
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_wire_v2_decodeHeader(nullptr, exp, 40, &message);
+
+ ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ pubsub_protocol_message_t message;
+ message.header.convertEndianess = 0;
+
+ void *footerData = nullptr;
+ size_t footerLength = 0;
+ celix_status_t status = pubsubProtocol_wire_v2_encodeFooter(nullptr, &message, &footerData, &footerLength);
+
+ unsigned char exp[4];
+ uint32_t s = 0xDEAFABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ ASSERT_EQ(status, CELIX_SUCCESS);
+ ASSERT_EQ(4, footerLength);
+ for (int i = 0; i < 4; i++) {
+ if (((unsigned char*) footerData)[i] != exp[i]) {
+ std::cerr << "error at index " << std::to_string(i) << std::endl;
+ }
+ ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]);
+ }
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+ free(footerData);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ unsigned char exp[4];
+ uint32_t s = 0xDEAFABBA;
+ memcpy(exp, &s, sizeof(uint32_t));
+ pubsub_protocol_message_t message;
+ message.header.convertEndianess = 0;
+ celix_status_t status = pubsubProtocol_wire_v2_decodeFooter(nullptr, exp, 4, &message);
+
+ ASSERT_EQ(CELIX_SUCCESS, status);
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
+
+TEST_F(WireProtocolV2Test, WireProtocolV2Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp)
+ pubsub_protocol_wire_v2_t *wireprotocol;
+ pubsubProtocol_wire_v2_create(&wireprotocol);
+
+ unsigned char exp[4];
+ uint32_t s = 0xABBABAAB;
+ memcpy(exp, &s, sizeof(uint32_t));
+ pubsub_protocol_message_t message;
+
+ celix_status_t status = pubsubProtocol_wire_v2_decodeFooter(nullptr, exp, 4, &message);
+ ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
+
+ pubsubProtocol_wire_v2_destroy(wireprotocol);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/main.cc
similarity index 100%
rename from bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/main.cc
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/gtest/src/main.cc
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
similarity index 57%
rename from bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
rename to bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
index 359ea8f..80e4433 100644
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/ps_wire_v2_protocol_activator.c
@@ -21,10 +21,10 @@
#include <pubsub_constants.h>
#include "celix_api.h"
-#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_v2_protocol_impl.h"
typedef struct ps_wp_activator {
- pubsub_protocol_wire_v1_t *wireprotocol;
+ pubsub_protocol_wire_v2_t *wireprotocol;
pubsub_protocol_service_t protocolSvc;
long wireProtocolSvcId;
@@ -33,25 +33,28 @@ typedef struct ps_wp_activator {
static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
act->wireProtocolSvcId = -1L;
- celix_status_t status = pubsubProtocol_create(&(act->wireprotocol));
+ celix_status_t status = pubsubProtocol_wire_v2_create(&(act->wireprotocol));
if (status == CELIX_SUCCESS) {
/* Set serializertype */
celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_PROTOCOL_TYPE);
+ celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_V2_PROTOCOL_TYPE);
- act->protocolSvc.getHeaderSize = pubsubProtocol_getHeaderSize;
- act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize;
- act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize;
- act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
- act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported;
-
- act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
- act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
- act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+ act->protocolSvc.getHeaderSize = pubsubProtocol_wire_v2_getHeaderSize;
+ act->protocolSvc.getHeaderBufferSize = pubsubProtocol_wire_v2_getHeaderBufferSize;
+ act->protocolSvc.getSyncHeaderSize = pubsubProtocol_wire_v2_getSyncHeaderSize;
+ act->protocolSvc.getSyncHeader = pubsubProtocol_wire_v2_getSyncHeader;
+ act->protocolSvc.getFooterSize = pubsubProtocol_wire_v2_getFooterSize;
+ act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_wire_v2_isMessageSegmentationSupported;
- act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
- act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
- act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+ act->protocolSvc.encodeHeader = pubsubProtocol_wire_v2_encodeHeader;
+ act->protocolSvc.encodePayload = pubsubProtocol_wire_v2_encodePayload;
+ act->protocolSvc.encodeMetadata = pubsubProtocol_wire_v2_encodeMetadata;
+ act->protocolSvc.encodeFooter = pubsubProtocol_wire_v2_encodeFooter;
+
+ act->protocolSvc.decodeHeader = pubsubProtocol_wire_v2_decodeHeader;
+ act->protocolSvc.decodePayload = pubsubProtocol_wire_v2_decodePayload;
+ act->protocolSvc.decodeMetadata = pubsubProtocol_wire_v2_decodeMetadata;
+ act->protocolSvc.decodeFooter = pubsubProtocol_wire_v2_decodeFooter;
act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
}
@@ -61,7 +64,7 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
static int ps_wp_stop(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->wireProtocolSvcId);
act->wireProtocolSvcId = -1L;
- pubsubProtocol_destroy(act->wireprotocol);
+ pubsubProtocol_wire_v2_destroy(act->wireprotocol);
return CELIX_SUCCESS;
}
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
new file mode 100644
index 0000000..a05e095
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "celix_properties.h"
+
+#include "pubsub_wire_v2_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+struct pubsub_protocol_wire_v2 {
+};
+celix_status_t pubsubProtocol_wire_v2_create(pubsub_protocol_wire_v2_t **protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *protocol = calloc(1, sizeof(**protocol));
+
+ if (!*protocol) {
+ status = CELIX_ENOMEM;
+ }
+ else {}
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+ free(protocol);
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void* handle, size_t *length) {
+ *length = sizeof(int) * 9 + sizeof(short) * 2; // header + sync + version = 36
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderBufferSize(void* handle, size_t *length) {
+ return pubsubProtocol_wire_v2_getHeaderSize(handle, length);
+}
+
+celix_status_t pubsubProtocol_wire_v2_getSyncHeaderSize(void* handle, size_t *length) {
+ *length = sizeof(int);
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getSyncHeader(void* handle, void *syncHeader) {
+ pubsubProtocol_writeInt(syncHeader, 0, false, PROTOCOL_WIRE_V2_SYNC_HEADER);
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_getFooterSize(void* handle, size_t *length) {
+ *length = sizeof(int);;
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_isMessageSegmentationSupported(void* handle, bool *isSupported) {
+ *isSupported = true;
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+ // Get HeaderSize
+ size_t headerSize = 0;
+ pubsubProtocol_wire_v2_getHeaderSize(handle, &headerSize);
+
+ if (*outBuffer == NULL) {
+ *outBuffer = malloc(headerSize);
+ *outLength = headerSize;
+ }
+ if (*outBuffer == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ int idx = 0;
+ unsigned int convert = message->header.convertEndianess;
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_SYNC_HEADER);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_ENVELOPE_VERSION);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.msgId);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.seqNr);
+ idx = pubsubProtocol_writeShort(*outBuffer, idx, convert, message->header.msgMajorVersion);
+ idx = pubsubProtocol_writeShort(*outBuffer, idx, convert, message->header.msgMinorVersion);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadSize);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataSize);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadPartSize);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadOffset);
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.isLastSegment);
+ *outLength = idx;
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+ // Get HeaderSize
+ size_t footerSize = 0;
+ pubsubProtocol_wire_v2_getFooterSize(handle, &footerSize);
+
+ if (*outBuffer == NULL) {
+ *outBuffer = malloc(footerSize);
+ *outLength = footerSize;
+ }
+ if (*outBuffer == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ int idx = 0;
+ unsigned int convert = message->header.convertEndianess;
+ idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, PROTOCOL_WIRE_V2_SYNC_FOOTER);
+ *outLength = idx;
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ int idx = 0;
+ size_t headerSize = 0;
+ pubsubProtocol_wire_v2_getHeaderSize(handle, &headerSize);
+ if (length == headerSize) {
+ unsigned int sync = 0;
+ unsigned int sync_endianess = 0;
+ idx = pubsubProtocol_readInt(data, idx, false, &sync);
+ pubsubProtocol_readInt(data, 0, true, &sync_endianess);
+ message->header.convertEndianess = (sync_endianess == PROTOCOL_WIRE_V2_SYNC_HEADER) ? true : false;
+ if ((sync != PROTOCOL_WIRE_V2_SYNC_HEADER) && (sync_endianess != PROTOCOL_WIRE_V2_SYNC_HEADER)) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ unsigned int envelopeVersion;
+ unsigned int convert = message->header.convertEndianess;
+ idx = pubsubProtocol_readInt(data, idx, convert, &envelopeVersion);
+ if (envelopeVersion != PROTOCOL_WIRE_V2_ENVELOPE_VERSION) {
+ fprintf(stderr, "found sync %x and converted sync %x\n", sync, sync_endianess);
+ fprintf(stderr, "wrong envelop version\n");
+ fprintf(stderr, "Got %i, need %i\n", envelopeVersion, PROTOCOL_WIRE_V2_ENVELOPE_VERSION);
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.msgId);
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.seqNr);
+ idx = pubsubProtocol_readShort(data, idx, convert, &message->header.msgMajorVersion);
+ idx = pubsubProtocol_readShort(data, idx, convert, &message->header.msgMinorVersion);
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadSize);
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataSize);
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadPartSize);
+ idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadOffset);
+ pubsubProtocol_readInt(data, idx, convert, &message->header.isLastSegment);
+ }
+ }
+ } else {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ }
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ int idx = 0;
+ size_t footerSize = 0;
+ pubsubProtocol_wire_v2_getFooterSize(handle, &footerSize);
+ if (length == footerSize) {
+ unsigned int footerSync;
+ unsigned int convert = message->header.convertEndianess;
+ idx = pubsubProtocol_readInt(data, idx, convert, &footerSync);
+ if (footerSync != PROTOCOL_WIRE_V2_SYNC_FOOTER) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ }
+ } else {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ }
+ return status;
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodePayload(void* handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ return pubsubProtocol_encodePayload(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_wire_v2_encodeMetadata(void* handle __attribute__((unused)), pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ return pubsubProtocol_encodeMetadata(message, outBuffer, outLength);
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodePayload(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+ return pubsubProtocol_decodePayload(data, length, message);
+}
+
+celix_status_t pubsubProtocol_wire_v2_decodeMetadata(void* handle __attribute__((unused)), void *data, size_t length, pubsub_protocol_message_t *message) {
+ return pubsubProtocol_decodeMetadata(data, length, message);
+}
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h
new file mode 100644
index 0000000..295a7a2
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_WIRE_V2_H_
+#define PUBSUB_PROTOCOL_WIRE_V2_H_
+
+#include "pubsub_protocol.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define PUBSUB_WIRE_V2_PROTOCOL_TYPE "envelope-v2"
+
+typedef struct pubsub_protocol_wire_v2 pubsub_protocol_wire_v2_t;
+
+celix_status_t pubsubProtocol_wire_v2_create(pubsub_protocol_wire_v2_t **protocol);
+celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protocol);
+
+celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getHeaderBufferSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getSyncHeaderSize(void *handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_getSyncHeader(void* handle, void *syncHeader);
+celix_status_t pubsubProtocol_wire_v2_getFooterSize(void* handle, size_t *length);
+celix_status_t pubsubProtocol_wire_v2_isMessageSegmentationSupported(void* handle, bool *isSupported);
+
+celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_wire_v2_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+celix_status_t pubsubProtocol_wire_v2_encodePayload(void* handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_encodeMetadata(void* handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_wire_v2_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_wire_v2_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PUBSUB_PROTOCOL_WIRE_V2_H_ */
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
deleted file mode 100644
index 2f07375..0000000
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "pubsub_wire_protocol_common.h"
-
-#include <string.h>
-#if defined(__APPLE__)
- #include <machine/endian.h>
-#else
- #include <endian.h>
- #include <arpa/inet.h>
-#endif
-
-int readShort(const unsigned char *data, int offset, uint16_t *val) {
- memcpy(val, data + offset, sizeof(uint16_t));
- *val = ntohs(*val);
- return offset + (int)sizeof(uint16_t);
-}
-
-int readInt(const unsigned char *data, int offset, uint32_t *val) {
- memcpy(val, data + offset, sizeof(uint32_t));
- *val = ntohl(*val);
- return offset + (int)sizeof(uint32_t);
-}
-
-int readLong(const unsigned char *data, int offset, uint64_t *val) {
- memcpy(val, data + offset, sizeof(uint64_t));
-#if defined(__APPLE__)
- *val = ntohll(*val);
-#else
- *val = be64toh(*val);
-#endif
- return offset + (int)sizeof(uint64_t);
-}
-
-int writeShort(unsigned char *data, int offset, uint16_t val) {
- uint16_t nVal = htons(val);
- memcpy(data + offset, &nVal, sizeof(uint16_t));
- return offset + (int)sizeof(uint16_t);
-}
-
-int writeInt(unsigned char *data, int offset, uint32_t val) {
- uint32_t nVal = htonl(val);
- memcpy(data + offset, &nVal, sizeof(uint32_t));
- return offset + (int)sizeof(uint32_t);
-}
-
-int writeLong(unsigned char *data, int offset, uint64_t val) {
-#if defined(__APPLE__)
- uint64_t nVal = htonll(val);
-#else
- uint64_t nVal = htobe64(val);
-#endif
- memcpy(data + offset, &nVal, sizeof(uint64_t));
- return offset + (int)sizeof(uint64_t);
-}
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
deleted file mode 100644
index f5befd2..0000000
--- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
-#define CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
-
-#include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
-static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
-
-int readShort(const unsigned char *data, int offset, uint16_t *val);
-int readInt(const unsigned char *data, int offset, uint32_t *val);
-int readLong(const unsigned char *data, int offset, uint64_t *val);
-
-int writeShort(unsigned char *data, int offset, uint16_t val);
-int writeInt(unsigned char *data, int offset, uint32_t val);
-int writeLong(unsigned char *data, int offset, uint64_t val);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif //CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index ab8b883..ad1a387 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -42,11 +42,17 @@ struct pubsub_protocol_header {
uint32_t payloadSize;
uint32_t metadataSize;
+ /** optional convert Endianess attribute, this attribute is used to indicate the header needs to converted for endianess during encoding
+ * this attribute is used to indicate the payload needs to converted for endianess after header decoding.
+ * Note: this attribute is transmitted using the wire protocol, the sync word is used to determine endianess conversion */
+ uint32_t convertEndianess;
+
/** Optional message segmentation attributes, these attributes are only used/written by the protocol admin.
* When message segmentation is supported by the protocol admin */
uint32_t seqNr;
uint32_t payloadPartSize;
uint32_t payloadOffset;
+ uint32_t isLastSegment;
};
typedef struct pubsub_protocol_payload pubsub_protocol_payload_t;
@@ -113,6 +119,18 @@ typedef struct pubsub_protocol_service {
* @return status code indicating failure or success
*/
celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+ /**
+ * Returns the size of the footer.
+ * Is used by the receiver to configure the expected size of the footer.
+ * The receiver reads the footer to know if the complete message including paylaod is received.
+ *
+ * @param handle handle for service
+ * @param length output param for footer size
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*getFooterSize)(void *handle, size_t *length);
+
/**
* Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol.
*
@@ -158,6 +176,18 @@ typedef struct pubsub_protocol_service {
celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
/**
+ * Encodes the footer
+ *
+ * @param handle handle for service
+ * @param message message to use footer from
+ * @param outBuffer byte array containing the encoded footer
+ * @param outLength length of the byte array
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+
+ /**
* Decodes the given data into message.header.
*
* @param handle handle for service
@@ -191,6 +221,17 @@ typedef struct pubsub_protocol_service {
* @return status code indicating failure or success
*/
celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+ /**
+ * Decodes the given data into message.header.
+ *
+ * @param handle handle for service
+ * @param data incoming byte array to decode
+ * @param length length of the byte array
+ * @param message pointer to message to be filled in with decoded footer
+ * @return status code indicating failure or success
+ */
+ celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
} pubsub_protocol_service_t;
#endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index ef73910..4c03808 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -141,7 +141,7 @@ if (BUILD_PUBSUB_PSA_TCP)
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v1
+ Celix::pubsub_protocol_wire_v2
pubsub_sut
pubsub_tst
)
@@ -225,6 +225,26 @@ if (BUILD_PUBSUB_PSA_ZMQ)
add_test(NAME pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
setup_target_for_coverage(pubsub_zmq_tests SCAN_DIR ..)
+ add_celix_container(pubsub_zmq_wire_v2_tests
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_zmq
+ Celix::pubsub_protocol_wire_v2
+ pubsub_sut
+ pubsub_tst
+ )
+
+ target_link_libraries(pubsub_zmq_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi ZMQ::lib CZMQ::lib)
+ target_include_directories(pubsub_zmq_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+ add_test(NAME pubsub_zmq_wire_v2_tests COMMAND pubsub_zmq_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_wire_v2_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_zmq_wire_v2_tests SCAN_DIR ..)
+
add_celix_container(pubsub_zmq_zerocopy_tests
USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -249,4 +269,28 @@ if (BUILD_PUBSUB_PSA_ZMQ)
#TODO fix issues with ZeroCopy and reanble test again
add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
setup_target_for_coverage(pubsub_zmq_zerocopy_tests SCAN_DIR ..)
+
+ add_celix_container(pubsub_zmq_zerocopy_wire_v2_tests
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ PSA_ZMQ_ZEROCOPY_ENABLED=true
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_zmq
+ Celix::pubsub_protocol_wire_v2
+ Celix::shell
+ Celix::shell_tui
+ pubsub_sut
+ pubsub_tst
+ )
+ target_link_libraries(pubsub_zmq_zerocopy_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi ZMQ::lib CZMQ::lib)
+ target_include_directories(pubsub_zmq_zerocopy_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+
+ #TODO fix issues with ZeroCopy and reanble test again
+ add_test(NAME pubsub_zmq_zerocopy_wire_v2_tests COMMAND pubsub_zmq_zerocopy_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_wire_v2_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_zmq_zerocopy_wire_v2_tests SCAN_DIR ..)
endif ()
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..012283d 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -27,6 +27,10 @@
#include "celix_threads.h"
#include "celix_array_list.h"
+#ifdef NO_MEMSTREAM_AVAILABLE
+#include "memstream/open_memstream.h"
+#endif
+
#define LOG_NAME "celix_framework"
struct celix_framework_logger {
diff --git a/libs/utils/include/celix_byteswap.h b/libs/utils/include/celix_byteswap.h
new file mode 100644
index 0000000..47c413a
--- /dev/null
+++ b/libs/utils/include/celix_byteswap.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_BYTESWAP_H
+#define CELIX_BYTESWAP_H
+
+/** Define byteswap compatibility functions for Apple OSX */
+#if defined(__APPLE__)
+/* Swap bytes in 16 bit value. */
+#define bswap_16(x) \
+ ((((x) & 0xff00) << 8) | (((x) & 0x00ff) >> 8))
+#define __bswap_16 bswap_16
+
+/* Swap bytes in 32 bit value. */
+#define bswap_32(x) \
+ ((((x) & 0xff000000) >> 24) | (((x) & 0x00ff0000) >> 8) | \
+ (((x) & 0x0000ff00) << 8) | (((x) & 0x000000ff) << 24))
+#define __bswap_32 bswap_32
+
+/* Swap bytes in 64 bit value. */
+# define bswap_64(x) \
+ ((((x) & 0xff00000000000000ull) >> 56) | \
+ (((x) & 0x00ff000000000000ull) >> 40) | \
+ (((x) & 0x0000ff0000000000ull) >> 24) | \
+ (((x) & 0x000000ff00000000ull) >> 8) | \
+ (((x) & 0x00000000ff000000ull) << 8) | \
+ (((x) & 0x0000000000ff0000ull) << 24) | \
+ (((x) & 0x000000000000ff00ull) << 40) | \
+ (((x) & 0x00000000000000ffull) << 56))
+#define __bswap_64 bswap_64
+#else
+#include <byteswap.h>
+#endif
+
+#endif /* CELIX_BYTESWAP_H */
\ No newline at end of file