You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/07/27 18:07:21 UTC

[celix] branch develop updated: Add pubsub_admin_tcp

This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 58f77a3  Add pubsub_admin_tcp
     new 5be8c57  Merge pull request #36 from rbulter/feature/add_tcp_pubsub_endpoint
58f77a3 is described below

commit 58f77a3e1419cba2108726066277ccd5d4af39fe
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Thu Jul 18 21:01:10 2019 +0200

    Add pubsub_admin_tcp
---
 bundles/pubsub/CMakeLists.txt                      |   1 +
 bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt     |  47 ++
 .../pubsub/pubsub_admin_tcp/src/psa_activator.c    | 128 ++++
 .../src/pubsub_psa_tcp_constants.h                 | 105 +++
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 739 ++++++++++++++++++++
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h |  54 ++
 .../pubsub_admin_tcp/src/pubsub_tcp_common.c       | 144 ++++
 .../pubsub_admin_tcp/src/pubsub_tcp_common.h       |  54 ++
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 743 ++++++++++++++++++++
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |  74 ++
 .../pubsub_admin_tcp/src/pubsub_tcp_msg_header.h   |  34 +
 .../src/pubsub_tcp_topic_receiver.c                | 747 +++++++++++++++++++++
 .../src/pubsub_tcp_topic_receiver.h                |  51 ++
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 592 ++++++++++++++++
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h |  58 ++
 bundles/pubsub/test/CMakeLists.txt                 | 103 ++-
 bundles/pubsub/test/meta_data/ping.properties      |   2 +
 .../{ping.properties => ping2.properties}          |   6 +-
 .../{ping.properties => pong2.properties}          |   7 +-
 bundles/pubsub/test/test/loopback_activator.c      |  98 +++
 bundles/pubsub/test/test/sut_endpoint_activator.c  | 116 ++++
 bundles/pubsub/test/test/test_endpoint_runner.cc   |  39 ++
 bundles/pubsub/test/test/tst_endpoint_activator.cc | 120 ++++
 23 files changed, 4053 insertions(+), 9 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 314673a..33cff8e 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -31,6 +31,7 @@ if (PUBSUB)
 	add_subdirectory(pubsub_discovery)
 	add_subdirectory(pubsub_serializer_json)
 	add_subdirectory(pubsub_admin_zmq)
+	add_subdirectory(pubsub_admin_tcp)
 	add_subdirectory(pubsub_admin_nanomsg)
 	add_subdirectory(pubsub_admin_udp_mc)
 	add_subdirectory(keygen)
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
new file mode 100644
index 0000000..8db6125
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_package(Jansson REQUIRED)
+find_package(UUID REQUIRED)
+
+add_celix_bundle(celix_pubsub_admin_tcp
+	BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp"
+	VERSION "1.0.0"
+	GROUP "Celix/PubSub"
+	SOURCES
+		src/psa_activator.c
+		src/pubsub_tcp_admin.c
+		src/pubsub_tcp_topic_sender.c
+		src/pubsub_tcp_topic_receiver.c
+		src/pubsub_tcp_handler.c
+		src/pubsub_tcp_common.c
+)
+
+set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_tcp PRIVATE
+		Celix::pubsub_spi
+		Celix::framework Celix::dfi Celix::log_helper
+)
+target_include_directories(celix_pubsub_admin_tcp PRIVATE ${JANSSON_INCLUDE_DIR} src)
+# cmake find package UUID set the wrong include dir for OSX
+if (NOT APPLE)
+	target_include_directories(celix_pubsub_admin_tcp SYSTEM PRIVATE ${UUID_INCLUDE_DIRS})
+endif()
+
+install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
new file mode 100644
index 0000000..183c721
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -0,0 +1,128 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_tcp_admin.h"
+#include "command.h"
+
+typedef struct psa_tcp_activator {
+	log_helper_t *logHelper;
+
+	pubsub_tcp_admin_t *admin;
+
+	long serializersTrackerId;
+
+	pubsub_admin_service_t adminService;
+	long adminSvcId;
+
+	pubsub_admin_metrics_service_t adminMetricsService;
+	long adminMetricsSvcId;
+
+	command_service_t cmdSvc;
+	long cmdSvcId;
+} psa_tcp_activator_t;
+
+int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
+	act->adminSvcId = -1L;
+	act->cmdSvcId = -1L;
+	act->serializersTrackerId = -1L;
+
+	logHelper_create(ctx, &act->logHelper);
+	logHelper_start(act->logHelper);
+
+	act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
+	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+	//track serializers
+	if (status == CELIX_SUCCESS) {
+		celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+		opts.filter.ignoreServiceLanguage = true;
+		opts.callbackHandle = act->admin;
+		opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
+		opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
+		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+	}
+
+	//register pubsub admin service
+	if (status == CELIX_SUCCESS) {
+		pubsub_admin_service_t *psaSvc = &act->adminService;
+		psaSvc->handle = act->admin;
+		psaSvc->matchPublisher = pubsub_tcpAdmin_matchPublisher;
+		psaSvc->matchSubscriber = pubsub_tcpAdmin_matchSubscriber;
+		psaSvc->matchDiscoveredEndpoint = pubsub_tcpAdmin_matchDiscoveredEndpoint;
+		psaSvc->setupTopicSender = pubsub_tcpAdmin_setupTopicSender;
+		psaSvc->teardownTopicSender = pubsub_tcpAdmin_teardownTopicSender;
+		psaSvc->setupTopicReceiver = pubsub_tcpAdmin_setupTopicReceiver;
+		psaSvc->teardownTopicReceiver = pubsub_tcpAdmin_teardownTopicReceiver;
+		psaSvc->addDiscoveredEndpoint = pubsub_tcpAdmin_addDiscoveredEndpoint;
+		psaSvc->removeDiscoveredEndpoint = pubsub_tcpAdmin_removeDiscoveredEndpoint;
+
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_TCP_ADMIN_TYPE);
+
+		act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		act->adminMetricsService.handle = act->admin;
+		act->adminMetricsService.metrics = pubsub_tcpAdmin_metrics;
+
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_TCP_ADMIN_TYPE);
+
+		act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
+	}
+
+	//register shell command service
+	{
+		act->cmdSvc.handle = act->admin;
+		act->cmdSvc.executeCommand = pubsub_tcpAdmin_executeCommand;
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_tcp");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_tcp");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+	}
+
+	return status;
+}
+
+int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
+	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+	celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
+	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+	pubsub_tcpAdmin_destroy(act->admin);
+
+	logHelper_stop(act->logHelper);
+	logHelper_destroy(&act->logHelper);
+
+	return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_tcp_activator_t, psa_tcp_start, psa_tcp_stop);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
new file mode 100644
index 0000000..a957c83
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -0,0 +1,105 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_PSA_TCP_CONSTANTS_H_
+#define PUBSUB_PSA_TCP_CONSTANTS_H_
+
+#define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
+#define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
+
+#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
+#define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
+
+#define PSA_TCP_DEFAULT_BASE_PORT               5501
+#define PSA_TCP_DEFAULT_MAX_PORT                6000
+
+#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       16
+#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        6500000
+#define PSA_TCP_DEFAULT_TIMEOUT                 500
+
+#define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE 	      30
+#define PSA_TCP_DEFAULT_QOS_CONTROL_SCORE 	    70
+#define PSA_TCP_DEFAULT_SCORE 				          30
+
+#define PSA_TCP_QOS_SAMPLE_SCORE_KEY 		    "PSA_TCP_QOS_SAMPLE_SCORE"
+#define PSA_TCP_QOS_CONTROL_SCORE_KEY 		  "PSA_TCP_QOS_CONTROL_SCORE"
+#define PSA_TCP_DEFAULT_SCORE_KEY 			    "PSA_TCP_DEFAULT_SCORE"
+
+
+#define PSA_TCP_METRICS_ENABLED "PSA_TCP_METRICS_ENABLED"
+#define PSA_TCP_DEFAULT_METRICS_ENABLED true
+
+#define PUBSUB_TCP_VERBOSE_KEY      "PSA_TCP_VERBOSE"
+#define PUBSUB_TCP_VERBOSE_DEFAULT  true
+
+#define PUBSUB_TCP_PSA_IP_KEY       "PSA_IP"
+#define PUBSUB_TCP_PSA_ITF_KEY	    "PSA_INTERFACE"
+
+#define PUBSUB_TCP_DEFAULT_IP       "127.0.0.1"
+
+#define PUBSUB_TCP_ADMIN_TYPE       "tcp"
+
+/**
+ * The TCP url key for the topic sender endpoints
+ */
+#define PUBSUB_TCP_URL_KEY          "tcp.url"
+
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_TCP_STATIC_BIND_URL       "tcp.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery
+ */
+#define PUBSUB_TCP_STATIC_DISCOVER_URL       "tcp.static.bind.url"
+
+/**
+ * If set true on the endpoint, the tcp TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_TCP_STATIC_CONFIGURED       "tcp.static.configured"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated.
+ * Can be set in the topic properties.
+ */
+#define PUBSUB_TCP_STATIC_CONNECT_URLS    "tcp.static.connect.urls"
+
+/**
+ * The static endpoint type which a static endpoint should be configured.
+ * Can be set in the topic properties.
+ */
+#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE    "tcp.static.endpoint.type"
+
+
+#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER "server"
+#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT "client"
+
+/**
+ * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
+ * internal TCP threads.
+ * Can be set in the topic properties.
+ */
+#define PUBSUB_TCP_THREAD_REALTIME_PRIO    "thread.realtime.prio"
+#define PUBSUB_TCP_THREAD_REALTIME_SCHED   "thread.realtime.sched"
+
+
+#endif /* PUBSUB_PSA_TCP_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
new file mode 100644
index 0000000..f6d0720
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -0,0 +1,739 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <memory.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_tcp_admin.h"
+#include "pubsub_tcp_handler.h"
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_topic_sender.h"
+#include "pubsub_tcp_topic_receiver.h"
+
+#define L_DEBUG(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_tcp_admin {
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    const char *fwUUID;
+
+    char* ipAddress;
+
+    unsigned int basePort;
+    unsigned int maxPort;
+
+    double qosSampleScore;
+    double qosControlScore;
+    double defaultScore;
+
+    bool verbose;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
+    } serializers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_tcp_topic_sender_t*
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_tcp_topic_sender_t*
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+    } discoveredEndpoints;
+
+  pubsub_tcp_endPointStore_t endpointStore;
+};
+
+typedef struct psa_tcp_serializer_entry {
+    const char *serType;
+    long svcId;
+    pubsub_serializer_service_t *svc;
+} psa_tcp_serializer_entry_t;
+
+static celix_status_t tcp_getIpAddress(const char* interface, char** ip);
+static celix_status_t pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t* psa, pubsub_tcp_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t* psa, pubsub_tcp_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+
+
+pubsub_tcp_admin_t* pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_VERBOSE_KEY, PUBSUB_TCP_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    char *ip = NULL;
+    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY , NULL);
+    if (confIp != NULL) {
+        ip = strndup(confIp, 1024);
+    }
+
+    if (ip == NULL) {
+        //TODO try to get ip from subnet (CIDR)
+    }
+
+    if (ip == NULL) {
+        //try to get ip from itf
+        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_ITF_KEY, NULL);
+        tcp_getIpAddress(interface, &ip);
+    }
+
+    if (ip == NULL) {
+        L_WARN("[PSA_TCP] Could not determine IP address for PSA, using default ip (%s)", PUBSUB_TCP_DEFAULT_IP);
+        ip = strndup(PUBSUB_TCP_DEFAULT_IP, 1024);
+    }
+
+    psa->ipAddress = ip;
+    if (psa->verbose) {
+        L_INFO("[PSA_TCP] Using %s for service annunciation", ip);
+    }
+
+
+    long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_BASE_PORT, PSA_TCP_DEFAULT_BASE_PORT);
+    long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_PORT, PSA_TCP_DEFAULT_MAX_PORT);
+    psa->basePort = (unsigned int)basePort;
+    psa->maxPort = (unsigned int)maxPort;
+    if (psa->verbose) {
+        L_INFO("[PSA_TCP] Using base till max port: %i till %i", psa->basePort, psa->maxPort);
+    }
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_DEFAULT_SCORE_KEY, PSA_TCP_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_SAMPLE_SCORE_KEY, PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY, PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
+
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
+    psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->endpointStore.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->endpointStore.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      pubsub_tcpHandler_t* tcpHandler = hashMapIterator_nextValue(&iter);
+      pubsub_tcpHandler_destroy(tcpHandler);
+    }
+    celixThreadMutex_unlock(&psa->endpointStore.mutex);
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        pubsub_tcpTopicSender_destroy(sender);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+        pubsub_tcpTopicReceiver_destroy(recv);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+        celix_properties_destroy(ep);
+    }
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    iter = hashMapIterator_construct(psa->serializers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //note assuming al psa register services and service tracker are removed.
+    celixThreadMutex_destroy(&psa->endpointStore.mutex);
+    hashMap_destroy(psa->endpointStore.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
+    free(psa->ipAddress);
+
+    free(psa);
+}
+
+void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_tcp_admin_t *psa = handle;
+
+    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    if (serType == NULL) {
+        L_INFO("[PSA_TCP] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_tcp_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->serType = serType;
+        entry->svcId = svcId;
+        entry->svc = svc;
+        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_tcp_admin_t *psa = handle;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    //remove serializer
+    // 1) First find entry and
+    // 2) loop and destroy all topic sender using the serializer and
+    // 3) loop and destroy all topic receivers using the serializer
+    // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_tcp_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (entry != NULL) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+            if (sender != NULL && entry->svcId == pubsub_tcpTopicSender_serializerSvcId(sender)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_tcpTopicSender_destroy(sender);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+            if (receiver != NULL && entry->svcId == pubsub_tcpTopicReceiver_serializerSvcId(receiver)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_tcpTopicReceiver_destroy(receiver);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+        free(entry);
+    }
+}
+
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_tcp_admin_t *psa = handle;
+    L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchPublisher");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_TCP_ADMIN_TYPE,
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+    *outScore = score;
+
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_tcp_admin_t *psa = handle;
+    L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchSubscriber");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_TCP_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+    pubsub_tcp_admin_t *psa = handle;
+    L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchEndpoint");
+    celix_status_t  status = CELIX_SUCCESS;
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_TCP_ADMIN_TYPE, NULL);
+    if (outMatch != NULL) {
+        *outMatch = match;
+    }
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_tcp_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    //1) Create TopicSender
+    //2) Store TopicSender
+    //3) Connect existing endpoints
+    //4) set outPublisherEndpoint
+
+    celix_properties_t *newEndpoint = NULL;
+
+    const char *staticBindUrl = topicProperties != NULL ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL;
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    /* Check if it's a static client endpoint, needs also be declared static  */
+    bool isEndPointTypeClient = false;
+    const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+    if (endPointType != NULL) {
+      if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) {
+        isEndPointTypeClient = true;
+      }
+    }
+    const char *staticConnectUrls = ((topicProperties != NULL) && isEndPointTypeClient) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL;
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_tcp_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+    if (sender == NULL) {
+        psa_tcp_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, topicProperties, &psa->endpointStore, serializerSvcId, serEntry->svc,
+                     psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
+        }
+        if (sender != NULL) {
+            const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
+
+            //if configured use a static discover url
+            const char *staticDiscUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
+            if (staticDiscUrl != NULL) {
+                celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, staticDiscUrl);
+            }
+            celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL || staticConnectUrls!=NULL);
+            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicSenders.map, key, sender);
+        } else {
+            L_ERROR("[PSA TCP] Error creating a TopicSender");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_TCP] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (sender != NULL && newEndpoint != NULL) {
+        //TODO connect endpoints to sender, NOTE is this needed for a tcp topic sender?
+    }
+
+    if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+        *outPublisherEndpoint = newEndpoint;
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+    pubsub_tcp_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    //1) Find and remove TopicSender from map
+    //2) destroy topic sender
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
+    if (entry != NULL) {
+        char *mapKey = hashMapEntry_getKey(entry);
+        pubsub_tcp_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
+        free(mapKey);
+        //TODO disconnect endpoints to sender. note is this needed for a tcp topic sender?
+        pubsub_tcpTopicSender_destroy(sender);
+    } else {
+        L_ERROR("[PSA TCP] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    free(key);
+
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+    pubsub_tcp_admin_t *psa = handle;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+    if (receiver == NULL) {
+        psa_tcp_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, &psa->endpointStore,
+                    serializerSvcId, serEntry->svc);
+        } else {
+            L_ERROR("[PSA_TCP] Cannot find serializer for TopicSender %s/%s", scope, topic);
+        }
+        if (receiver != NULL) {
+            const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicReceivers.map, key, receiver);
+        } else {
+            L_ERROR("[PSA TCP] Error creating a TopicReceiver.");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_TCP] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (receiver != NULL && newEndpoint != NULL) {
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
+
+    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+        *outSubscriberEndpoint = newEndpoint;
+    }
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+    pubsub_tcp_admin_t *psa = handle;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    free(key);
+    if (entry != NULL) {
+        char *receiverKey = hashMapEntry_getKey(entry);
+        pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+        hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+        free(receiverKey);
+        pubsub_tcpTopicReceiver_destroy(receiver);
+    }
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+static celix_status_t pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t* psa, pubsub_tcp_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
+    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
+
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
+
+    if (url == NULL) {
+        const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+        const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+        L_WARN("[PSA TCP] Error got endpoint without a tcp url (admin: %s, type: %s)", admin , type);
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 &&
+            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+            pubsub_tcpTopicReceiver_connectTo(receiver, url);
+        }
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_tcp_admin_t *psa = handle;
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celix_properties_t *cpy = celix_properties_copy(endpoint);
+    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+    hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+
+static celix_status_t pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t* psa, pubsub_tcp_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
+    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
+
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
+
+    if (url == NULL) {
+        L_WARN("[PSA TCP] Error got endpoint without tcp url");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 &&
+            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+            pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
+        }
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_tcp_admin_t *psa = handle;
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+            pubsub_tcpAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    if (found != NULL) {
+        celix_properties_destroy(found);
+    }
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+celix_status_t pubsub_tcpAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+    pubsub_tcp_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    fprintf(out, "\n");
+    fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_tcpTopicSender_serializerSvcId(sender);
+        psa_tcp_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *scope = pubsub_tcpTopicSender_scope(sender);
+        const char *topic = pubsub_tcpTopicSender_topic(sender);
+        const char *url = pubsub_tcpTopicSender_url(sender);
+        const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
+        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- url            = %s%s\n", url, postUrl);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    fprintf(out, "\n");
+    fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_tcpTopicReceiver_serializerSvcId(receiver);
+        psa_tcp_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
+        const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
+
+        celix_array_list_t *connected = celix_arrayList_create();
+        celix_array_list_t *unconnected = celix_arrayList_create();
+        pubsub_tcpTopicReceiver_listConnections(receiver, connected, unconnected);
+
+        fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        for (int i = 0; i < celix_arrayList_size(connected); ++i) {
+            char *url = celix_arrayList_get(connected, i);
+            fprintf(out, "   |- connected url   = %s\n", url);
+            free(url);
+        }
+        for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
+            char *url = celix_arrayList_get(unconnected, i);
+            fprintf(out, "   |- unconnected url = %s\n", url);
+            free(url);
+        }
+        celix_arrayList_destroy(connected);
+        celix_arrayList_destroy(unconnected);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+    fprintf(out, "\n");
+
+    return status;
+}
+
+pubsub_admin_metrics_t* pubsub_tcpAdmin_metrics(void *handle) {
+    pubsub_tcp_admin_t *psa = handle;
+    pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_TCP_ADMIN_TYPE);
+    result->senders = celix_arrayList_create();
+    result->receivers = celix_arrayList_create();
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        pubsub_admin_sender_metrics_t *metrics = pubsub_tcpTopicSender_metrics(sender);
+        celix_arrayList_add(result->senders, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_admin_receiver_metrics_t *metrics = pubsub_tcpTopicReceiver_metrics(receiver);
+        celix_arrayList_add(result->receivers, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    return result;
+}
+
+#ifndef ANDROID
+static celix_status_t tcp_getIpAddress(const char* interface, char** ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr, *ifa;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) != -1)
+    {
+        for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
+        {
+            if (ifa->ifa_addr == NULL)
+                continue;
+
+            if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
+                if (interface == NULL) {
+                    *ip = strdup(host);
+                    status = CELIX_SUCCESS;
+                }
+                else if (strcmp(ifa->ifa_name, interface) == 0) {
+                    *ip = strdup(host);
+                    status = CELIX_SUCCESS;
+                }
+            }
+        }
+
+        freeifaddrs(ifaddr);
+    }
+
+    return status;
+}
+#endif
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
new file mode 100644
index 0000000..d21220d
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
@@ -0,0 +1,54 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_TCP_ADMIN_H
+#define CELIX_PUBSUB_TCP_ADMIN_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_api.h"
+#include "log_helper.h"
+#include "pubsub_psa_tcp_constants.h"
+
+typedef struct pubsub_tcp_admin pubsub_tcp_admin_t;
+
+pubsub_tcp_admin_t* pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa);
+
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_tcpAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+
+void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
+celix_status_t pubsub_tcpAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+pubsub_admin_metrics_t* pubsub_tcpAdmin_metrics(void *handle);
+
+#endif //CELIX_PUBSUB_TCP_ADMIN_H
+
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
new file mode 100644
index 0000000..4420e0f
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
@@ -0,0 +1,144 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <memory.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_common.h"
+
+int psa_tcp_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+    *msgTypeId = utils_stringHash(msgType);
+    return 0;
+}
+
+bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t *hdr) {
+    bool check=false;
+    int major=0,minor=0;
+
+    if (hdr->major == 0 && hdr->minor == 0) {
+        //no check
+        return true;
+    }
+
+    if (msgVersion!=NULL) {
+        version_getMajor(msgVersion,&major);
+        version_getMinor(msgVersion,&minor);
+        if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+            check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+        }
+    }
+
+    return check;
+}
+
+void psa_tcp_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter) {
+    for (int i = 0; i < 5; ++i) {
+        filter[i] = '\0';
+    }
+    if (scope != NULL && strnlen(scope, 3) >= 2)  {
+        filter[0] = scope[0];
+        filter[1] = scope[1];
+    }
+    if (topic != NULL && strnlen(topic, 3) >= 2)  {
+        filter[2] = topic[0];
+        filter[3] = topic[1];
+    }
+}
+
+#if NOT
+static int readInt(const unsigned char *data, int offset, uint32_t *val) {
+    *val = ((data[offset+0] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | (data[offset+3] << 0));
+    return offset + 4;
+}
+
+static int readLong(const unsigned char *data, int offset, uint64_t *val) {
+    *val = (
+            ((int64_t)data[offset+0] << 56) |
+            ((int64_t)data[offset+1] << 48) |
+            ((int64_t)data[offset+2] << 40) |
+            ((int64_t)data[offset+3] << 32) |
+            ((int64_t)data[offset+4] << 24) |
+            ((int64_t)data[offset+5] << 16) |
+            ((int64_t)data[offset+6] << 8 ) |
+            ((int64_t)data[offset+7] << 0 )
+    );
+    return offset + 8;
+}
+
+
+static int writeInt(unsigned char *data, int offset, int32_t val) {
+    data[offset+0] = (unsigned char)((val >> 24) & 0xFF);
+    data[offset+1] = (unsigned char)((val >> 16) & 0xFF);
+    data[offset+2] = (unsigned char)((val >> 8 ) & 0xFF);
+    data[offset+3] = (unsigned char)((val >> 0 ) & 0xFF);
+    return offset + 4;
+}
+
+static int writeLong(unsigned char *data, int offset, int64_t val) {
+    data[offset+0] = (unsigned char)((val >> 56) & 0xFF);
+    data[offset+1] = (unsigned char)((val >> 48) & 0xFF);
+    data[offset+2] = (unsigned char)((val >> 40) & 0xFF);
+    data[offset+3] = (unsigned char)((val >> 32) & 0xFF);
+    data[offset+4] = (unsigned char)((val >> 24) & 0xFF);
+    data[offset+5] = (unsigned char)((val >> 16) & 0xFF);
+    data[offset+6] = (unsigned char)((val >> 8 ) & 0xFF);
+    data[offset+7] = (unsigned char)((val >> 0 ) & 0xFF);
+    return offset + 8;
+}
+#endif
+
+void psa_tcp_setupTcpContext(log_helper_t *logHelper, celix_thread_t *thread, const celix_properties_t *topicProperties) {
+  //NOTE. TCP will abort when performing a sched_setscheduler without permission.
+  //As result permission has to be checked first.
+  //TODO update this to use cap_get_pid and cap-get_flag instead of check user is root (note adds dep to -lcap)
+  bool gotPermission = false;
+  if (getuid() == 0) {
+    gotPermission = true;
+  }
+
+  long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
+  const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
+  if (sched != NULL) {
+    int policy = SCHED_OTHER;
+    if (strncmp("SCHED_OTHER", sched, 16) == 0) {
+      policy = SCHED_OTHER;
+    } else if (strncmp("SCHED_BATCH", sched, 16) == 0) {
+      policy = SCHED_BATCH;
+    } else if (strncmp("SCHED_IDLE", sched, 16) == 0) {
+      policy = SCHED_IDLE;
+    } else if (strncmp("SCHED_FIFO", sched, 16) == 0) {
+      policy = SCHED_FIFO;
+    } else if (strncmp("SCHED_RR", sched, 16) == 0) {
+      policy = SCHED_RR;
+    }
+    if (gotPermission) {
+      if (prio > 0 && prio < 100) {
+        struct sched_param sch;
+        bzero(&sch, sizeof(struct sched_param));
+        sch.sched_priority = prio;
+        pthread_setschedparam(thread->thread, policy, &sch);
+      } else {
+        logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, "Skipping configuration of thread prio to %i and thread scheduling to %s. No permission\n", (int) prio, sched);
+      }
+    }
+  }
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
new file mode 100644
index 0000000..4bf8eb5
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -0,0 +1,54 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_TCP_COMMON_H
+#define CELIX_PUBSUB_TCP_COMMON_H
+
+#include <utils.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <log_helper.h>
+#include "version.h"
+#include "pubsub_tcp_msg_header.h"
+
+
+typedef struct pubsub_tcp_endPointStore{
+  celix_thread_mutex_t mutex;
+  hash_map_t *map;
+} pubsub_tcp_endPointStore_t;
+
+/*
+ * NOTE tcp is used by first sending three frames:
+ * 1) A subscription filter.
+ * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
+ *
+ * 2) The pubsub_tcp_msg_header_t is send containg the type id and major/minor version
+ *
+ * 3) The actual payload
+ */
+
+
+int psa_tcp_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
+void psa_tcp_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
+bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t *hdr);
+void psa_tcp_setupTcpContext(log_helper_t *logHelper, celix_thread_t *thread, const celix_properties_t *topicProperties);
+
+#endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
new file mode 100644
index 0000000..87fac38
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -0,0 +1,743 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_tcp_socket_handler.c
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <array_list.h>
+#include <pthread.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include "ctype.h"
+#include <netdb.h>
+#include <signal.h>
+#include <fcntl.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include "hash_map.h"
+#include "utils.h"
+#include "pubsub_tcp_handler.h"
+
+#define IP_HEADER_SIZE  20
+#define TCP_HEADER_SIZE 20
+#define MAX_EPOLL_EVENTS   64
+#define MAX_MSG_VECTOR_LEN 256
+
+#define L_DEBUG(...) \
+    logHelper_log(handle->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(handle->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(handle->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(handle->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_tcpHandler {
+  array_list_pt bufferLists;
+  unsigned int bufferIdx;
+  unsigned int readSeqNr;
+  unsigned int writeSeqNr;
+  unsigned int msgIdOffset;
+  bool bypassHeader;
+  celix_thread_rwlock_t dbLock;
+  unsigned int timeout;
+  hash_map_t *url_map;
+  hash_map_t *fd_map;
+  int efd;
+  int fd;
+  char *url;
+  pubsub_tcpHandler_connectMessage_callback_t connectMessageCallback;
+  pubsub_tcpHandler_disconnectMessage_callback_t disconnectMessageCallback;
+  void* connectPayload;
+  pubsub_tcpHandler_processMessage_callback_t processMessageCallback;
+  void *processMessagePayload;
+  log_helper_t *logHelper;
+};
+
+typedef struct pubsub_tcpBufferPartList {
+  pubsub_tcp_msg_header_t default_header;
+  unsigned int bufferSize;
+  char *buffer;
+} *pubsub_tcpBufferPartList_pt;
+
+
+typedef struct psa_tcp_connection_entry {
+  char *url;
+  int fd;
+  struct sockaddr_in addr;
+  socklen_t len;
+} psa_tcp_connection_entry_t;
+
+static inline int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_pt handle, const char *hostname, int port, struct sockaddr_in *inp);
+
+
+//
+// Create a handle
+//
+pubsub_tcpHandler_pt pubsub_tcpHandler_create(log_helper_t *logHelper) {
+  pubsub_tcpHandler_pt handle = calloc(sizeof(*handle), 1);
+  if (handle != NULL) {
+    handle->fd = -1;
+    handle->efd = epoll_create1(0);
+    handle->bufferIdx = 0;
+    handle->url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    handle->fd_map = hashMap_create(NULL, NULL, NULL, NULL);
+    handle->timeout = 500;
+    handle->logHelper = logHelper;
+    celixThreadRwlock_create(&handle->dbLock, 0);
+    signal(SIGPIPE, SIG_IGN);
+  }
+  return handle;
+}
+
+
+//
+// Destroys the handle
+//
+void pubsub_tcpHandler_destroy(pubsub_tcpHandler_pt handle) {
+  printf("### Destroying BufferHAndler TCP\n");
+  if (handle != NULL) {
+    pubsub_tcpHandler_close(handle);
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    hash_map_iterator_t iter = hashMapIterator_construct(handle->url_map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (entry != NULL) {
+        pubsub_tcpHandler_closeConnection(handle, entry->url);
+      }
+    }
+    if (handle->efd >= 0) close(handle->efd);
+    if (handle->url) free(handle->url);
+    hashMap_destroy(handle->url_map, false, false);
+    hashMap_destroy(handle->fd_map, false, false);
+
+    if (handle->bufferLists != NULL) {
+      int listSize = arrayList_size(handle->bufferLists);
+      int i;
+      for (i = 0; i < listSize; i++) {
+        pubsub_tcpBufferPartList_pt item = arrayList_get(handle->bufferLists, i);
+        if (item) {
+          if (item->buffer) {
+            free(item->buffer);
+            item->buffer = NULL;
+          }
+          free(item);
+        }
+      }
+      arrayList_destroy(handle->bufferLists);
+    }
+    handle->bufferLists = NULL;
+    celixThreadRwlock_unlock(&handle->dbLock);
+    celixThreadRwlock_destroy(&handle->dbLock);
+    free(handle);
+  }
+}
+
+// Destroys the handle
+//
+int pubsub_tcpHandler_open(pubsub_tcpHandler_pt handle, char *url) {
+  int rc = 0;
+  celixThreadRwlock_readLock(&handle->dbLock);
+  int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  if (rc >= 0) {
+    int setting = 1;
+    if (rc == 0) {
+      rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &setting, sizeof(setting));
+      if (rc != 0) {
+        close(fd);
+        L_ERROR("[TCP Socket] Error setsockopt(SO_REUSEADDR): %s\n", strerror(errno));
+      }
+    }
+    if (rc == 0) {
+      rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting));
+      if (rc != 0) {
+        close(fd);
+        L_ERROR("[TCP Socket] Error setsockopt(TCP_NODELAY): %s\n", strerror(errno));
+      }
+    } else {
+      L_ERROR("[TCP Socket] Error creating socket: %s\n", strerror(errno));
+    }
+    struct sockaddr_in addr; // connector's address information
+    pubsub_tcpHandler_url_t url_info;
+    pubsub_tcpHandler_setUrlInfo(url, &url_info);
+    rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
+    if (rc == 0) {
+      rc = bind(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
+      if (rc != 0) {
+        close(fd);
+        L_ERROR("[TCP Socket] Error bind: %s\n", strerror(errno));
+      }
+    }
+    pubsub_tcpHandler_free_setUrlInfo(&url_info);
+  }
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return (!rc) ? fd : rc;
+}
+
+
+// Destroys the handle
+//
+int pubsub_tcpHandler_close(pubsub_tcpHandler_pt handle) {
+  int rc = 0;
+  if (handle != NULL) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    if ((handle->efd >= 0)&&(handle->fd >=0)) {
+      struct epoll_event event;
+      bzero(&event, sizeof(struct epoll_event)); // zero the struct
+      rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, handle->fd, &event);
+      if (rc < 0) {
+        L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+      }
+    }
+    if (handle->url) {
+      free(handle->url);
+      handle->url = NULL;
+    }
+    if (handle->fd >= 0) {
+      close(handle->fd);
+      handle->fd = -1;
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+  }
+  return rc;
+}
+
+int pubsub_tcpHandler_connect(pubsub_tcpHandler_pt handle, char *url) {
+  pubsub_tcpHandler_url_t url_info;
+  pubsub_tcpHandler_setUrlInfo(url, &url_info);
+  psa_tcp_connection_entry_t *entry = NULL;
+  int fd = pubsub_tcpHandler_open(handle, NULL);
+  celixThreadRwlock_writeLock(&handle->dbLock);
+  int rc = fd;
+  struct sockaddr_in addr; // connector's address information
+  if (rc >= 0) {
+    rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
+    if (rc < 0) {
+      L_ERROR("[TCP Socket] Cannot create url\n");
+      close(fd);
+    }
+  }
+  if (rc >= 0) {
+    rc = connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
+    if (rc < 0) {
+      L_ERROR("[TCP Socket] Cannot connect to %s:%d\n", url_info.hostname, url_info.portnr);
+      close(fd);
+      fd = -1;
+    } else {
+      struct sockaddr_in sin;
+      socklen_t  len = sizeof(sin);
+      entry = calloc(1, sizeof(*entry));
+      entry->url = strndup(url, 1024 * 1024);
+      entry->fd = fd;
+      if (getsockname(fd,(struct sockaddr *) &sin, &len) < 0) {
+        L_ERROR("[TCP Socket] getsockname %s\n",strerror(errno));
+      } else if (handle->url == NULL){
+        char* address = inet_ntoa(sin.sin_addr);
+        unsigned int port = ntohs(sin.sin_port);
+        asprintf(&handle->url, "tcp://%s:%u", address, port);
+      }
+    }
+  }
+  if (rc >= 0) {
+    struct epoll_event event;
+    bzero(&event, sizeof(struct epoll_event)); // zero the struct
+    event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
+    event.data.fd = entry->fd;
+    rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+    if (rc < 0) {
+      close(entry->fd);
+      free(entry->url);
+      free(entry);
+      L_ERROR("[TCP Socket] Cannot create epoll\n");
+    }
+  }
+  if ((rc >= 0) && (entry)) {
+    if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, false);
+    hashMap_put(handle->url_map, entry->url, entry);
+    hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
+  }
+  pubsub_tcpHandler_free_setUrlInfo(&url_info);
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return rc;
+}
+
+// Destroys the handle
+//
+int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_pt handle, char *url) {
+  int rc = 0;
+  if (handle != NULL) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    psa_tcp_connection_entry_t *entry = hashMap_remove(handle->url_map, url);
+    hashMap_remove(handle->fd_map, (void *) (intptr_t) entry->fd);
+    if ((handle->efd >= 0)) {
+      struct epoll_event event;
+      bzero(&event, sizeof(struct epoll_event)); // zero the struct
+      rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
+      if (rc < 0) {
+        L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+      }
+    }
+    if (entry->fd >= 0) {
+      if (handle->disconnectMessageCallback) handle->disconnectMessageCallback(handle->connectPayload, entry->url);
+      close(entry->fd);
+      free(entry->url);
+      entry->url = NULL;
+      free(entry);
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+  }
+  return rc;
+}
+
+// Destroys the handle
+//
+int pubsub_tcpHandler_closeConnectionFd(pubsub_tcpHandler_pt handle, int fd) {
+  int rc = 0;
+  if (handle != NULL) {
+    bool use_handle_fd = false;
+    psa_tcp_connection_entry_t *entry = NULL;
+    celixThreadRwlock_readLock(&handle->dbLock);
+    if (fd != handle->fd) {
+      entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
+    } else {
+      use_handle_fd = true;
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+    if (use_handle_fd) {
+      rc = pubsub_tcpHandler_close(handle);
+    } else {
+      rc = pubsub_tcpHandler_closeConnection(handle, entry->url);
+    }
+  }
+  return rc;
+}
+
+int pubsub_tcpHandler_listen(pubsub_tcpHandler_pt handle, char *url) {
+  handle->fd = pubsub_tcpHandler_open(handle, url);
+  handle->url = strndup(url, 1024*1024);
+  int rc = handle->fd;
+  celixThreadRwlock_writeLock(&handle->dbLock);
+  if (rc >= 0) {
+    rc = listen(handle->fd, SOMAXCONN);
+    if (rc != 0) {
+      L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
+      close(handle->fd);
+      handle->fd = -1;
+      free(handle->url);
+      handle->url = NULL;
+    }
+  }
+  if (rc >=0) {
+    int flags = fcntl(handle->fd, F_GETFL, 0);
+    if (flags == -1) {
+      rc = flags;
+    } else {
+      rc = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
+      if (rc < 0) {
+        L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll\n");
+        close(handle->fd);
+        handle->fd = -1;
+        free(handle->url);
+        handle->url = NULL;
+      }
+    }
+  }
+
+  if ((rc >= 0) && (handle->efd >= 0)) {
+    struct epoll_event event;
+    bzero(&event, sizeof(event)); // zero the struct
+    event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
+    event.data.fd = handle->fd;
+    rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, handle->fd, &event);
+    if (rc < 0) {
+      L_ERROR("[TCP Socket] Cannot create epoll\n");
+    }
+  }
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return  rc;
+}
+
+
+int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_pt handle, const char *hostname, int port, struct sockaddr_in *inp) {
+  struct hostent *hp;
+  bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
+  if (hostname == 0 || hostname[0] == 0) {
+    inp->sin_addr.s_addr = INADDR_ANY;
+  } else {
+    if (!inet_aton(hostname, &inp->sin_addr)) {
+      hp = gethostbyname(hostname);
+      if (hp == NULL) {
+        L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s\n", hostname);
+        return -1;
+      }
+      inp->sin_addr = *(struct in_addr *) hp->h_addr;
+    }
+  }
+  inp->sin_family = AF_INET;
+  inp->sin_port = htons(port);
+  return 0;
+}
+
+
+void pubsub_tcpHandler_setUrlInfo(char *url, pubsub_tcpHandler_url_t *url_info) {
+  if (url_info) {
+    url_info->url = NULL;
+    url_info->protocol = NULL;
+    url_info->hostname = NULL;
+    url_info->portnr = 0;
+  }
+
+  if (url && url_info) {
+    url_info->url = strdup(url);
+    url_info->protocol = strtok(strdup(url_info->url) , "://");
+    if (url_info->protocol) {
+      url = strstr(url, "://");
+      if (url) {
+        url += 3;
+      }
+    }
+    char* hostname  = strdup(url);
+    if (hostname) {
+      char* port = strstr(hostname, ":");
+      url_info->hostname = strtok(strdup(hostname),":");
+      if (port) {
+        port += 1;
+        if (isdigit(atoi(port)) == 0) url_info->portnr = atoi(port);
+      }
+      free(hostname);
+    }
+  }
+}
+
+void pubsub_tcpHandler_free_setUrlInfo(pubsub_tcpHandler_url_t *url_info) {
+  if (url_info->hostname) free(url_info->hostname);
+  if (url_info->protocol) free(url_info->protocol);
+  if (url_info->url) free(url_info->url);
+  url_info->hostname = NULL;
+  url_info->protocol = NULL;
+  url_info->portnr = 0;
+}
+
+
+int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_pt handle, unsigned int maxNofBuffers,
+                                                  unsigned int bufferSize) {
+  if (handle != NULL) {
+    int i = 0;
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    if (arrayList_create(&handle->bufferLists) != CELIX_SUCCESS) {
+      return -1;
+    }
+    for (i = 0; i < maxNofBuffers; i++) {
+      pubsub_tcpBufferPartList_pt item = calloc(1, sizeof(struct pubsub_tcpBufferPartList));
+      item->buffer = calloc(sizeof(char), bufferSize);
+      item->bufferSize = bufferSize;
+      arrayList_add(handle->bufferLists, item);
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+  }
+  return 0;
+}
+
+void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_pt handle, unsigned int timeout) {
+  if (handle != NULL) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    handle->timeout = timeout;
+    celixThreadRwlock_unlock(&handle->dbLock);
+  }
+}
+
+
+
+//
+// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
+// If the message is completely reassembled true is returned and the index and size have valid values
+//
+int  pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_pt handle, int fd, unsigned int *index, unsigned int *size) {
+  celixThreadRwlock_writeLock(&handle->dbLock);
+  if (handle->bufferLists == NULL) {
+    return -1;
+  }
+  int listSize = arrayList_size(handle->bufferLists);
+  pubsub_tcpBufferPartList_pt item = arrayList_get(handle->bufferLists, handle->bufferIdx);
+  if (!handle->bypassHeader) {
+    // Only read the header, we don't know yet where to store the payload
+    int nbytes = recv(fd, item->buffer, sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int), MSG_PEEK);
+    if (nbytes < 0) {
+      L_ERROR("[TCP Socket] read error \n");
+      celixThreadRwlock_unlock(&handle->dbLock);
+      return nbytes;
+    }
+    unsigned int *pBuffer_size = ((unsigned int *) &item->buffer[sizeof(pubsub_tcp_msg_header_t)]);
+    unsigned int buffer_size = *pBuffer_size + sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
+    if (buffer_size > item->bufferSize) {
+      free(item->buffer);
+      item->buffer = calloc(buffer_size, sizeof(char));
+      item->bufferSize = buffer_size;
+    }
+  }
+  int nbytes = recv(fd, item->buffer, item->bufferSize, 0);
+  if (nbytes < 0) {
+    L_ERROR("[TCP Socket] read error \n");
+    celixThreadRwlock_unlock(&handle->dbLock);
+    return nbytes;
+  }
+  if (!handle->bypassHeader) {
+    nbytes-= sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
+  }
+
+  *index = handle->bufferIdx;
+  *size  = nbytes;
+  handle->bufferIdx++;
+  handle->bufferIdx = handle->bufferIdx % listSize;
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return nbytes;
+}
+
+//
+// Read out the message which is indicated available by the largeUdp_dataAvailable function
+//
+int
+pubsub_tcpHandler_read(pubsub_tcpHandler_pt handle, unsigned int index, pubsub_tcp_msg_header_t **header,
+                             void **buffer, unsigned int size) {
+  int result = 0;
+  celixThreadRwlock_readLock(&handle->dbLock);
+  pubsub_tcpBufferPartList_pt item = arrayList_get(handle->bufferLists, index);
+  if (item) {
+    if (handle->bypassHeader) {
+      *header = &item->default_header;
+      *buffer = item->buffer;
+      item->default_header.type = (unsigned int) item->buffer[handle->msgIdOffset];
+      item->default_header.seqNr = handle->readSeqNr++;
+      item->default_header.sendTimeNanoseconds = 0;
+      item->default_header.sendTimeNanoseconds = 0;
+    } else
+    {
+      *header = (pubsub_tcp_msg_header_t *) item->buffer;
+      *buffer = &item->buffer[sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int)];
+    }
+  } else {
+    result = -1;
+  }
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return result;
+}
+
+int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_pt handle, void* payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback){
+  int result = 0;
+  celixThreadRwlock_writeLock(&handle->dbLock);
+  handle->processMessageCallback = processMessageCallback;
+  handle->processMessagePayload = payload;
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return result;
+}
+
+int pubsub_tcpHandler_addConnectionCallback(pubsub_tcpHandler_pt handle, void* payload,  pubsub_tcpHandler_connectMessage_callback_t connectMessageCallback, pubsub_tcpHandler_disconnectMessage_callback_t disconnectMessageCallback){
+  int result = 0;
+  celixThreadRwlock_writeLock(&handle->dbLock);
+  handle->connectMessageCallback = connectMessageCallback;
+  handle->disconnectMessageCallback = disconnectMessageCallback;
+  handle->connectPayload = payload;
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return result;
+}
+
+
+//
+// Write large data to TCP. .
+//
+int pubsub_tcpHandler_write(pubsub_tcpHandler_pt handle, pubsub_tcp_msg_header_t *header, void *buffer,
+                                  unsigned int size, int flags) {
+  celixThreadRwlock_readLock(&handle->dbLock);
+  int result = 0;
+  int written = 0;
+
+  hash_map_iterator_t iter = hashMapIterator_construct(handle->fd_map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+
+    // struct iovec *largeMsg_iovec, int len, ,
+    struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
+    struct msghdr msg;
+    msg.msg_name = &entry->addr;
+    msg.msg_namelen = entry->len;
+    msg.msg_flags = flags;
+    msg.msg_iov = msg_iovec;
+
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+    if (!handle->bypassHeader) {
+      msg.msg_iov[0].iov_base = header;
+      msg.msg_iov[0].iov_len = sizeof(*header);
+      msg.msg_iov[1].iov_base = &size;
+      msg.msg_iov[1].iov_len = sizeof(size);
+      msg.msg_iov[2].iov_base = buffer;
+      msg.msg_iov[2].iov_len = size;
+      msg.msg_iovlen = 3;
+    } else {
+      msg.msg_iov[0].iov_base = buffer;
+      msg.msg_iov[0].iov_len = size;
+      msg.msg_iovlen = 1;
+    }
+
+    int nbytes = 0;
+    if (entry->fd >=0) nbytes = sendmsg(entry->fd, &msg, 0);
+
+    //  Several errors are OK. When speculative write is being done we may not
+    //  be able to write a single byte from the socket. Also, SIGSTOP issued
+    //  by a debugging tool can result in EINTR error.
+    if (nbytes == -1
+        && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+      result = 0;
+      break;
+    }
+    if (nbytes == -1) {
+      L_ERROR("[TCP Socket] Cannot send msg %s\n", strerror(errno));
+      result = -1;
+      break;
+    }
+    written += nbytes;
+    handle->writeSeqNr++;
+  }
+  celixThreadRwlock_unlock(&handle->dbLock);
+  return (result == 0 ? written : result);
+}
+
+const char* pubsub_tcpHandler_url(pubsub_tcpHandler_pt handle) {
+  return handle->url;
+}
+
+int pubsub_tcpHandler_handler(pubsub_tcpHandler_pt handle) {
+  int rc = 0;
+  if (handle->efd >= 0) {
+    struct epoll_event events[MAX_EPOLL_EVENTS];
+    int nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
+    if (nof_events < 0) {
+      L_ERROR("[TCP Socket] Cannot create epoll wait\n");
+      return nof_events;
+    }
+    int i = 0;
+    for (i = 0; i < nof_events; i++) {
+      if ((handle->fd >= 0) && (events[i].data.fd == handle->fd)) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        // new connection available
+        struct sockaddr_in their_addr;
+        socklen_t len = sizeof(struct sockaddr_in);
+        rc = accept(handle->fd, &their_addr, &len);
+        if (rc == -1) {
+          if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+            // already closed
+          } else L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
+        } else {
+          // handle new connection:
+          // add it to reactor, etc
+          struct epoll_event event;
+          bzero(&event, sizeof(event)); // zero the struct
+
+          char* address = inet_ntoa(their_addr.sin_addr);
+          unsigned int port = ntohs(their_addr.sin_port);
+          char* url = NULL;
+          asprintf(&url, "tcp://%s:%u", address, port);
+          psa_tcp_connection_entry_t *entry = calloc(1, sizeof(*entry));
+          entry->addr = their_addr;
+          entry->len = len;
+          entry->url = url;
+          entry->fd = rc;
+          event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
+          event.data.fd = entry->fd;
+          rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+          if (rc < 0) {
+            close(entry->fd);
+            free(entry->url);
+            free(entry);
+            L_ERROR("[TCP Socket] Cannot create epoll\n");
+          } else {
+            if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, true);
+            hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
+            hashMap_put(handle->url_map, entry->url, entry);
+            fprintf(stdout, "[TCP Socket] New connection to url: %s: \n", url);
+          }
+        }
+        celixThreadRwlock_unlock(&handle->dbLock);
+      } else if (events[i].events & EPOLLIN) {
+        int err = 0;
+        socklen_t len = sizeof(int);
+        rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
+        if (rc != 0) {
+          L_ERROR("[TCP Socket]: ERROR read from socket \n");
+          continue;
+        }
+        unsigned int index = 0;
+        unsigned int size = 0;
+        //printf("pubsub_tcpHandler_handler: FD: %d read:\n", events[i].data.fd);
+        rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
+        if (rc == 0) {
+          pubsub_tcpHandler_closeConnectionFd(handle, events[i].data.fd);
+          continue;
+        } else if (rc < 0) {
+          continue;
+        }
+        // Handle data
+        pubsub_tcp_msg_header_t *msgHeader = NULL;
+        void *buffer = NULL;
+        rc = pubsub_tcpHandler_read(handle, index, &msgHeader, &buffer, size);
+        if (rc != 0) {
+          L_ERROR("[TCP Socket]: ERROR read with index %d\n", index);
+          continue;
+        }
+        celixThreadRwlock_readLock(&handle->dbLock);
+        if (handle->processMessageCallback) {
+          struct timespec receiveTime;
+          clock_gettime(CLOCK_REALTIME, &receiveTime);
+          handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size, &receiveTime);
+         }
+        celixThreadRwlock_unlock(&handle->dbLock);
+      } else if (events[i].events & EPOLLOUT)  {
+        int err = 0;
+        socklen_t len = sizeof(int);
+        rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
+        if (rc != 0) {
+          L_ERROR("[TCP Socket]: ERROR read from socket \n");
+          continue;
+        }
+      } else if (events[i].events & EPOLLRDHUP)  {
+        int err = 0;
+        socklen_t len = sizeof(int);
+        rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
+        if (rc != 0) {
+          L_ERROR("[TCP Socket]: ERROR read from socket \n");
+          continue;
+        }
+        pubsub_tcpHandler_closeConnectionFd(handle, events[i].data.fd);
+      } else if (events[i].events & EPOLLERR) {
+        L_ERROR("[TCP Socket]: ERROR read from socket \n");
+        continue;
+      }
+    }
+  }
+  return rc;
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
new file mode 100644
index 0000000..f366875
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -0,0 +1,74 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_tcp_buffer_handler.h
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef _PUBSUB_TCP_BUFFER_HANDLER_H_
+#define _PUBSUB_TCP_BUFFER_HANDLER_H_
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <log_helper.h>
+#include "celix_threads.h"
+#include "pubsub_tcp_msg_header.h"
+
+
+typedef struct pubsub_tcpHandler_url {
+  char *url;
+  char *protocol;
+  char *hostname;
+  unsigned int portnr;
+} pubsub_tcpHandler_url_t;
+
+typedef struct pubsub_tcpHandler  pubsub_tcpHandler_t;
+typedef struct pubsub_tcpHandler *pubsub_tcpHandler_pt;
+typedef void (*pubsub_tcpHandler_processMessage_callback_t)(void* payload, const pubsub_tcp_msg_header_t* header, const unsigned char * buffer, size_t size, struct timespec *receiveTime);
+typedef void (*pubsub_tcpHandler_connectMessage_callback_t)(void* payload, const char *url, bool lock);
+typedef void (*pubsub_tcpHandler_disconnectMessage_callback_t)(void* payload, const char *url);
+
+pubsub_tcpHandler_pt pubsub_tcpHandler_create(log_helper_t *logHelper);
+void pubsub_tcpHandler_destroy(pubsub_tcpHandler_pt handle);
+int pubsub_tcpHandler_open(pubsub_tcpHandler_pt handle, char* url);
+int pubsub_tcpHandler_close(pubsub_tcpHandler_pt handle);
+int pubsub_tcpHandler_connect(pubsub_tcpHandler_pt handle, char* url);
+int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_pt handle, char* url);
+int pubsub_tcpHandler_listen(pubsub_tcpHandler_pt handle, char* url);
+
+int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_pt handle, unsigned int maxNofBuffers, unsigned int bufferSize);
+void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_pt handle, unsigned int timeout);
+
+int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_pt handle, int fd, unsigned int *index, unsigned int *size);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_pt handle, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
+int pubsub_tcpHandler_handler(pubsub_tcpHandler_pt handle);
+int pubsub_tcpHandler_write(pubsub_tcpHandler_pt handle, pubsub_tcp_msg_header_t* header, void* buffer, unsigned int size, int flags);
+int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_pt handle, void* payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback);
+int pubsub_tcpHandler_addConnectionCallback(pubsub_tcpHandler_pt handle, void* payload, pubsub_tcpHandler_connectMessage_callback_t connectMessageCallback, pubsub_tcpHandler_disconnectMessage_callback_t disconnectMessageCallback);
+
+const char* pubsub_tcpHandler_url(pubsub_tcpHandler_pt handle);
+void pubsub_tcpHandler_setUrlInfo(char *url, pubsub_tcpHandler_url_t *url_info);
+void pubsub_tcpHandler_free_setUrlInfo(pubsub_tcpHandler_url_t *url_info);
+
+#endif /* _PUBSUB_TCP_BUFFER_HANDLER_H_ */
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
new file mode 100644
index 0000000..35c423c
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
@@ -0,0 +1,34 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_PSA_TCP_MSG_HEADER_H_
+#define PUBSUB_PSA_TCP_MSG_HEADER_H_
+
+typedef struct pubsub_tcp_msg_header {
+  uint32_t type; //msg type id (hash of fqn)
+  uint32_t seqNr;
+  uint8_t major;
+  uint8_t minor;
+  unsigned char originUUID[16];
+  uint64_t sendtimeSeconds; //seconds since epoch
+  uint64_t sendTimeNanoseconds; //ns since epoch
+} pubsub_tcp_msg_header_t;
+typedef struct pubsub_tcp_msg_header  *pubsub_tcp_msg_header_pt;
+
+#endif
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
new file mode 100644
index 0000000..2055052
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -0,0 +1,747 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <pubsub_endpoint.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include <math.h>
+#include "pubsub_tcp_handler.h"
+#include "pubsub_tcp_topic_receiver.h"
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_common.h"
+
+#include <uuid/uuid.h>
+#include <pubsub_admin_metrics.h>
+
+#define MAX_EPOLL_EVENTS     16
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN  37
+#endif
+
+
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_tcp_topic_receiver {
+  celix_bundle_context_t *ctx;
+  log_helper_t *logHelper;
+  long serializerSvcId;
+  pubsub_serializer_service_t *serializer;
+  char *scope;
+  char *topic;
+  char scopeAndTopicFilter[5];
+  bool metricsEnabled;
+  pubsub_tcpHandler_pt socketHandler;
+  pubsub_tcpHandler_pt sharedSocketHandler;
+
+  struct {
+    celix_thread_t thread;
+    celix_thread_mutex_t mutex;
+    bool running;
+  } thread;
+
+  struct {
+    celix_thread_mutex_t mutex;
+    hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t*
+    bool allConnected; //true if all requestedConnectection are connected
+  } requestedConnections;
+
+  long subscriberTrackerId;
+  struct {
+    celix_thread_mutex_t mutex;
+    hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+    bool allInitialized;
+  } subscribers;
+};
+
+typedef struct psa_tcp_requested_connection_entry {
+  pubsub_tcp_topic_receiver_t *parent;
+  char *key;
+  char *url;
+  int  fd;
+  bool connected;
+  bool statically; //true if the connection is statically configured through the topic properties.
+} psa_tcp_requested_connection_entry_t;
+
+typedef struct psa_tcp_subscriber_metrics_entry_t {
+  unsigned int msgTypeId;
+  uuid_t origin;
+
+  unsigned long nrOfMessagesReceived;
+  unsigned long nrOfSerializationErrors;
+  struct timespec lastMessageReceived;
+  double averageTimeBetweenMessagesInSeconds;
+  double averageSerializationTimeInSeconds;
+  double averageDelayInSeconds;
+  double maxDelayInSeconds;
+  double minDelayInSeconds;
+  unsigned int lastSeqNr;
+  unsigned long nrOfMissingSeqNumbers;
+} psa_tcp_subscriber_metrics_entry_t;
+
+typedef struct psa_tcp_subscriber_entry {
+  int usageCount;
+  hash_map_t *msgTypes; //map from serializer svc
+  hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
+  pubsub_subscriber_t *svc;
+  bool initialized; //true if the init function is called through the receive thread
+} psa_tcp_subscriber_entry_t;
+
+
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                  const celix_bundle_t *owner);
+
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                     const celix_bundle_t *owner);
+
+static void *psa_tcp_recvThread(void *data);
+
+static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
+
+static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
+
+static void processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime);
+static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
+static void psa_tcp_disConnectHandler(void *handle, const char *url);
+
+
+
+pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                            log_helper_t *logHelper,
+                                                            const char *scope,
+                                                            const char *topic,
+                                                            const celix_properties_t *topicProperties,
+                                                            pubsub_tcp_endPointStore_t* endPointStore,
+                                                            long serializerSvcId,
+                                                            pubsub_serializer_service_t *serializer) {
+  pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+  receiver->ctx = ctx;
+  receiver->logHelper = logHelper;
+  receiver->serializerSvcId = serializerSvcId;
+  receiver->serializer = serializer;
+  receiver->scope = strndup(scope, 1024 * 1024);
+  receiver->topic = strndup(topic, 1024 * 1024);
+
+  long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
+  long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
+  long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+  const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
+
+  /* Check if it's a static endpoint */
+  bool isEndPointTypeClient = false;
+  bool isEndPointTypeServer = false;
+  const char *endPointType  = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+  if (endPointType != NULL) {
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) {
+      isEndPointTypeClient = true;
+    }
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) {
+      isEndPointTypeServer = true;
+    }
+  }
+  // When endpoint is server, use the bind url as a key.
+  const char *staticBindUrl = ((topicProperties != NULL) && isEndPointTypeServer) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL;
+  /* When it's an endpoint share the socket with the receiver */
+  if (staticBindUrl != NULL  || (isEndPointTypeClient && staticConnectUrls != NULL)) {
+    celixThreadMutex_lock(&receiver->thread.mutex);
+    pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, (isEndPointTypeServer) ? staticBindUrl : staticConnectUrls);
+    if(entry != NULL) {
+      receiver->socketHandler = entry;
+      receiver->sharedSocketHandler = entry;
+    } else {
+      L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", scope, topic);
+    }
+    celixThreadMutex_unlock(&receiver->thread.mutex);
+  }
+
+  if (receiver->socketHandler == NULL) {
+    receiver->socketHandler = pubsub_tcpHandler_create(receiver->logHelper);
+  }
+
+  if (receiver->socketHandler != NULL) {
+    pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, (unsigned int) buffer_size);
+    pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
+    pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
+    pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,psa_tcp_disConnectHandler);
+  }
+
+  psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+  receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
+                                                                        PSA_TCP_DEFAULT_METRICS_ENABLED);
+
+  celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+  celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+  celixThreadMutex_create(&receiver->thread.mutex, NULL);
+
+  receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+  receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+  receiver->requestedConnections.allConnected = false;
+
+  if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticBindUrl == NULL)) {
+    char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+    char* url;
+    char* save = urlsCopy;
+    while ((url = strtok_r(save, " ", &save))) {
+      psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+      entry->statically = true;
+      entry->connected = false;
+      entry->url = strndup(url, 1024*1024);
+      entry->parent = receiver;
+      hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+    }
+    free(urlsCopy);
+
+    // Configure Receiver thread
+    receiver->thread.running = true;
+    celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
+    char name[64];
+    snprintf(name, 64, "TCP TR %s/%s", scope, topic);
+    celixThread_setName(&receiver->thread.thread, name);
+    psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, topicProperties);
+  }
+
+  //track subscribers
+  if (receiver->socketHandler != NULL) {
+    int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+    char buf[size+1];
+    snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+    opts.filter.filter = buf;
+    opts.callbackHandle = receiver;
+    opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
+    opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+    receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+  }
+
+  if (receiver->socketHandler == NULL) {
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver);
+    receiver = NULL;
+    L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, topic);
+  }
+  return receiver;
+}
+
+void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
+  if (receiver != NULL) {
+
+
+    celixThreadMutex_lock(&receiver->thread.mutex);
+    if (!receiver->thread.running) {
+      receiver->thread.running = false;
+      celixThreadMutex_unlock(&receiver->thread.mutex);
+      celixThread_join(receiver->thread.thread, NULL);
+    }
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (entry != NULL) {
+        receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+        free(entry);
+      }
+
+      hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+      while (hashMapIterator_hasNext(&iter2)) {
+        hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+        hashMap_destroy(origins, true, true);
+      }
+      hashMap_destroy(entry->metrics, false, false);
+    }
+    hashMap_destroy(receiver->subscribers.map, false, false);
+
+
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (entry != NULL) {
+        free(entry->url);
+        free(entry);
+      }
+    }
+    hashMap_destroy(receiver->requestedConnections.map, false, false);
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_destroy(&receiver->subscribers.mutex);
+    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+    celixThreadMutex_destroy(&receiver->thread.mutex);
+
+    pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
+    pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
+    if ((receiver->socketHandler)&&(receiver->sharedSocketHandler == NULL)) {
+      pubsub_tcpHandler_destroy(receiver->socketHandler);
+      receiver->socketHandler = NULL;
+    }
+
+    free(receiver->scope);
+    free(receiver->topic);
+  }
+  free(receiver);
+}
+
+const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver) {
+  return receiver->scope;
+}
+
+const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) {
+  return receiver->topic;
+}
+
+long pubsub_tcpTopicReceiver_serializerSvcId(pubsub_tcp_topic_receiver_t *receiver) {
+  return receiver->serializerSvcId;
+}
+
+void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
+                                             celix_array_list_t *unconnectedUrls) {
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+    char *url = NULL;
+    asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
+    if (entry->connected) {
+      celix_arrayList_add(connectedUrls, url);
+    } else {
+      celix_arrayList_add(unconnectedUrls, url);
+    }
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+
+void pubsub_tcpTopicReceiver_connectTo(
+        pubsub_tcp_topic_receiver_t *receiver,
+        const char *url) {
+  L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url);
+
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+  if (entry == NULL) {
+    entry = calloc(1, sizeof(*entry));
+    entry->url = strndup(url, 1024*1024);
+    entry->connected = false;
+    entry->statically = false;
+    entry->parent = receiver;
+    hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+    receiver->requestedConnections.allConnected = false;
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+  psa_tcp_connectToAllRequestedConnections(receiver);
+}
+
+void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url) {
+  L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url);
+
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
+  if (entry != NULL) {
+    int rc = pubsub_tcpHandler_closeConnection(receiver->socketHandler, entry->url);
+    if (rc < 0) L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno));
+  }
+  if (entry != NULL) {
+    free(entry->url);
+    free(entry);
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                  const celix_bundle_t *bnd) {
+  pubsub_tcp_topic_receiver_t *receiver = handle;
+
+  long bndId = celix_bundle_getId(bnd);
+  const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+  if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+    //not the same scope. ignore
+    return;
+  }
+
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
+  if (entry != NULL) {
+    entry->usageCount += 1;
+  } else {
+    //new create entry
+    entry = calloc(1, sizeof(*entry));
+    entry->usageCount = 1;
+    entry->svc = svc;
+    entry->initialized = false;
+    receiver->subscribers.allInitialized = false;
+
+    int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
+                                                       &entry->msgTypes);
+
+    if (rc == 0) {
+      entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
+      hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+      while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+        hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+        hashMap_put(entry->metrics, (void *) (uintptr_t) msgSer->msgId, origins);
+      }
+    }
+
+    if (rc == 0) {
+      hashMap_put(receiver->subscribers.map, (void *) bndId, entry);
+    } else {
+      L_ERROR("[PSA_TCP] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+      free(entry);
+    }
+  }
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                     const celix_bundle_t *bnd) {
+  pubsub_tcp_topic_receiver_t *receiver = handle;
+
+  long bndId = celix_bundle_getId(bnd);
+
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
+  if (entry != NULL) {
+    entry->usageCount -= 1;
+  }
+  if (entry != NULL && entry->usageCount <= 0) {
+    //remove entry
+    hashMap_remove(receiver->subscribers.map, (void *) bndId);
+    int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+    if (rc != 0) {
+      L_ERROR("[PSA_TCP] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+    }
+    hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
+    while (hashMapIterator_hasNext(&iter)) {
+      hash_map_t *origins = hashMapIterator_nextValue(&iter);
+      hashMap_destroy(origins, true, true);
+    }
+    hashMap_destroy(entry->metrics, false, false);
+    free(entry);
+  }
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static inline void
+processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
+                             const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize,
+                             struct timespec *receiveTime) {
+  //NOTE receiver->subscribers.mutex locked
+  pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (hdr->type));
+  pubsub_subscriber_t *svc = entry->svc;
+  bool monitor = receiver->metricsEnabled;
+
+  //monitoring
+  struct timespec beginSer;
+  struct timespec endSer;
+  int updateReceiveCount = 0;
+  int updateSerError = 0;
+
+  if (msgSer != NULL) {
+    void *deserializedMsg = NULL;
+    bool validVersion = psa_tcp_checkVersion(msgSer->msgVersion, hdr);
+    if (validVersion) {
+      if (monitor) {
+        clock_gettime(CLOCK_REALTIME, &beginSer);
+      }
+      celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+      if (monitor) {
+        clock_gettime(CLOCK_REALTIME, &endSer);
+      }
+      if (status == CELIX_SUCCESS) {
+        bool release = true;
+        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+        if (release) {
+          msgSer->freeMsg(msgSer->handle, deserializedMsg);
+        }
+        updateReceiveCount += 1;
+      } else {
+        updateSerError += 1;
+        L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope,
+               receiver->topic);
+      }
+    }
+  } else {
+    L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X", hdr->type);
+  }
+
+  if (msgSer != NULL && monitor) {
+    hash_map_t *origins = hashMap_get(entry->metrics, (void *) (uintptr_t) hdr->type);
+    char uuidStr[UUID_STR_LEN + 1];
+    uuid_unparse(hdr->originUUID, uuidStr);
+    psa_tcp_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
+
+    if (metrics == NULL) {
+      metrics = calloc(1, sizeof(*metrics));
+      hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN + 1), metrics);
+      uuid_copy(metrics->origin, hdr->originUUID);
+      metrics->msgTypeId = hdr->type;
+      metrics->maxDelayInSeconds = -INFINITY;
+      metrics->minDelayInSeconds = INFINITY;
+      metrics->lastSeqNr = 0;
+    }
+
+    double diff = celix_difftime(&beginSer, &endSer);
+    long n = metrics->nrOfMessagesReceived;
+    metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n + 1);
+
+    diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+    n = metrics->nrOfMessagesReceived;
+    if (metrics->nrOfMessagesReceived >= 1) {
+      metrics->averageTimeBetweenMessagesInSeconds =
+        (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+    }
+    metrics->lastMessageReceived = *receiveTime;
+
+
+    int incr = hdr->seqNr - metrics->lastSeqNr;
+    if (metrics->lastSeqNr > 0 && incr > 1) {
+      metrics->nrOfMissingSeqNumbers += (incr - 1);
+      L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
+    }
+    metrics->lastSeqNr = hdr->seqNr;
+
+    struct timespec sendTime;
+    sendTime.tv_sec = (time_t) hdr->sendtimeSeconds;
+    sendTime.tv_nsec = (long) hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
+    diff = celix_difftime(&sendTime, receiveTime);
+    metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n + 1);
+    if (diff < metrics->minDelayInSeconds) {
+      metrics->minDelayInSeconds = diff;
+    }
+    if (diff > metrics->maxDelayInSeconds) {
+      metrics->maxDelayInSeconds = diff;
+    }
+
+    metrics->nrOfMessagesReceived += updateReceiveCount;
+    metrics->nrOfSerializationErrors += updateSerError;
+  }
+}
+static void
+processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime) {
+  pubsub_tcp_topic_receiver_t *receiver = handle;
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+    if (entry != NULL) {
+      processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime);
+    }
+  }
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+
+
+static void *psa_tcp_recvThread(void *data) {
+  pubsub_tcp_topic_receiver_t *receiver = data;
+
+  celixThreadMutex_lock(&receiver->thread.mutex);
+  bool running = receiver->thread.running;
+  celixThreadMutex_unlock(&receiver->thread.mutex);
+
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  bool allConnected = receiver->requestedConnections.allConnected;
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  bool allInitialized = receiver->subscribers.allInitialized;
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+  while (running) {
+    if (!allConnected) {
+      psa_tcp_connectToAllRequestedConnections(receiver);
+    }
+    if (!allInitialized) {
+      psa_tcp_initializeAllSubscribers(receiver);
+    }
+    pubsub_tcpHandler_handler(receiver->socketHandler);
+
+    celixThreadMutex_lock(&receiver->thread.mutex);
+    running = receiver->thread.running;
+    celixThreadMutex_unlock(&receiver->thread.mutex);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+  } // while
+  return NULL;
+}
+
+pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
+  pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
+  snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+  snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
+
+  int msgTypesCount = 0;
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+    while (hashMapIterator_hasNext(&iter2)) {
+      hashMapIterator_nextValue(&iter2);
+      msgTypesCount += 1;
+    }
+  }
+
+  result->nrOfMsgTypes = (unsigned long) msgTypesCount;
+  result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
+  int i = 0;
+  iter = hashMapIterator_construct(receiver->subscribers.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+    while (hashMapIterator_hasNext(&iter2)) {
+      hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+      result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
+      result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
+      int k = 0;
+      hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
+      while (hashMapIterator_hasNext(&iter3)) {
+        psa_tcp_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3);
+        result->msgTypes[i].typeId = metrics->msgTypeId;
+        pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) metrics->msgTypeId);
+        if (msgSer) {
+          snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+          uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
+          result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
+          result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
+          result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
+          result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
+          result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
+          result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+          result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
+          result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
+          result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
+
+          k += 1;
+        } else {
+          L_WARN("[PSA_TCP]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId);
+        }
+      }
+      i += 1;
+    }
+  }
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+  return result;
+}
+
+
+static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver) {
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  if (!receiver->requestedConnections.allConnected) {
+    bool allConnected = true;
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (!entry->connected){
+        entry->fd = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
+        if (entry->fd < 0) {
+          L_WARN("[PSA_TCP] Error connecting to tcp url %s\n", entry->url);
+          allConnected = false;
+        }
+      }
+    }
+    receiver->requestedConnections.allConnected = allConnected;
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) {
+  pubsub_tcp_topic_receiver_t *receiver = handle;
+  L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url);
+  if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+  if (entry == NULL) {
+    entry = calloc(1, sizeof(*entry));
+    entry->parent = receiver;
+    entry->url = strndup(url, 1024*1024);
+    entry->statically = true;
+    hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+    receiver->requestedConnections.allConnected = false;
+  }
+  entry->connected = true;
+  if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_tcp_disConnectHandler(void *handle, const char *url) {
+  pubsub_tcp_topic_receiver_t *receiver = handle;
+  L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url);
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
+  if (entry != NULL) {
+    free(entry->url);
+    free(entry);
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver) {
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  if (!receiver->subscribers.allInitialized) {
+    bool allInitialized = true;
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (!entry->initialized) {
+        int rc = 0;
+        if (entry->svc != NULL && entry->svc->init != NULL) {
+          rc = entry->svc->init(entry->svc->handle);
+        }
+        if (rc == 0) {
+          entry->initialized = true;
+        } else {
+          L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+          allInitialized = false;
+        }
+      }
+    }
+    receiver->subscribers.allInitialized = allInitialized;
+  }
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
new file mode 100644
index 0000000..e9fb8a2
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -0,0 +1,51 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_bundle_context.h"
+#include "pubsub_tcp_common.h"
+
+typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
+
+pubsub_tcp_topic_receiver_t* pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                            log_helper_t *logHelper,
+                                                            const char *scope,
+                                                            const char *topic,
+                                                            const celix_properties_t *topicProperties,
+                                                            pubsub_tcp_endPointStore_t* endPointStore,
+                                                            long serializerSvcId,
+                                                            pubsub_serializer_service_t *serializer);
+void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver);
+
+const char* pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver);
+const char* pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver);
+
+long pubsub_tcpTopicReceiver_serializerSvcId(pubsub_tcp_topic_receiver_t *receiver);
+void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls);
+
+void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
+void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
+
+
+pubsub_admin_receiver_metrics_t* pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver);
+
+
+#endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
new file mode 100644
index 0000000..3cb9844
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -0,0 +1,592 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include "pubsub_tcp_topic_sender.h"
+#include "pubsub_tcp_handler.h"
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_common.h"
+#include "pubsub_endpoint.h"
+#include <uuid/uuid.h>
+#include <constants.h>
+
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define TCP_BIND_MAX_RETRY                      10
+
+#define L_DEBUG(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_tcp_topic_sender {
+  celix_bundle_context_t *ctx;
+  log_helper_t *logHelper;
+  long serializerSvcId;
+  pubsub_serializer_service_t *serializer;
+  uuid_t fwUUID;
+  bool metricsEnabled;
+  pubsub_tcpHandler_pt socketHandler;
+  pubsub_tcpHandler_pt sharedSocketHandler;
+
+  char *scope;
+  char *topic;
+  char scopeAndTopicFilter[5];
+  char *url;
+  bool isStatic;
+
+  struct {
+    celix_thread_mutex_t mutex;
+  } tcp;
+
+  struct {
+    celix_thread_t thread;
+    celix_thread_mutex_t mutex;
+    bool running;
+  } thread;
+
+  struct {
+    long svcId;
+    celix_service_factory_t factory;
+  } publisher;
+
+  struct {
+    celix_thread_mutex_t mutex;
+    hash_map_t *map;  //key = bndId, value = psa_tcp_bounded_service_entry_t
+  } boundedServices;
+};
+
+typedef struct psa_tcp_send_msg_entry {
+  pubsub_tcp_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
+  pubsub_msg_serializer_t *msgSer;
+  celix_thread_mutex_t sendLock; //protects send & Seqnr
+  int seqNr;
+  struct {
+    celix_thread_mutex_t mutex; //protects entries in struct
+    long nrOfMessagesSend;
+    long nrOfMessagesSendFailed;
+    long nrOfSerializationErrors;
+    struct timespec lastMessageSend;
+    double averageTimeBetweenMessagesInSeconds;
+    double averageSerializationTimeInSeconds;
+  } metrics;
+} psa_tcp_send_msg_entry_t;
+
+typedef struct psa_tcp_bounded_service_entry {
+  pubsub_tcp_topic_sender_t *parent;
+  pubsub_publisher_t service;
+  long bndId;
+  hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+  hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
+  int getCount;
+} psa_tcp_bounded_service_entry_t;
+
+static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                         const celix_properties_t *svcProperties);
+
+static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                          const celix_properties_t *svcProperties);
+
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
+
+static void *psa_tcp_sendThread(void *data);
+
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
+
+pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        const celix_properties_t *topicProperties,
+        pubsub_tcp_endPointStore_t* endPointStore,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser,
+        const char *bindIP,
+        const char *staticBindUrl,
+        unsigned int basePort,
+        unsigned int maxPort) {
+  pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
+  sender->ctx = ctx;
+  sender->logHelper = logHelper;
+  sender->serializerSvcId = serializerSvcId;
+  sender->serializer = ser;
+  sender->socketHandler = pubsub_tcpHandler_create(sender->logHelper);
+  psa_tcp_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+  const char *uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+  if (uuid != NULL) {
+    uuid_parse(uuid, sender->fwUUID);
+  }
+  sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
+
+  /* Check if it's a static endpoint */
+  bool isEndPointTypeClient = false;
+  bool isEndPointTypeServer = false;
+  const char *endPointType  = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+  if (endPointType != NULL) {
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) {
+      isEndPointTypeClient = true;
+    }
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) {
+      isEndPointTypeServer = true;
+    }
+  }
+
+  // When endpoint is client, use the connection urls as a key.
+  const char *staticConnectUrls = ((topicProperties != NULL) && isEndPointTypeClient) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL;
+
+  /* When it's an endpoint share the socket with the receiver */
+  if (staticConnectUrls != NULL  || (isEndPointTypeServer && staticBindUrl != NULL)) {
+    celixThreadMutex_lock(&endPointStore->mutex);
+    sender->sharedSocketHandler = sender->socketHandler;
+    pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, staticConnectUrls);
+    if (entry == NULL) {
+      entry = sender->socketHandler;
+      hashMap_put(endPointStore->map, (void *)(isEndPointTypeClient ? staticConnectUrls : staticBindUrl), entry);
+    }
+    celixThreadMutex_unlock(&endPointStore->mutex);
+  }
+
+  //setting up tcp socket for TCP TopicSender
+  {
+    if (staticConnectUrls != NULL) {
+      // Store url for client static endpoint
+      sender->url = strndup(staticConnectUrls, 1024 * 1024);
+      sender->isStatic = true;
+    }
+    else if (staticBindUrl != NULL) {
+      int rv = pubsub_tcpHandler_listen(sender->socketHandler, (char *) staticBindUrl);
+      if (rv == -1) {
+        L_WARN("Error for tcp_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno));
+      } else {
+        sender->url = strndup(staticBindUrl, 1024 * 1024);
+        sender->isStatic = true;
+      }
+    } else {
+      int retry = 0;
+      while (sender->url == NULL && retry < TCP_BIND_MAX_RETRY) {
+        /* Randomized part due to same bundle publishing on different topics */
+        unsigned int port = rand_range(basePort, maxPort);
+        char *url = NULL;
+        asprintf(&url, "tcp://%s:%u", bindIP, port);
+        char *bindUrl = NULL;
+        asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+        int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl);
+        if (rv == -1) {
+          L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+          free(url);
+        } else {
+          sender->url = url;
+        }
+        retry++;
+        free(bindUrl);
+      }
+    }
+  }
+
+  if (sender->url != NULL) {
+    sender->scope = strndup(scope, 1024 * 1024);
+    sender->topic = strndup(topic, 1024 * 1024);
+
+    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+    celixThreadMutex_create(&sender->tcp.mutex, NULL);
+    celixThreadMutex_create(&sender->thread.mutex, NULL);
+    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+  }
+
+  if (sender->socketHandler != NULL) {
+    sender->thread.running = true;
+    celixThread_create(&sender->thread.thread, NULL, psa_tcp_sendThread, sender);
+    char name[64];
+    snprintf(name, 64, "TCP TS %s/%s", scope, topic);
+    celixThread_setName(&sender->thread.thread, name);
+    psa_tcp_setupTcpContext(sender->logHelper, &sender->thread.thread, topicProperties);
+  }
+
+
+  //register publisher services using a service factory
+  if (sender->url != NULL) {
+    sender->publisher.factory.handle = sender;
+    sender->publisher.factory.getService = psa_tcp_getPublisherService;
+    sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
+
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+    celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+    celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    opts.factory = &sender->publisher.factory;
+    opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+    opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+    opts.properties = props;
+
+    sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+  }
+
+  if (sender->url == NULL) {
+    free(sender);
+    sender = NULL;
+  }
+
+  return sender;
+}
+
+void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
+  if (sender != NULL) {
+    celixThreadMutex_lock(&sender->thread.mutex);
+    if (!sender->thread.running) {
+      sender->thread.running = false;
+      celixThreadMutex_unlock(&sender->thread.mutex);
+      celixThread_join(sender->thread.thread, NULL);
+    }
+    celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+      if (entry != NULL) {
+        sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+          psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
+          celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+          free(msgEntry);
+        }
+        hashMap_destroy(entry->msgEntries, false, false);
+        free(entry);
+      }
+    }
+    hashMap_destroy(sender->boundedServices.map, false, false);
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_destroy(&sender->boundedServices.mutex);
+    celixThreadMutex_destroy(&sender->tcp.mutex);
+
+    if ((sender->socketHandler)&&(sender->sharedSocketHandler == NULL)) {
+      pubsub_tcpHandler_destroy(sender->socketHandler);
+      sender->socketHandler = NULL;
+    }
+
+    free(sender->scope);
+    free(sender->topic);
+    free(sender->url);
+    free(sender);
+  }
+}
+
+long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender) {
+  return sender->serializerSvcId;
+}
+
+const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender) {
+  return sender->scope;
+}
+
+const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
+  return sender->topic;
+}
+
+const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
+  return pubsub_tcpHandler_url(sender->socketHandler);
+}
+
+bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
+  return sender->isStatic;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+  //TODO subscriber count -> topic info
+}
+
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+  //TODO
+}
+
+static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                         const celix_properties_t *svcProperties __attribute__((unused))) {
+  pubsub_tcp_topic_sender_t *sender = handle;
+  long bndId = celix_bundle_getId(requestingBundle);
+
+  celixThreadMutex_lock(&sender->boundedServices.mutex);
+  psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
+  if (entry != NULL) {
+    entry->getCount += 1;
+  } else {
+    entry = calloc(1, sizeof(*entry));
+    entry->getCount = 1;
+    entry->parent = sender;
+    entry->bndId = bndId;
+    entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+
+    int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t *) requestingBundle,
+                                                     &entry->msgTypes);
+    if (rc == 0) {
+      hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+      while (hashMapIterator_hasNext(&iter)) {
+        hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter);
+        void *key = hashMapEntry_getKey(hashMapEntry);
+        psa_tcp_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
+        sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
+        sendEntry->header.type = (int32_t) sendEntry->msgSer->msgId;
+        int major;
+        int minor;
+        version_getMajor(sendEntry->msgSer->msgVersion, &major);
+        version_getMinor(sendEntry->msgSer->msgVersion, &minor);
+        sendEntry->header.major = (int8_t) major;
+        sendEntry->header.minor = (int8_t) minor;
+        uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+        celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
+        hashMap_put(entry->msgEntries, key, sendEntry);
+      }
+      entry->service.handle = entry;
+      entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
+      entry->service.send = psa_tcp_topicPublicationSend;
+      hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
+    } else {
+      L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", sender->scope, sender->topic);
+    }
+  }
+  celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+  return &entry->service;
+}
+
+static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                          const celix_properties_t *svcProperties __attribute__((unused))) {
+  pubsub_tcp_topic_sender_t *sender = handle;
+  long bndId = celix_bundle_getId(requestingBundle);
+
+  celixThreadMutex_lock(&sender->boundedServices.mutex);
+  psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
+  if (entry != NULL) {
+    entry->getCount -= 1;
+  }
+  if (entry != NULL && entry->getCount == 0) {
+    //free entry
+    hashMap_remove(sender->boundedServices.map, (void *) bndId);
+    int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+    if (rc != 0) {
+      L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+    }
+
+    hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
+    while (hashMapIterator_hasNext(&iter)) {
+      psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
+      celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+      free(msgEntry);
+    }
+    hashMap_destroy(entry->msgEntries, false, false);
+    free(entry);
+  }
+  celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+static void *psa_tcp_sendThread(void *data) {
+  pubsub_tcp_topic_sender_t *sender = data;
+
+  celixThreadMutex_lock(&sender->thread.mutex);
+  bool running = sender->thread.running;
+  celixThreadMutex_unlock(&sender->thread.mutex);
+
+  while (running) {
+    pubsub_tcpHandler_handler(sender->socketHandler);
+
+    celixThreadMutex_lock(&sender->thread.mutex);
+    running = sender->thread.running;
+    celixThreadMutex_unlock(&sender->thread.mutex);
+
+  } // while
+  return NULL;
+}
+
+pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
+  pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
+  snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+  snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
+  celixThreadMutex_lock(&sender->boundedServices.mutex);
+  size_t count = 0;
+  hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+    while (hashMapIterator_hasNext(&iter2)) {
+      hashMapIterator_nextValue(&iter2);
+      count += 1;
+    }
+  }
+
+  result->msgMetrics = calloc(count, sizeof(*result));
+
+  iter = hashMapIterator_construct(sender->boundedServices.map);
+  int i = 0;
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+    while (hashMapIterator_hasNext(&iter2)) {
+      psa_tcp_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
+      celixThreadMutex_lock(&mEntry->metrics.mutex);
+      result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
+      result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
+      result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
+      result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
+      result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+      result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
+      result->msgMetrics[i].bndId = entry->bndId;
+      result->msgMetrics[i].typeId = mEntry->header.type;
+      snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
+      i += 1;
+      celixThreadMutex_unlock(&mEntry->metrics.mutex);
+    }
+  }
+
+  celixThreadMutex_unlock(&sender->boundedServices.mutex);
+  result->nrOfmsgMetrics = (int) count;
+  return result;
+}
+
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
+  int status = CELIX_SUCCESS;
+  psa_tcp_bounded_service_entry_t *bound = handle;
+  pubsub_tcp_topic_sender_t *sender = bound->parent;
+  bool monitor = sender->metricsEnabled;
+
+  psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
+
+  //metrics updates
+  struct timespec sendTime;
+  struct timespec serializationStart;
+  struct timespec serializationEnd;
+  //int unknownMessageCountUpdate = 0;
+  int sendErrorUpdate = 0;
+  int serializationErrorUpdate = 0;
+  int sendCountUpdate = 0;
+
+  if (entry != NULL) {
+    delay_first_send_for_late_joiners(sender);
+    if (monitor) {
+      clock_gettime(CLOCK_REALTIME, &serializationStart);
+    }
+
+    void *serializedOutput = NULL;
+    size_t serializedOutputLen = 0;
+    status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen);
+
+    if (monitor) {
+      clock_gettime(CLOCK_REALTIME, &serializationEnd);
+    }
+
+    if (status == CELIX_SUCCESS /*ser ok*/) {
+      //TODO refactor, is the mutex really needed?
+      celixThreadMutex_lock(&entry->sendLock);
+      pubsub_tcp_msg_header_t msg_hdr = entry->header;
+      msg_hdr.seqNr = -1;
+      msg_hdr.sendtimeSeconds = 0;
+      msg_hdr.sendTimeNanoseconds = 0;
+      if (monitor) {
+        clock_gettime(CLOCK_REALTIME, &sendTime);
+        msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec;
+        msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec;
+        msg_hdr.seqNr = entry->seqNr++;
+      }
+
+      errno = 0;
+      bool sendOk = true;
+      {
+        int rc = pubsub_tcpHandler_write(sender->socketHandler, &msg_hdr, serializedOutput, serializedOutputLen, 0);
+        if (rc < 0) {
+          status = -1;
+          sendOk = false;
+        }
+        free(serializedOutput);
+      }
+
+      celixThreadMutex_unlock(&entry->sendLock);
+      if (sendOk) {
+        sendCountUpdate = 1;
+      } else {
+        sendErrorUpdate = 1;
+        L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno));
+      }
+    } else {
+      serializationErrorUpdate = 1;
+      L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s", entry->msgSer->msgName,
+             sender->scope, sender->topic);
+    }
+  } else {
+    //unknownMessageCountUpdate = 1;
+    status = CELIX_SERVICE_EXCEPTION;
+    L_WARN("[PSA_TCP_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId,
+           sender->scope, sender->topic);
+  }
+
+
+  if (monitor && entry != NULL) {
+    celixThreadMutex_lock(&entry->metrics.mutex);
+
+    long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
+    double diff = celix_difftime(&serializationStart, &serializationEnd);
+    double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n + 1);
+    entry->metrics.averageSerializationTimeInSeconds = average;
+
+    if (entry->metrics.nrOfMessagesSend > 2) {
+      diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
+      n = entry->metrics.nrOfMessagesSend;
+      average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+      entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+    }
+
+    entry->metrics.lastMessageSend = sendTime;
+    entry->metrics.nrOfMessagesSend += sendCountUpdate;
+    entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
+    entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+
+    celixThreadMutex_unlock(&entry->metrics.mutex);
+  }
+
+  return status;
+}
+
+static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender) {
+
+  static bool firstSend = true;
+
+  if (firstSend) {
+    L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY_IN_SECONDS);
+    firstSend = false;
+  }
+}
+
+static unsigned int rand_range(unsigned int min, unsigned int max) {
+  double scaled = ((double) random()) / ((double) RAND_MAX);
+  return (unsigned int) ((max - min + 1) * scaled + min);
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
new file mode 100644
index 0000000..7f298fd
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -0,0 +1,58 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_TCP_TOPIC_SENDER_H
+#define CELIX_PUBSUB_TCP_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_tcp_common.h"
+
+typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
+
+pubsub_tcp_topic_sender_t* pubsub_tcpTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        const celix_properties_t *topicProperties,
+        pubsub_tcp_endPointStore_t* endPointStore,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser,
+        const char *bindIP,
+        const char *staticBindUrl,
+        unsigned int basePort,
+        unsigned int maxPort);
+void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
+
+const char* pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
+const char* pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
+const char* pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
+bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
+
+long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
+
+/**
+ * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
+ */
+pubsub_admin_sender_metrics_t* pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender);
+
+#endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index bfb7317..d37eb23 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -18,6 +18,61 @@
 find_package(CppUTest REQUIRED)
 find_package(Jansson REQUIRED)
 
+add_celix_bundle(pubsub_endpoint_sut
+        #"Vanilla" bundle which is under test
+        SOURCES
+        test/sut_endpoint_activator.c
+        VERSION 1.0.0
+        )
+target_include_directories(pubsub_endpoint_sut PRIVATE test)
+target_link_libraries(pubsub_endpoint_sut PRIVATE Celix::pubsub_api)
+celix_bundle_files(pubsub_endpoint_sut
+        meta_data/msg.descriptor
+        DESTINATION "META-INF/descriptors"
+        )
+celix_bundle_files(pubsub_endpoint_sut
+        meta_data/ping2.properties
+        DESTINATION "META-INF/topics/pub"
+        )
+
+add_celix_bundle(pubsub_endpoint_tst
+        #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
+        SOURCES
+        test/tst_endpoint_activator.cc
+        VERSION 1.0.0
+        )
+target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
+celix_bundle_files(pubsub_endpoint_tst
+        meta_data/msg.descriptor
+        DESTINATION "META-INF/descriptors"
+        )
+celix_bundle_files(pubsub_endpoint_tst
+        meta_data/ping2.properties
+        DESTINATION "META-INF/topics/sub"
+        )
+
+
+add_celix_bundle(pubsub_loopback
+        #"Vanilla" bundle which is under test
+        SOURCES
+        test/loopback_activator.c
+        VERSION 1.0.0
+        )
+target_include_directories(pubsub_loopback PRIVATE test)
+target_link_libraries(pubsub_loopback PRIVATE Celix::pubsub_api)
+celix_bundle_files(pubsub_loopback
+        meta_data/msg.descriptor
+        DESTINATION "META-INF/descriptors"
+        )
+celix_bundle_files(pubsub_loopback
+        meta_data/pong2.properties
+        DESTINATION "META-INF/topics/pub"
+        )
+celix_bundle_files(pubsub_loopback
+        meta_data/pong2.properties
+        DESTINATION "META-INF/topics/sub"
+        )
+
 add_celix_bundle(pubsub_sut
     #"Vanilla" bundle which is under test
     SOURCES
@@ -71,6 +126,52 @@ message(WARNING "TODO fix issues with UDPMC and reanble test again")
 #add_test(NAME pubsub_udpmc_tests COMMAND pubsub_udpmc_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_udpmc_tests,CONTAINER_LOC>)
 #SETUP_TARGET_FOR_COVERAGE(pubsub_udpmc_tests_cov pubsub_udpmc_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_udpmc_tests/pubsub_udpmc_tests ..)
 
+add_celix_container(pubsub_tcp_tests
+        USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+        LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+        BUNDLES
+        Celix::shell
+        Celix::shell_tui
+        Celix::pubsub_serializer_json
+        Celix::pubsub_topology_manager
+        Celix::pubsub_admin_tcp
+        pubsub_sut
+        pubsub_tst
+        )
+target_link_libraries(pubsub_tcp_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_tcp_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+add_test(NAME pubsub_tcp_tests COMMAND pubsub_tcp_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_tests_cov pubsub_tcp_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_tests/pubsub_tcp_tests ..)
+
+
+add_celix_container(pubsub_tcp_endpoint_tests
+        USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_endpoint_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+        LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+        BUNDLES
+        Celix::shell
+        Celix::shell_tui
+        Celix::pubsub_serializer_json
+        Celix::pubsub_topology_manager
+        Celix::pubsub_admin_tcp
+        pubsub_loopback
+        pubsub_endpoint_sut
+        pubsub_endpoint_tst
+        )
+target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_tcp_endpoint_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+
+#TCP Endpoint test is disabled because the test is not stable when running on Travis
+if (ENABLE_PUBSUB_PSA_TCP_ENDPOINT_TEST)
+add_test(NAME pubsub_tcp_endpoint_tests COMMAND pubsub_tcp_endpoint_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_endpoint_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_endpoint_tests_cov pubsub_tcp_endpoint_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_endpoint_tests/pubsub_tcp_endpoint_tests ..)
+endif()
+
 if (BUILD_PUBSUB_PSA_ZMQ)
     add_celix_container(pubsub_zmq_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -109,4 +210,4 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     target_include_directories(pubsub_zmq_zerocopy_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
     add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
     SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_zerocopy_tests_cov pubsub_zmq_zerocopy_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_zmq_tests/pubsub_zmq_zerocopy_tests ..)
-endif ()
\ No newline at end of file
+endif ()
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index 96d2267..b73435d 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -16,6 +16,8 @@
 # under the License.
 zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
 zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
+tcp.static.bind.url=tcp://127.0.0.1:9000
+tcp.static.connect.urls=tcp://127.0.0.1:9000
 udpmc.static.bind.port=50678
 udpmc.static.connect.socket_addresses=224.100.0.1:50678
 
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping2.properties
similarity index 82%
copy from bundles/pubsub/test/meta_data/ping.properties
copy to bundles/pubsub/test/meta_data/ping2.properties
index 96d2267..01f5fec 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping2.properties
@@ -14,10 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
-zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
-udpmc.static.bind.port=50678
-udpmc.static.connect.socket_addresses=224.100.0.1:50678
+tcp.static.bind.url=tcp://127.0.0.1:9500
+tcp.static.endpoint.type=server
 
 #note only effective if run as root
 thread.realtime.shed=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/pong2.properties
similarity index 82%
copy from bundles/pubsub/test/meta_data/ping.properties
copy to bundles/pubsub/test/meta_data/pong2.properties
index 96d2267..a746995 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/pong2.properties
@@ -14,10 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
-zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
-udpmc.static.bind.port=50678
-udpmc.static.connect.socket_addresses=224.100.0.1:50678
+tcp.static.bind.url=tcp://127.0.0.1:9501
+tcp.static.connect.urls=tcp://127.0.0.1:9500
+tcp.static.endpoint.type=client
 
 #note only effective if run as root
 thread.realtime.shed=SCHED_FIFO
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
new file mode 100644
index 0000000..7c7496f
--- /dev/null
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -0,0 +1,98 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <constants.h>
+#include <unistd.h>
+
+#include "celix_api.h"
+#include "pubsub/api.h"
+#include "msg.h"
+
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static void sut_pubSet(void *handle, void *service);
+
+struct activator {
+	long pubTrkId;
+  long subSvcId;
+	pubsub_publisher_t* pubSvc;
+  pubsub_subscriber_t subSvc;
+  pthread_mutex_t mutex;
+  unsigned int count;
+  unsigned int msgId;
+};
+
+celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
+
+	char filter[512];
+	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong2");
+	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+	opts.set = sut_pubSet;
+	opts.callbackHandle = act;
+	opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+	opts.filter.filter = filter;
+	act->pubTrkId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+	act->count = 0;
+  act->msgId = 0;
+
+  pthread_mutex_init(&act->mutex, NULL);
+
+  celix_properties_t *props = celix_properties_create();
+  celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "pong2");
+  act->subSvc.handle = act;
+  act->subSvc.receive = tst_receive;
+  act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
+	celix_bundleContext_stopTracker(ctx, act->pubTrkId);
+  celix_bundleContext_unregisterService(ctx, act->subSvcId);
+  pthread_mutex_destroy(&act->mutex);
+	return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop);
+
+static void sut_pubSet(void *handle, void *service) {
+	struct activator* act = handle;
+	pthread_mutex_lock(&act->mutex);
+	act->pubSvc = service;
+	pthread_mutex_unlock(&act->mutex);
+}
+
+
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void * voidMsg, bool *release) {
+  struct activator *act =handle;
+  msg_t *msg = voidMsg;
+  msg_t send_msg = *msg;
+  pthread_mutex_lock(&act->mutex);
+  if (act->pubSvc != NULL) {
+    if (act->count == 0) {
+      act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
+    }
+    act->pubSvc->send(act->pubSvc->handle, act->msgId, &send_msg);
+    act->count += 1;
+  }
+  pthread_mutex_unlock(&act->mutex);
+  return CELIX_SUCCESS;
+}
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c
new file mode 100644
index 0000000..9e80b37
--- /dev/null
+++ b/bundles/pubsub/test/test/sut_endpoint_activator.c
@@ -0,0 +1,116 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <constants.h>
+#include <unistd.h>
+
+#include "celix_api.h"
+#include "pubsub/api.h"
+#include "msg.h"
+
+static void sut_pubSet(void *handle, void *service);
+static void* sut_sendThread(void *data);
+
+struct activator {
+	long pubTrkId;
+
+	pthread_t sendThread;
+
+	pthread_mutex_t mutex;
+	bool running;
+	pubsub_publisher_t* pubSvc;
+};
+
+celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
+
+	char filter[512];
+	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping2");
+	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+	opts.set = sut_pubSet;
+	opts.callbackHandle = act;
+	opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+	opts.filter.filter = filter;
+	act->pubTrkId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+	act->running = true;
+	pthread_create(&act->sendThread, NULL, sut_sendThread, act);
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
+	pthread_mutex_lock(&act->mutex);
+	act->running = false;
+	pthread_mutex_unlock(&act->mutex);
+	pthread_join(act->sendThread, NULL);
+	pthread_mutex_destroy(&act->mutex);
+
+	celix_bundleContext_stopTracker(ctx, act->pubTrkId);
+	return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop);
+
+static void sut_pubSet(void *handle, void *service) {
+	struct activator* act = handle;
+	pthread_mutex_lock(&act->mutex);
+	act->pubSvc = service;
+	pthread_mutex_unlock(&act->mutex);
+}
+
+static void* sut_sendThread(void *data) {
+	struct activator *act = data;
+
+	pthread_mutex_lock(&act->mutex);
+	bool running = act->running;
+	pthread_mutex_unlock(&act->mutex);
+
+	unsigned int msgId = 0;
+	msg_t msg;
+	msg.seqNr = 1;
+
+	while (running) {
+		pthread_mutex_lock(&act->mutex);
+		if (act->pubSvc != NULL) {
+		    if (msgId == 0) {
+		        act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
+		    }
+
+			act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+            if (msg.seqNr % 1000 == 0) {
+                printf("Send %i messages\n", msg.seqNr);
+            }
+
+		    msg.seqNr += 1;
+
+        }
+        pthread_mutex_unlock(&act->mutex);
+
+		usleep(10000);
+
+		pthread_mutex_lock(&act->mutex);
+		running = act->running;
+		pthread_mutex_unlock(&act->mutex);
+	}
+    printf("Send %i messages\n", msg.seqNr);
+
+    return NULL;
+}
diff --git a/bundles/pubsub/test/test/test_endpoint_runner.cc b/bundles/pubsub/test/test/test_endpoint_runner.cc
new file mode 100644
index 0000000..407378c
--- /dev/null
+++ b/bundles/pubsub/test/test/test_endpoint_runner.cc
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "celix_api.h"
+#include "unistd.h"
+
+#include <CppUTest/TestHarness.h>
+#include <CppUTest/CommandLineTestRunner.h>
+
+int main(int argc, char **argv) {
+    celix_framework_t *fw = NULL;
+    celixLauncher_launch("config.properties", &fw);
+
+    MemoryLeakWarningPlugin::turnOffNewDeleteOverloads();
+    usleep(1000000);
+    int rc = RUN_ALL_TESTS(argc, argv);
+
+    celixLauncher_stop(fw);
+    celixLauncher_waitForShutdown(fw);
+    celixLauncher_destroy(fw);
+
+    return rc;
+}
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.cc b/bundles/pubsub/test/test/tst_endpoint_activator.cc
new file mode 100644
index 0000000..652a9af
--- /dev/null
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.cc
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "celix_api.h"
+#include "pubsub/api.h"
+
+#include "msg.h"
+
+#include <CppUTest/TestHarness.h>
+#include <CppUTestExt/MockSupport.h>
+#include <constants.h>
+
+extern "C" {
+
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+
+struct activator {
+    pubsub_subscriber_t subSvc;
+    long subSvcId;
+
+    pthread_mutex_t mutex;
+    unsigned int count = 0;
+};
+
+static struct activator *g_act = NULL; //global
+
+celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
+    pthread_mutex_init(&act->mutex, NULL);
+
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
+    act->subSvc.handle = act;
+    act->subSvc.receive = tst_receive;
+    act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+
+    g_act = act;
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
+    celix_bundleContext_unregisterService(ctx, act->subSvcId);
+    pthread_mutex_destroy(&act->mutex);
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
+
+
+static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*msgTypeId*/, void * voidMsg, bool */*release*/) {
+    struct activator *act = static_cast<struct activator *>(handle);
+
+    msg_t *msg = static_cast<msg_t*>(voidMsg);
+    static int prevSeqNr = 0;
+    int delta = msg->seqNr - prevSeqNr;
+    if (delta != 1) {
+        fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
+    }
+    prevSeqNr = msg->seqNr;
+
+    pthread_mutex_lock(&act->mutex);
+    act->count += 1;
+    pthread_mutex_unlock(&act->mutex);
+    return CELIX_SUCCESS;
+}
+
+} //end extern C
+
+TEST_GROUP(PUBSUB_INT_GROUP)
+{
+	void setup() {
+	    //nop
+	}
+
+	void teardown() {
+		//nop
+	}
+};
+
+TEST(PUBSUB_INT_GROUP, recvTest) {
+    constexpr int TRIES = 25;
+    constexpr int TIMEOUT = 250000;
+    constexpr int MSG_COUNT = 100;
+
+    int count = 0;
+
+    for (int i = 0; i < TRIES; ++i) {
+        pthread_mutex_lock(&g_act->mutex);
+        count = g_act->count;
+        pthread_mutex_unlock(&g_act->mutex);
+        printf("Current msg count is %i, waiting for at least %i\n", count, MSG_COUNT);
+        if (count >= MSG_COUNT) {
+            break;
+        }
+        usleep(TIMEOUT);
+    }
+    CHECK(count >= MSG_COUNT);
+
+}