You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/08/29 08:54:37 UTC

[celix] 02/03: Fix tests

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/tcp_admin_msg_segmentation
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 954d64d02f167413ad8e68d1b9e2874afd084f53
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sat Aug 29 09:06:33 2020 +0200

    Fix tests
---
 CMakeLists.txt                                     | 20 +++----
 .../pubsub/pubsub_admin_tcp/src/psa_activator.c    |  4 +-
 .../src/pubsub_psa_tcp_constants.h                 |  3 ++
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 30 ++++++-----
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 29 ++++++++--
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |  1 +
 .../src/pubsub_tcp_topic_receiver.c                | 61 ++++++----------------
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  6 +--
 bundles/pubsub/test/CMakeLists.txt                 |  4 +-
 bundles/pubsub/test/test/sut_endpoint_activator.c  |  2 -
 10 files changed, 79 insertions(+), 81 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index a298fdd..8b7a406 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,16 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 AND ${CMAKE_GENERATO
     message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." )
 ENDIF()
 
+# Options
+option(ENABLE_TESTING "Enables unit/bundle testing" TRUE)
+if (ENABLE_TESTING)
+    find_package(GTest CONFIG QUIET)
+    if (NOT GTest_FOUND)
+        include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
+    endif()
+       enable_testing()
+endif ()
+
 set(ENABLE_MORE_WARNINGS OFF)
 
 # Set C specific flags
@@ -113,16 +123,6 @@ set(CELIX_MICRO "1")
 # Default bundle version
 set(DEFAULT_VERSION 1.0.0)
 
-# Options
-option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
-if (ENABLE_TESTING)
-    find_package(GTest CONFIG QUIET)
-    if (NOT GTest_FOUND)
-        include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
-    endif()
-    enable_testing()
-endif()
-
 option(CELIX_INSTALL_DEPRECATED_API "whether to install (and use) deprecated apis (i.e. header without a celix_ prefix." ON)
 
 option(CELIX_ADD_DEPRECATED_ATTRIBUTES "If enabled add deprecated attributes to deprecated services/functions." ON)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
         celix_properties_t *props = celix_properties_create();
         celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
         celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
-        celix_properties_set(props,
-                             CELIX_SHELL_COMMAND_DESCRIPTION,
-                             "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+        celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
         act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 9f03d13..dee36e6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -65,6 +65,9 @@
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT  5.0
 
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY      "PUBSUB_TCP_SUBSCRIBER_BLOCKING"
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT  false
+
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_ADMIN_TYPE                   "tcp"
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index e7ad284..b1c1ea9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -636,24 +636,13 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
-    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
 
     if (url == NULL) {
         L_WARN("[PSA TCP] Error got endpoint without tcp url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            if (scope == NULL && eScope == NULL) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            }
-        }
+        pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
     }
 
     return status;
@@ -690,6 +679,23 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t status = CELIX_SUCCESS;
 
+
+    char *line = celix_utils_strdup(commandLine);
+    char *token = line;
+    strtok_r(line, " ", &token); //first token is command name
+    strtok_r(NULL, " ", &token); //second token is sub command
+
+    if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    if (celix_utils_stringEquals(token, "nr_of_senders")) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    }
+
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
     celixThreadMutex_lock(&psa->serializers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index be5a694..daf36dc 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -97,6 +97,8 @@ typedef struct psa_tcp_connection_entry {
     size_t writeMetaBufferSize;
     void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    celix_thread_mutex_t readMutex;
 } psa_tcp_connection_entry_t;
 
 //
@@ -129,6 +131,7 @@ struct pubsub_tcpHandler {
     celix_thread_t thread;
     bool running;
     bool isEndPoint;
+    bool isBlocking;
 };
 
 static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
@@ -304,6 +307,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
     if (fd >= 0) {
         entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
         entry->fd = fd;
+        celixThreadMutex_create(&entry->writeMutex, NULL);
+        celixThreadMutex_create(&entry->readMutex, NULL);
         if (url)
             entry->url = strndup(url, 1024 * 1024);
         if (interface_url) {
@@ -359,6 +364,8 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
         if (entry->writeFooterBuffer) free(entry->writeFooterBuffer);
         if (entry->readMetaBuffer) free(entry->readMetaBuffer);
         if (entry->writeMetaBuffer) free(entry->writeMetaBuffer);
+        celixThreadMutex_destroy(&entry->writeMutex);
+        celixThreadMutex_destroy(&entry->readMutex);
         free(entry);
     }
 }
@@ -421,7 +428,9 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
                 entry = NULL;
             }
-            rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            if (!handle->isBlocking) {
+                rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            }
             if (rc < 0) {
                 pubsub_tcpHandler_freeEntry(entry);
                 L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
@@ -750,6 +759,14 @@ void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint)
     }
 }
 
+void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle,bool isBlocking) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->isBlocking = isBlocking;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
 static inline
 void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
 
@@ -774,7 +791,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
-    celixThreadRwlock_writeLock(&handle->dbLock);
+    celixThreadRwlock_readLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
     if (entry == NULL)
         entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -788,7 +805,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         celixThreadRwlock_unlock(&handle->dbLock);
         return -1;
     }
-
+    celixThreadMutex_lock(&entry->readMutex);
     if (entry->readHeaderBufferSize && entry->readHeaderBuffer) {
         entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize);
     }
@@ -936,6 +953,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
             nbytes = 0; //Return 0 as indicator to close the connection
         }
     }
+    celixThreadMutex_unlock(&entry->readMutex);
     celixThreadRwlock_unlock(&handle->dbLock);
     return (int)nbytes;
 }
@@ -984,12 +1002,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
     int nofConnToClose = 0;
     if (handle) {
-        celixThreadRwlock_writeLock(&handle->dbLock);
+        celixThreadRwlock_readLock(&handle->dbLock);
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
         size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
+            celixThreadMutex_lock(&entry->writeMutex);
             void *payloadData = NULL;
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
@@ -1029,6 +1048,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) {
                 L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n",
                        entry->fd);
+                celixThreadMutex_unlock(&entry->writeMutex);
                 continue;
             }
 
@@ -1174,6 +1194,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     free(payloadData);
                 }
             }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
         celixThreadRwlock_unlock(&handle->dbLock);
     }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index a08911c..70eb29a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -66,6 +66,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint);
+void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle, bool isBlocking);
 
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 533e773..ccfbb57 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,17 +24,13 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
-#include <math.h>
 #include "pubsub_tcp_handler.h"
 #include "pubsub_tcp_topic_receiver.h"
 #include "pubsub_psa_tcp_constants.h"
 #include "pubsub_tcp_common.h"
 
-#include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
@@ -120,24 +116,14 @@ typedef struct psa_tcp_subscriber_entry {
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                  const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                     const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void *psa_tcp_recvThread(void *data);
-
 static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
 static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
 static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
 static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -219,6 +205,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
                                                                  PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
         long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        bool blocking = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY, PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT);
+
         pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
         pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
         pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
@@ -228,10 +216,10 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched);
         pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
         pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
+        pubsub_tcpHandler_setBlocking(receiver->socketHandler, blocking);
     }
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                     PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+                                                                          PSA_TCP_DEFAULT_METRICS_ENABLED);
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -346,7 +334,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             pubsub_tcpHandler_destroy(receiver->socketHandler);
             receiver->socketHandler = NULL;
         }
-
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -371,8 +359,7 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
     return receiver->protocolSvcId;
 }
 
-void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
-                                             celix_array_list_t *unconnectedUrls) {
+void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -466,8 +453,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
 
         hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
-        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
-                                                           &entry->msgTypes);
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
 
         if (rc == 0) {
             entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -498,7 +484,6 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
     long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
-
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
@@ -568,17 +553,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                     hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
                     while (hashMapIterator_hasNext(&iter)) {
                         pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                        svc->receive(svc->handle,
-                                     msgSer->msgName,
-                                     msgSer->msgId,
-                                     deSerializedMsg,
-                                     message->metadata.metadata,
-                                     &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
-                                                                   msgType,
-                                                                   msgId,
-                                                                   deSerializedMsg,
-                                                                   metadata);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
                         if (!release && hashMapIterator_hasNext(&iter)) {
                             //receive function has taken ownership and still more receive function to come ..
                             //deserialize again for new message
@@ -707,10 +683,8 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                     result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
                     result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
                     result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
-                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
-                        metrics->averageTimeBetweenMessagesInSeconds;
-                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
-                        metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
                     result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
                     result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
 
@@ -720,7 +694,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                            metrics->msgTypeId);
                 }
             }
-            i += 1;
+            i +=1 ;
         }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -825,12 +799,11 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
 
     int versionMajor;
     int versionMinor;
-    if (msgVersion != NULL) {
+    if (msgVersion!=NULL) {
         version_getMajor(msgVersion, &versionMajor);
         version_getMinor(msgVersion, &versionMinor);
-        if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
-            check = (minor >=
-                ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
         }
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index f7598f9..c49f642 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -490,8 +490,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
             result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
             result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
             result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
-            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
-                mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
             result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
             result->msgMetrics[i].bndId = entry->bndId;
             result->msgMetrics[i].typeId = mEntry->type;
@@ -563,8 +562,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             entry->seqNr++;
             bool sendOk = true;
             {
-                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
-                                                 serializedIoVecOutputLen, 0);
+                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
                 if (rc < 0) {
                     status = -1;
                     sendOk = false;
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index a78958a..b18f252 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
 add_celix_bundle(pubsub_endpoint_tst
         #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
         SOURCES
-        test/tst_activator.c
+        test/tst_endpoint_activator.c
         VERSION 1.0.0
         )
 target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -213,7 +213,7 @@ if (BUILD_PUBSUB_PSA_TCP)
             Celix::pubsub_serializer_json
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v1
+            Celix::pubsub_protocol_wire_v2
             pubsub_loopback
             pubsub_endpoint_sut
             pubsub_endpoint_tst
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c
index c52ebf7..f3d8fa8 100644
--- a/bundles/pubsub/test/test/sut_endpoint_activator.c
+++ b/bundles/pubsub/test/test/sut_endpoint_activator.c
@@ -97,9 +97,7 @@ static void* sut_sendThread(void *data) {
             if (msg.seqNr % 1000 == 0) {
                 printf("Send %i messages\n", msg.seqNr);
             }
-
 		    msg.seqNr += 1;
-
         }
         pthread_mutex_unlock(&act->mutex);