You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/01/07 10:53:05 UTC

[celix] branch develop updated (b3d28d1 -> 2426cf9)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git.


    from b3d28d1  Removing bad character from cmake_commands/README.md
     add b2548c8  CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.
     add ec7fdcf  CELIX-454: Refactors the pubsub tests and adds backtrace printing support for framework_log and loghelper
     add fe86866  CELIX-454: Fixes some mocking issues and configured pubsub_zmq_tests to run under `make test`
     add 2b8f92d  CELIX-454: Fixes a compile error for a older C++ compiler
     add 88a575f  CELIX-454: Refactors pubsub msg serializer so that it uses the same handle approach as services to prevent confusion.
     add ac47373  CELIX-454: Fixes some testing using concerning the mocks
     add 1df73a0  CELIX-454: Adds pubsub_updmc_tests
     add 11472da  CELIX-454: Small fix for osx compilation
     add 5410a08  CELIX-454: Add ignore calls to bundle_test cpputest.
     add eb450c7  CELIX-454: Fixes compile issue with osx
     add d1d001e  Merge branch 'develop' into feature/CELIX-454-pubsub-disc
     new 363d491  Merge branch 'feature/CELIX-454-pubsub-disc' into develop
     new 2426cf9  CELIX-454: Updates some logging in pubsub discovery from error level to warning level

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml                                        |    1 +
 bundles/log_service/src/log_helper.c               |   52 +-
 bundles/pubsub/CMakeLists.txt                      |    2 +-
 bundles/pubsub/examples/CMakeLists.txt             |   42 +-
 bundles/pubsub/examples/mp_pubsub/CMakeLists.txt   |   23 -
 .../pubsub/examples/mp_pubsub/common/include/ew.h  |   53 -
 .../pubsub/examples/mp_pubsub/common/include/ide.h |   49 -
 .../examples/mp_pubsub/common/include/kinematics.h |   55 -
 .../mp_pubsub/msg_descriptors/msg_ew.descriptor    |    9 -
 .../mp_pubsub/msg_descriptors/msg_ide.descriptor   |    9 -
 .../msg_descriptors/msg_kinematics.descriptor      |   10 -
 .../examples/mp_pubsub/publisher/CMakeLists.txt    |   43 -
 .../private/include/mp_publisher_private.h         |   58 -
 .../publisher/private/src/mp_pub_activator.c       |  144 --
 .../mp_pubsub/publisher/private/src/mp_publisher.c |  160 ---
 .../examples/mp_pubsub/subscriber/CMakeLists.txt   |   43 -
 .../private/include/mp_subscriber_private.h        |   50 -
 .../subscriber/private/src/mp_sub_activator.c      |  108 --
 .../subscriber/private/src/mp_subscriber.c         |  119 --
 .../private/include/pubsub_subscriber_private.h    |    2 +-
 .../subscriber/private/src/pubsub_subscriber.c     |    2 +-
 bundles/pubsub/mock/src/publisher_mock.cc          |   14 -
 .../src/pubsub_nanomsg_admin.cc                    |   39 +-
 .../src/pubsub_nanomsg_admin.h                     |   35 +-
 .../src/pubsub_nanomsg_topic_receiver.cc           |    2 +-
 .../src/pubsub_nanomsg_topic_sender.cc             |    1 -
 .../pubsub/pubsub_admin_udp_mc/src/psa_activator.c |    6 +-
 .../src/pubsub_psa_udpmc_constants.h               |   29 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |   67 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h   |   21 +-
 .../src/pubsub_udpmc_topic_receiver.c              |  242 +++-
 .../src/pubsub_udpmc_topic_receiver.h              |    8 +-
 .../src/pubsub_udpmc_topic_sender.c                |   36 +-
 .../src/pubsub_udpmc_topic_sender.h                |    5 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c    |    6 +-
 .../src/pubsub_psa_zmq_constants.h                 |   42 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |   50 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h |   27 +-
 .../src/pubsub_zmq_topic_receiver.c                |  127 +-
 .../src/pubsub_zmq_topic_receiver.h                |    1 +
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |   68 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h |    2 +
 .../sync.h => pubsub_api/include/pubsub/api.h}     |   13 +-
 .../pubsub/pubsub_api/include/pubsub/publisher.h   |   21 +-
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |   20 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c    |    2 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   |   20 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.h   |    2 +-
 .../src/ps_json_serializer_activator.c             |    8 +-
 .../src/pubsub_serializer_impl.c                   |   76 +-
 .../src/pubsub_serializer_impl.h                   |   18 +-
 bundles/pubsub/pubsub_spi/include/pubsub_admin.h   |   18 +-
 .../pubsub/pubsub_spi/include/pubsub_listeners.h   |    7 +-
 .../pubsub/pubsub_spi/include/pubsub_serializer.h  |    1 +
 bundles/pubsub/pubsub_spi/include/pubsub_utils.h   |   39 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c       |    4 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c |   12 +-
 .../src/pubsub_topology_manager.c                  | 1429 +++++++++++---------
 .../src/pubsub_topology_manager.h                  |    2 +-
 bundles/pubsub/test/CMakeLists.txt                 |  146 +-
 .../{msg_descriptors => meta_data}/msg.descriptor  |    2 +-
 .../test/meta_data/ping.properties}                |    7 +-
 .../pubsub/test/msg_descriptors/sync.descriptor    |    9 -
 bundles/pubsub/test/test/sut_activator.c           |  123 +-
 bundles/pubsub/test/test/test_runner.cc            |   19 +
 bundles/pubsub/test/test/tst_activator.cc          |  120 ++
 bundles/pubsub/test/test/tst_activator.cpp         |  221 ---
 cmake/cmake_celix/ContainerPackaging.cmake         |    6 +-
 libs/framework/include/celix_bundle_activator.h    |    2 +-
 libs/framework/include/celix_log.h                 |    2 +
 libs/framework/private/test/bundle_cache_test.cpp  |    2 +
 libs/framework/private/test/bundle_test.cpp        |   36 +-
 libs/framework/src/bundle.c                        |   15 +-
 libs/framework/src/bundle_context.c                |    3 +-
 libs/framework/src/bundle_private.h                |    1 +
 libs/framework/src/celix_log.c                     |   31 +-
 libs/utils/CMakeLists.txt                          |    9 +-
 libs/utils/src/properties.c                        |    4 +-
 78 files changed, 1893 insertions(+), 2419 deletions(-)
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/common/include/ew.h
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/common/include/ide.h
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
 delete mode 100644 bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
 rename bundles/pubsub/{test/test/sync.h => pubsub_api/include/pubsub/api.h} (84%)
 rename bundles/pubsub/test/{msg_descriptors => meta_data}/msg.descriptor (90%)
 copy bundles/{remote_services/remote_service_admin_dfi/test/config.properties.in => pubsub/test/meta_data/ping.properties} (81%)
 delete mode 100644 bundles/pubsub/test/msg_descriptors/sync.descriptor
 create mode 100644 bundles/pubsub/test/test/test_runner.cc
 create mode 100644 bundles/pubsub/test/test/tst_activator.cc
 delete mode 100644 bundles/pubsub/test/test/tst_activator.cpp


[celix] 02/02: CELIX-454: Updates some logging in pubsub discovery from error level to warning level

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 2426cf97647d20dec83a65a7df5a0c3996b15246
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jan 7 11:52:23 2019 +0100

    CELIX-454: Updates some logging in pubsub discovery from error level to warning level
---
 bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 973b38b..4e902e3 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -264,7 +264,7 @@ void* psd_refresh(void *data) {
                 //only refresh ttl -> no index update -> no watch trigger
                 int rc = etcd_refresh(entry->key, disc->ttlForEntries);
                 if (rc != ETCDLIB_RC_OK) {
-                    L_ERROR("[PSD] Warning: error refreshing etcd key %s\n", entry->key);
+                    L_WARN("[PSD] Warning: error refreshing etcd key %s\n", entry->key);
                     entry->isSet = false;
                     entry->errorCount += 1;
                 } else {
@@ -277,7 +277,7 @@ void* psd_refresh(void *data) {
                     entry->isSet = true;
                     entry->setCount += 1;
                 } else {
-                    L_ERROR("[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
+                    L_WARN("[PSD] Warning: error setting endpoint in etcd for key %s\n", entry->key);
                     entry->errorCount += 1;
                 }
                 free(str);
@@ -545,7 +545,7 @@ celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, con
 
     bool valid = pubsubEndpoint_isValid(props, true, true);
     if (!valid) {
-        L_ERROR("[PSD] Warning retrieved endpoint is not valid\n");
+        L_WARN("[PSD] Warning retrieved endpoint is not valid\n");
         celix_properties_destroy(props);
         props = NULL;
     }


[celix] 01/02: Merge branch 'feature/CELIX-454-pubsub-disc' into develop

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 363d491f4ae2e977735c94d56c0ade5c799c6ce0
Merge: b3d28d1 d1d001e
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jan 7 11:47:33 2019 +0100

    Merge branch 'feature/CELIX-454-pubsub-disc' into develop

 .travis.yml                                        |    1 +
 bundles/log_service/src/log_helper.c               |   52 +-
 bundles/pubsub/CMakeLists.txt                      |    2 +-
 bundles/pubsub/examples/CMakeLists.txt             |   42 +-
 .../pubsub/examples/mp_pubsub/common/include/ew.h  |   53 -
 .../pubsub/examples/mp_pubsub/common/include/ide.h |   49 -
 .../examples/mp_pubsub/common/include/kinematics.h |   55 -
 .../mp_pubsub/msg_descriptors/msg_ew.descriptor    |    9 -
 .../mp_pubsub/msg_descriptors/msg_ide.descriptor   |    9 -
 .../msg_descriptors/msg_kinematics.descriptor      |   10 -
 .../examples/mp_pubsub/publisher/CMakeLists.txt    |   43 -
 .../private/include/mp_publisher_private.h         |   58 -
 .../publisher/private/src/mp_pub_activator.c       |  144 --
 .../mp_pubsub/publisher/private/src/mp_publisher.c |  160 ---
 .../examples/mp_pubsub/subscriber/CMakeLists.txt   |   43 -
 .../private/include/mp_subscriber_private.h        |   50 -
 .../subscriber/private/src/mp_sub_activator.c      |  108 --
 .../subscriber/private/src/mp_subscriber.c         |  119 --
 .../private/include/pubsub_subscriber_private.h    |    2 +-
 .../subscriber/private/src/pubsub_subscriber.c     |    2 +-
 bundles/pubsub/mock/src/publisher_mock.cc          |   14 -
 .../src/pubsub_nanomsg_admin.cc                    |   39 +-
 .../src/pubsub_nanomsg_admin.h                     |   35 +-
 .../src/pubsub_nanomsg_topic_receiver.cc           |    2 +-
 .../src/pubsub_nanomsg_topic_sender.cc             |    1 -
 .../pubsub/pubsub_admin_udp_mc/src/psa_activator.c |    6 +-
 .../src/pubsub_psa_udpmc_constants.h               |   29 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |   67 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h   |   21 +-
 .../src/pubsub_udpmc_topic_receiver.c              |  242 +++-
 .../src/pubsub_udpmc_topic_receiver.h              |    8 +-
 .../src/pubsub_udpmc_topic_sender.c                |   36 +-
 .../src/pubsub_udpmc_topic_sender.h                |    5 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c    |    6 +-
 .../src/pubsub_psa_zmq_constants.h                 |   42 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |   50 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h |   27 +-
 .../src/pubsub_zmq_topic_receiver.c                |  127 +-
 .../src/pubsub_zmq_topic_receiver.h                |    1 +
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |   68 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h |    2 +
 .../sync.h => pubsub_api/include/pubsub/api.h}     |   13 +-
 .../pubsub/pubsub_api/include/pubsub/publisher.h   |   21 +-
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |   20 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c    |    2 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   |   14 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.h   |    2 +-
 .../src/ps_json_serializer_activator.c             |    8 +-
 .../src/pubsub_serializer_impl.c                   |   76 +-
 .../src/pubsub_serializer_impl.h                   |   18 +-
 bundles/pubsub/pubsub_spi/include/pubsub_admin.h   |   18 +-
 .../pubsub/pubsub_spi/include/pubsub_listeners.h   |    7 +-
 .../pubsub/pubsub_spi/include/pubsub_serializer.h  |    1 +
 bundles/pubsub/pubsub_spi/include/pubsub_utils.h   |   39 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c       |    4 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c |   12 +-
 .../src/pubsub_topology_manager.c                  | 1429 +++++++++++---------
 .../src/pubsub_topology_manager.h                  |    2 +-
 bundles/pubsub/test/CMakeLists.txt                 |  146 +-
 .../{msg_descriptors => meta_data}/msg.descriptor  |    2 +-
 .../meta_data/ping.properties}                     |   14 +-
 .../pubsub/test/msg_descriptors/sync.descriptor    |    9 -
 bundles/pubsub/test/test/sut_activator.c           |  123 +-
 bundles/pubsub/test/test/test_runner.cc            |   19 +
 bundles/pubsub/test/test/tst_activator.cc          |  120 ++
 bundles/pubsub/test/test/tst_activator.cpp         |  221 ---
 cmake/cmake_celix/ContainerPackaging.cmake         |    6 +-
 libs/framework/include/celix_bundle_activator.h    |    2 +-
 libs/framework/include/celix_log.h                 |    2 +
 libs/framework/private/test/bundle_cache_test.cpp  |    2 +
 libs/framework/private/test/bundle_test.cpp        |   36 +-
 libs/framework/src/bundle.c                        |   15 +-
 libs/framework/src/bundle_context.c                |    3 +-
 libs/framework/src/bundle_private.h                |    1 +
 libs/framework/src/celix_log.c                     |   31 +-
 libs/utils/CMakeLists.txt                          |    9 +-
 libs/utils/src/properties.c                        |    4 +-
 77 files changed, 1892 insertions(+), 2398 deletions(-)

diff --cc bundles/pubsub/examples/CMakeLists.txt
index 126db2d,2e43e6d..dd9ca54
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@@ -251,104 -223,6 +223,92 @@@ if (BUILD_PUBSUB_PSA_ZMQ
                  etcd
              USE_TERM
          )
- 
-         #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq
-         add_celix_runtime(pubsub_rt_multipart_zmq
-             NAME zmq_multipart
-             GROUP pubsub
-             CONTAINERS
-                 pubsub_mp_subscriber_zmq
-                 pubsub_mp_publisher_zmq
-             COMMANDS
-                 etcd
-             USE_TERM
-         )
      endif ()
 +endif()
 +if (BUILD_PUBSUB_PSA_NANOMSG)
 +    add_celix_container("pubsub_publisher1_nanomsg"
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_publisher
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_publisher1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container("pubsub_publisher2_nanomsg"
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_publisher
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_publisher2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container(pubsub_subscriber1_nanomsg
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_subscriber
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_subscriber1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container(pubsub_subscriber2_nanomsg
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_subscriber
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_subscriber2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
 +
 +
 +    if (ETCD_CMD AND XTERM_CMD)
 +        #Runtime starting 2 publishers and 2 subscribers for nanomsg
 +        add_celix_runtime(pubsub_rt_nanomsg
-                 NAME zmq
++                NAME nanomsg
 +                GROUP pubsub
 +                CONTAINERS
 +                pubsub_publisher1_nanomsg
 +                pubsub_publisher2_nanomsg
 +                pubsub_subscriber1_nanomsg
 +                pubsub_subscriber2_nanomsg
 +                COMMANDS
 +                etcd
 +                USE_TERM
 +                )
 +    endif ()
  
  endif()
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 8f3c229,0000000..f44a067
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@@ -1,609 -1,0 +1,614 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <string>
 +#include <vector>
 +#include <functional>
 +#include <memory.h>
 +#include <iostream>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +#include <netdb.h>
 +#include <ifaddrs.h>
 +#include <pubsub_endpoint.h>
 +#include <algorithm>
 +
 +#include "pubsub_utils.h"
 +#include "pubsub_nanomsg_admin.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +
 +
 +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
 +
 +pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx):
 +    ctx{_ctx},
 +    L{ctx, "pubsub_nanomsg_admin"} {
 +    verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
 +    fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr);
 +
 +    char *ip = nullptr;
 +    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
 +    if (confIp != nullptr) {
 +        ip = strndup(confIp, 1024);
 +    }
 +
 +    if (ip == nullptr) {
 +        //TODO try to get ip from subnet (CIDR)
 +    }
 +
 +    if (ip == nullptr) {
 +        //try to get ip from itf
 +        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr);
 +        nanoMsg_getIpAddress(interface, &ip);
 +    }
 +
 +    if (ip == nullptr) {
 +        L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")");
 +        ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
 +    }
 +
 +    ipAddress = ip;
 +    if (verbose) {
 +        L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation.");
 +    }
 +
 +
 +    long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
 +    long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
 +    basePort = (unsigned int)_basePort;
 +    maxPort = (unsigned int)_maxPort;
 +    if (verbose) {
 +        L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till ", _maxPort);
 +    }
 +
 +
 +    defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
 +    qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
 +    qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 +}
 +
 +pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 +    //note: assuming all psa registered services and service trackers are removed.
 +    {
 +//        std::lock_guard<std::mutex> lock(topicSenders.mutex);
 +//        for (auto &kv : topicSenders.map) {
 +//            auto &sender = kv.second;
 +//            delete (sender);
 +//        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(topicReceivers.mutex);
 +        for (auto &kv: topicReceivers.map) {
 +            delete kv.second;
 +        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
 +        for (auto &entry : discoveredEndpoints.map) {
 +            auto *ep = entry.second;
 +            celix_properties_destroy(ep);
 +        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(serializers.mutex);
 +        serializers.map.clear();
 +    }
 +
 +    free(ipAddress);
 +
 +}
 +
 +void pubsub_nanomsg_admin::start() {
 +    adminService.handle = this;
-     adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId) {
++    adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->matchPublisher(svcRequesterBndId, svcFilter,score, serializerSvcId);
++        return me->matchPublisher(svcRequesterBndId, svcFilter, outTopicProperties, score, serializerSvcId);
 +    };
-     adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId) {
++    adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->matchSubscriber(svcProviderBndId, svcProperties, score, serializerSvcId);
++        return me->matchSubscriber(svcProviderBndId, svcProperties, outTopicProperties, score, serializerSvcId);
 +    };
-     adminService.matchEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) {
++    adminService.matchDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->matchEndpoint(endpoint, match);
 +    };
-     adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
++    adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->setupTopicSender(scope, topic, serializerSvcId, publisherEndpoint);
++        return me->setupTopicSender(scope, topic, topicProperties, serializerSvcId, publisherEndpoint);
 +    };
 +    adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->teardownTopicSender(scope, topic);
 +    };
-     adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
++    adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint);
++        return me->setupTopicReceiver(std::string(scope), std::string(topic), topicProperties, serializerSvcId, subscriberEndpoint);
 +    };
 +
 +    adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->teardownTopicReceiver(scope, topic);
 +    };
-     adminService.addEndpoint = [](void *handle, const celix_properties_t *endpoint) {
++    adminService.addDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->addEndpoint(endpoint);
 +    };
-     adminService.removeEndpoint = [](void *handle, const celix_properties_t *endpoint) {
++    adminService.removeDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->removeEndpoint(endpoint);
 +    };
 +
 +    celix_properties_t *props = celix_properties_create();
 +    celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
 +
 +    adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props);
 +
 +
 +    celix_service_tracking_options_t opts{};
 +    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
 +    opts.filter.ignoreServiceLanguage = true;
 +    opts.callbackHandle = this;
 +    opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        me->addSerializerSvc(svc, props);
 +    };
 +    opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        me->removeSerializerSvc(svc, props);
 +    };
 +    serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
 +
 +    //register shell command service
 +    cmdSvc.handle = this;
 +    cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->executeCommand(commandLine, outStream, errorStream);
 +    };
 +
 +    celix_properties_t* shellProps = celix_properties_create();
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA");
 +    cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
 +
 +}
 +
 +void pubsub_nanomsg_admin::stop() {
 +    celix_bundleContext_unregisterService(ctx, adminSvcId);
 +    celix_bundleContext_unregisterService(ctx, cmdSvcId);
 +    celix_bundleContext_stopTracker(ctx, serializersTrackerId);
 +}
 +
 +void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) {
 +    const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr);
 +    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 +
 +    if (serType == nullptr) {
 +        L.INFO("[PSA_NANOMSG] Ignoring serializer service without ", PUBSUB_SERIALIZER_TYPE_KEY, " property");
 +        return;
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(serializers.mutex);
 +        auto it = serializers.map.find(svcId);
 +        if (it == serializers.map.end()) {
 +            serializers.map.emplace(std::piecewise_construct,
 +                    std::forward_as_tuple(svcId),
 +                    std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
 +        }
 +    }
 +}
 +
 +
 +/**
 + * Handles the removal of a serializer service.
 + *
 + * 1) Finds the serializer entry for the service id found in props,
 + * 2) destroys all topic senders using that serializer,
 + * 3) destroys all topic receivers using that serializer and
 + * 4) removes the serializer entry itself, so that the service pointer of the
 + *    departed service cannot be handed out to new senders/receivers.
 + *
 + * Note that it is the responsibility of the topology manager to create new
 + * topic senders/receivers.
 + *
 + * @param props The properties of the removed service; only the service id is read.
 + */
 +void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) {
 +    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
 +
 +    std::lock_guard<std::mutex> lock(serializers.mutex);
 +
 +    auto kvsm = serializers.map.find(svcId);
 +    if (kvsm == serializers.map.end()) {
 +        return;
 +    }
 +    auto &entry = kvsm->second;
 +
 +    {
 +        std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
 +        for (auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) {
 +            if (entry.svcId == it->second.getSerializerSvcId()) {
 +                it = topicSenders.map.erase(it);
 +            } else {
 +                ++it;
 +            }
 +        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
 +        for (auto iter = topicReceivers.map.begin(); iter != topicReceivers.map.end(); /*nothing*/) {
 +            auto *receiver = iter->second;
 +            if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
 +                iter = topicReceivers.map.erase(iter);
 +                delete receiver;
 +            } else {
 +                ++iter;
 +            }
 +        }
 +    }
 +
 +    //entry is a reference into the map, so erase only after its last use
 +    serializers.map.erase(kvsm);
 +}
 +
- celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
++celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties,
 +                                                  double *outScore, long *outSerializerSvcId) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
 +    celix_status_t  status = CELIX_SUCCESS;
 +    double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
-             qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
++            qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId);
 +    *outScore = score;
 +
 +    return status;
 +}
 +
 +/**
 + * Calculates how well this admin matches a subscriber service by delegating to
 + * pubsub_utils_matchSubscriber with this admin's type and configured scores.
 + *
 + * @param svcProviderBndId    Bundle id of the bundle providing the subscriber.
 + * @param svcProperties       Properties of the subscriber service.
 + * @param outTopicProperties  Passed through to pubsub_utils_matchSubscriber.
 + * @param outScore            Optional output for the score; may be nullptr.
 + * @param outSerializerSvcId  Passed through to pubsub_utils_matchSubscriber.
 + * @return Always CELIX_SUCCESS.
 + */
- celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId,
-                                                    const celix_properties_t *svcProperties, double *outScore,
-                                                    long *outSerializerSvcId) {
++celix_status_t pubsub_nanomsg_admin::matchSubscriber(
++        long svcProviderBndId,
++        const celix_properties_t *svcProperties,
++        celix_properties_t **outTopicProperties,
++        double *outScore,
++        long *outSerializerSvcId) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
 +    celix_status_t  status = CELIX_SUCCESS;
 +    double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
-             qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
++            qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId);
 +    if (outScore != nullptr) {
 +        *outScore = score;
 +    }
 +    return status;
 +}
 +
 +/**
 + * Checks whether the given endpoint matches this admin by delegating to
 + * pubsub_utils_matchEndpoint with this admin's type.
 + *
 + * @param endpoint The endpoint properties to match.
 + * @param outMatch Optional output for the match result; may be nullptr.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
 +    const bool isMatch = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
 +    if (outMatch != nullptr) {
 +        *outMatch = isMatch;
 +    }
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Creates and stores a topic sender for the given scope/topic and creates the
 + * publisher endpoint describing it.
 + *
 + * Nothing is created (an error is logged) when a sender for the scope/topic
 + * already exists or when no serializer is known for serializerSvcId.
 + *
 + * @param scope                 Scope of the topic.
 + * @param topic                 Name of the topic.
 + * @param serializerSvcId       Service id of the serializer to use.
 + * @param outPublisherEndpoint  Optional output; receives the newly created endpoint
 + *                              properties. When nullptr the endpoint is destroyed
 + *                              again instead of being leaked.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
++                                                    const celix_properties_t */*topicProperties*/,
 +                                                    long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    //1) Create TopicSender
 +    //2) Store TopicSender
 +    //3) set outPublisherEndpoint
 +
 +    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
 +    auto sender = topicSenders.map.find(key);
 +    if (sender == topicSenders.map.end()) {
 +        auto kv = serializers.map.find(serializerSvcId);
 +        if (kv != serializers.map.end()) {
 +            auto &serEntry = kv->second;
 +            auto e = topicSenders.map.emplace(std::piecewise_construct,
 +                    std::forward_as_tuple(key),
 +                    std::forward_as_tuple(ctx, scope, topic, serializerSvcId, serEntry.svc, ipAddress,
 +                                          basePort, maxPort));
 +            celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
 +                    PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr);
 +            //only touch the endpoint after it is known to exist
 +            if (newEndpoint != nullptr) {
 +                celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, e.first->second.getUrl().c_str());
 +                //if available also set container name
 +                const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
 +                if (cn != nullptr) {
 +                    celix_properties_set(newEndpoint, "container_name", cn);
 +                }
 +                if (outPublisherEndpoint != nullptr) {
 +                    *outPublisherEndpoint = newEndpoint;
 +                } else {
 +                    //nobody takes ownership, prevent a leak
 +                    celix_properties_destroy(newEndpoint);
 +                }
 +            }
 +        } else {
 +            L.ERROR("[PSA NANOMSG] Error creating a TopicSender");
 +        }
 +    } else {
 +        L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic ", scope,"/", topic);
 +    }
 +    free(key);
 +
 +    return status;
 +}
 +
 +/**
 + * Removes (and thereby destroys) the topic sender stored for the given
 + * scope/topic. Logs an error when no such sender exists.
 + *
 + * @param scope Scope of the topic.
 + * @param topic Name of the topic.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) {
 +    char *scopeTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> guard(topicSenders.mutex);
 +
 +    //erasing the map entry destroys the topic sender
 +    const auto erasedCount = topicSenders.map.erase(scopeTopicKey);
 +    free(scopeTopicKey);
 +
 +    if (erasedCount == 0) {
 +        L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic ", scope, "/", topic, " Does not exists");
 +    }
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Creates and stores a topic receiver for the given scope/topic, creates the
 + * subscriber endpoint describing it and connects the new receiver to all
 + * already discovered publisher endpoints.
 + *
 + * Nothing is created (an error is logged) when a receiver for the scope/topic
 + * already exists, when no serializer is known for serializerSvcId or when
 + * constructing the receiver fails.
 + *
 + * @param scope                  Scope of the topic.
 + * @param topic                  Name of the topic.
 + * @param serializerSvcId        Service id of the serializer to use.
 + * @param outSubscriberEndpoint  Optional output; receives the newly created endpoint
 + *                               properties. When nullptr the endpoint is destroyed
 + *                               again instead of being leaked.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic,
++                                                      const celix_properties_t */*topicProperties*/,
 +                                                      long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
 +
 +    celix_properties_t *newEndpoint = nullptr;
 +
 +    auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str());
 +    pubsub::nanomsg::topic_receiver * receiver = nullptr;
 +    {
 +        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +        auto trkv = topicReceivers.map.find(key);
 +        if (trkv != topicReceivers.map.end()) {
 +            receiver = trkv->second;
 +        }
 +        if (receiver == nullptr) {
 +            auto kvs = serializers.map.find(serializerSvcId);
 +            if (kvs != serializers.map.end()) {
 +                try {
 +                    receiver = new pubsub::nanomsg::topic_receiver(ctx, scope, topic, serializerSvcId, kvs->second.svc);
 +                } catch (...) {
 +                    //do not let an exception escape to the caller or leak key; reported below
 +                    receiver = nullptr;
 +                }
 +            } else {
 +                L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicReceiver ", scope, "/", topic);
 +            }
 +            if (receiver != nullptr) {
 +                const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
 +                const char *serType = kvs->second.serType;
 +                newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
 +                                                    serType, nullptr);
 +                //if available also set container name
 +                const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
 +                if (cn != nullptr) {
 +                    celix_properties_set(newEndpoint, "container_name", cn);
 +                }
 +                topicReceivers.map[key] = receiver;
 +            } else {
 +                L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
 +            }
 +        } else {
 +            L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic ", scope, "/", topic);
 +        }
 +    }
 +    if (receiver != nullptr && newEndpoint != nullptr) {
 +        //connect the new receiver to the publisher endpoints discovered so far
 +        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +        for (const auto &entry : discoveredEndpoints.map) {
 +            auto *endpoint = entry.second;
 +            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 +            if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +                connectEndpointToReceiver(receiver, endpoint);
 +            }
 +        }
 +    }
 +
 +    if (newEndpoint != nullptr) {
 +        if (outSubscriberEndpoint != nullptr) {
 +            *outSubscriberEndpoint = newEndpoint;
 +        } else {
 +            //nobody takes ownership, prevent a leak
 +            celix_properties_destroy(newEndpoint);
 +        }
 +    }
 +    free(key);
 +    celix_status_t  status = CELIX_SUCCESS;
 +    return status;
 +}
 +
 +/**
 + * Removes the topic receiver stored for the given scope/topic from the map and
 + * deletes it. Does nothing when no such receiver is stored.
 + *
 + * @param scope Scope of the topic.
 + * @param topic Name of the topic.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
 +    char *scopeTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> guard(topicReceivers.mutex);
 +    auto found = topicReceivers.map.find(scopeTopicKey);
 +    free(scopeTopicKey);
 +
 +    if (found == topicReceivers.map.end()) {
 +        return CELIX_SUCCESS;
 +    }
 +
 +    //take the receiver out of the map first, then destroy it
 +    pubsub::nanomsg::topic_receiver *stored = found->second;
 +    topicReceivers.map.erase(found);
 +    delete stored;
 +
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Connects the receiver to the endpoint's nanomsg url when the endpoint's
 + * scope and topic equal those of the receiver.
 + *
 + * Note: can be called with the discoveredEndpoints.mutex lock held.
 + *
 + * @param receiver The topic receiver to connect.
 + * @param endpoint The endpoint properties (scope, topic and nanomsg url are read).
 + * @return CELIX_BUNDLE_EXCEPTION when the endpoint has no nanomsg url (no
 + *         message is logged for this), CELIX_SUCCESS otherwise.
 + */
 +celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
 +                                                                    const celix_properties_t *endpoint) {
 +    const auto receiverScope = receiver->scope();
 +    const auto receiverTopic = receiver->topic();
 +
 +    const std::string endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
 +    const std::string endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
 +    const char *nanomsgUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 +
 +    if (nanomsgUrl == nullptr) {
 +        return CELIX_BUNDLE_EXCEPTION;
 +    }
 +
 +    if (endpointScope == receiverScope && endpointTopic == receiverTopic) {
 +        receiver->connectTo(nanomsgUrl);
 +    }
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Handles a newly discovered endpoint.
 + *
 + * For publisher endpoints all topic receivers are offered the endpoint so
 + * matching ones can connect. Afterwards a copy of the endpoint is stored in
 + * discoveredEndpoints under its uuid; a copy previously stored under the same
 + * uuid is destroyed.
 + *
 + * @param endpoint The discovered endpoint properties.
 + * @return CELIX_BUNDLE_EXCEPTION when the endpoint has no uuid and therefore
 + *         cannot be stored, CELIX_SUCCESS otherwise.
 + */
 +celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) {
 +    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 +
 +    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +        std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
 +        for (auto &entry: topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *receiver = entry.second;
 +            connectEndpointToReceiver(receiver, endpoint);
 +        }
 +    }
 +
 +    //a std::string key cannot be constructed from a null pointer
 +    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
 +    if (uuid == nullptr) {
 +        L.WARN("[PSA NANOMSG] Cannot store discovered endpoint without ", PUBSUB_ENDPOINT_UUID, " property");
 +        return CELIX_BUNDLE_EXCEPTION;
 +    }
 +
 +    celix_properties_t *cpy = celix_properties_copy(endpoint);
 +    std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +    auto existing = discoveredEndpoints.map.find(uuid);
 +    if (existing != discoveredEndpoints.map.end()) {
 +        //replace the stored copy and free the old one to prevent a leak
 +        celix_properties_destroy(existing->second);
 +        existing->second = cpy;
 +    } else {
 +        discoveredEndpoints.map[uuid] = cpy;
 +    }
 +
 +    return CELIX_SUCCESS;
 +}
 +
 +
 +/**
 + * Disconnects the receiver from the endpoint's nanomsg url when the endpoint's
 + * scope and topic equal those of the receiver.
 + *
 + * Note: can be called with the discoveredEndpoints.mutex lock held.
 + *
 + * @param receiver The topic receiver to disconnect.
 + * @param endpoint The endpoint properties (scope, topic and nanomsg url are read).
 + * @return CELIX_BUNDLE_EXCEPTION (and a logged warning) when the endpoint has
 + *         no nanomsg url, CELIX_SUCCESS otherwise.
 + */
 +celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
 +                                                                            const celix_properties_t *endpoint) {
 +    const auto receiverScope = receiver->scope();
 +    const auto receiverTopic = receiver->topic();
 +
 +    const std::string endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
 +    const std::string endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
 +    const char *nanomsgUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
 +
 +    if (nanomsgUrl == nullptr) {
 +        L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
 +        return CELIX_BUNDLE_EXCEPTION;
 +    }
 +
 +    if (endpointScope == receiverScope && endpointTopic == receiverTopic) {
 +        receiver->disconnectFrom(nanomsgUrl);
 +    }
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Handles the removal of a discovered endpoint.
 + *
 + * For publisher endpoints all topic receivers are asked to disconnect from the
 + * endpoint. Afterwards the copy stored in discoveredEndpoints under the
 + * endpoint's uuid (made by addEndpoint) is removed and destroyed.
 + *
 + * @param endpoint The removed endpoint properties.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) {
 +    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 +
 +    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +        for (auto &entry : topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *receiver = entry.second;
 +            disconnectEndpointFromReceiver(receiver, endpoint);
 +        }
 +    }
 +
 +    //a std::string key cannot be constructed from a null pointer
 +    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
 +    if (uuid != nullptr) {
 +        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +        auto found = discoveredEndpoints.map.find(uuid);
 +        if (found != discoveredEndpoints.map.end()) {
 +            //the map owns the stored copy; free it to prevent a leak
 +            celix_properties_destroy(found->second);
 +            discoveredEndpoints.map.erase(found);
 +        }
 +    }
 +    return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Shell command implementation: prints an overview of all topic senders
 + * (scope/topic, serializer type, url) and all topic receivers (scope/topic,
 + * serializer type, connected and unconnected urls) to the out stream.
 + *
 + * @param commandLine Unused.
 + * @param out         Stream the overview is written to.
 + * @param errStream   Unused.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out,
 +                                                  FILE *errStream __attribute__((unused))) {
 +    fprintf(out, "\n");
 +    fprintf(out, "Topic Senders:\n");
 +    {
 +        std::lock_guard<std::mutex> serializerGuard(serializers.mutex);
 +        std::lock_guard<std::mutex> senderGuard(topicSenders.mutex);
 +        for (auto &kv : topicSenders.map) {
 +            auto &topicSender = kv.second;
 +            auto serializerIt = serializers.map.find(topicSender.getSerializerSvcId());
 +            //fall back to an error marker when the serializer is not (or no longer) known
 +            const char *serializerType = "!Error";
 +            if (serializerIt != serializers.map.end()) {
 +                serializerType = serializerIt->second.serType;
 +            }
 +            const auto senderScope = topicSender.getScope();
 +            const auto senderTopic = topicSender.getTopic();
 +            const auto senderUrl = topicSender.getUrl();
 +            fprintf(out, "|- Topic Sender %s/%s\n", senderScope.c_str(), senderTopic.c_str());
 +            fprintf(out, "   |- serializer type = %s\n", serializerType);
 +            fprintf(out, "   |- url             = %s\n", senderUrl.c_str());
 +        }
 +    }
 +
 +    {
 +        fprintf(out, "\n");
 +        fprintf(out, "\nTopic Receivers:\n");
 +        std::lock_guard<std::mutex> serializerGuard(serializers.mutex);
 +        std::lock_guard<std::mutex> receiverGuard(topicReceivers.mutex);
 +        for (auto &kv : topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *topicReceiver = kv.second;
 +            auto serializerIt = serializers.map.find(topicReceiver->serializerSvcId());
 +            const char *serializerType = "!Error!";
 +            if (serializerIt != serializers.map.end()) {
 +                serializerType = serializerIt->second.serType;
 +            }
 +            auto receiverScope = topicReceiver->scope();
 +            auto receiverTopic = topicReceiver->topic();
 +
 +            std::vector<std::string> connectedUrls{};
 +            std::vector<std::string> unconnectedUrls{};
 +            topicReceiver->listConnections(connectedUrls, unconnectedUrls);
 +
 +            fprintf(out, "|- Topic Receiver %s/%s\n", receiverScope.c_str(), receiverTopic.c_str());
 +            fprintf(out, "   |- serializer type = %s\n", serializerType);
 +            for (auto &connectedUrl : connectedUrls) {
 +                fprintf(out, "   |- connected url   = %s\n", connectedUrl.c_str());
 +            }
 +            for (auto &unconnectedUrl : unconnectedUrls) {
 +                fprintf(out, "   |- unconnected url = %s\n", unconnectedUrl.c_str());
 +            }
 +        }
 +    }
 +    fprintf(out, "\n");
 +
 +    return CELIX_SUCCESS;
 +}
 +
 +#ifndef ANDROID
 +/**
 + * Looks up the numeric IPv4 address of a network interface.
 + *
 + * @param interface Name of the interface to use, or nullptr to use the first
 + *                  interface with an IPv4 address.
 + * @param ip        Output; on success set to a strdup'ed address string which
 + *                  the caller must free. Left untouched on failure.
 + * @return CELIX_SUCCESS when an address was found, CELIX_BUNDLE_EXCEPTION otherwise.
 + */
 +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
 +    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +    struct ifaddrs *ifaddr = nullptr;
 +    char host[NI_MAXHOST];
 +
 +    if (getifaddrs(&ifaddr) == -1) {
 +        return status;
 +    }
 +
 +    for (struct ifaddrs *ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
 +        //check the family first: only then is the address really a sockaddr_in
 +        if (ifa->ifa_addr == nullptr || ifa->ifa_addr->sa_family != AF_INET) {
 +            continue;
 +        }
 +        if (interface != nullptr && strcmp(ifa->ifa_name, interface) != 0) {
 +            continue;
 +        }
 +        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) != 0) {
 +            continue;
 +        }
 +        char *found = strdup(host);
 +        if (found != nullptr) {
 +            //only report success when the caller really gets an address
 +            *ip = found;
 +            status = CELIX_SUCCESS;
 +        }
 +    }
 +
 +    freeifaddrs(ifaddr);
 +
 +    return status;
 +}
 +#endif
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 768accc,0000000..dccf280
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@@ -1,136 -1,0 +1,153 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
 +#define CELIX_PUBSUB_ZMQ_ADMIN_H
 +
 +#include <mutex>
 +#include <map>
 +#include <pubsub_admin.h>
 +#include "celix_api.h"
 +#include "pubsub_nanomsg_topic_receiver.h"
 +#include <pubsub_serializer.h>
 +#include "LogHelper.h"
 +#include "command.h"
 +#include "pubsub_nanomsg_topic_sender.h"
 +#include "pubsub_nanomsg_topic_receiver.h"
 +
 +// Admin type and endpoint url key used by the nanomsg admin.
 +// NOTE(review): both values (and the include guard of this header) still say "zmq";
 +// presumably left over from the ZMQ admin this code was derived from - confirm
 +// whether the nanomsg admin is meant to announce itself as "zmq".
 +#define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
 +#define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
 +
 +#define PUBSUB_NANOMSG_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
 +#define PUBSUB_NANOMSG_VERBOSE_DEFAULT  true
 +
 +#define PUBSUB_NANOMSG_PSA_IP_KEY       "PSA_IP"
 +#define PUBSUB_NANOMSG_PSA_ITF_KEY		"PSA_INTERFACE"
 +
 +#define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 +
 +/**
 + * Bundles a std::map with the std::mutex meant to guard it. The struct does no
 + * locking itself; users lock `mutex` before touching `map`.
 + */
 +template <typename Key, typename Value>
 +struct ProtectedMap {
 +    std::mutex mutex{};
 +    std::map<Key, Value> map{};
 +};
 +
 +/**
 + * The nanomsg pubsub admin. Keeps track of the available serializer services,
 + * the topic senders and topic receivers it created and the endpoints that were
 + * discovered; each of these collections has its own mutex (see ProtectedMap).
 + * Instances cannot be copied.
 + */
 +class pubsub_nanomsg_admin {
 +public:
 +    pubsub_nanomsg_admin(celix_bundle_context_t *ctx);
 +    pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
 +    pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
 +    ~pubsub_nanomsg_admin();
 +    void start();
 +    void stop();
 +
 +private:
 +    //serializer service tracker callbacks
 +    void addSerializerSvc(void *svc, const celix_properties_t *props);
 +    void removeSerializerSvc(void */*svc*/, const celix_properties_t *props);
 +
 +    //matching of publishers, subscribers and endpoints against this admin
-     celix_status_t matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
-                                                       double *score, long *serializerSvcId);
-     celix_status_t matchSubscriber(long svcProviderBndId,
-                                                        const celix_properties_t *svcProperties, double *score,
-                                                        long *serializerSvcId);
++    celix_status_t matchPublisher(
++            long svcRequesterBndId,
++            const celix_filter_t *svcFilter,
++            celix_properties_t **outTopicProperties,
++            double *outScore,
++            long *outSerializerSvcId);
++    celix_status_t matchSubscriber(
++            long svcProviderBndId,
++            const celix_properties_t *svcProperties,
++            celix_properties_t **outTopicProperties,
++            double *outScore,
++            long *outSerializerSvcId);
 +    celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match);
 +
 +    //creation and destruction of topic senders / receivers
-     celix_status_t setupTopicSender(const char *scope, const char *topic,
-                                                         long serializerSvcId, celix_properties_t **publisherEndpoint);
++    celix_status_t setupTopicSender(
++            const char *scope,
++            const char *topic,
++            const celix_properties_t *topicProperties,
++            long serializerSvcId,
++            celix_properties_t **outPublisherEndpoint);
++
 +    celix_status_t teardownTopicSender(const char *scope, const char *topic);
 +
-     celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic,
-                                                           long serializerSvcId, celix_properties_t **subscriberEndpoint);
++    celix_status_t setupTopicReceiver(
++            const std::string &scope,
++            const std::string &topic,
++            const celix_properties_t *topicProperties,
++            long serializerSvcId,
++            celix_properties_t **outSubscriberEndpoint);
++
 +    celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
 +
 +    //handling of discovered endpoints
 +    celix_status_t addEndpoint(const celix_properties_t *endpoint);
 +    celix_status_t removeEndpoint(const celix_properties_t *endpoint);
 +
 +    //shell command implementation
 +    celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
 +                                                        FILE *errStream __attribute__((unused)));
 +
 +    celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
 +                                                                   const celix_properties_t *endpoint);
 +
 +    celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
 +                                                                        const celix_properties_t *endpoint);
 +    celix_bundle_context_t *ctx;
 +    celix::pubsub::nanomsg::LogHelper L;
 +    pubsub_admin_service_t adminService{};
 +    long adminSvcId = -1L;
 +    long cmdSvcId = -1L;
 +    command_service_t cmdSvc{};
 +    long serializersTrackerId = -1L;
 +
 +    const char *fwUUID{};
 +
 +    char* ipAddress{};
 +
 +    unsigned int basePort{};
 +    unsigned int maxPort{};
 +
 +    double qosSampleScore{};
 +    double qosControlScore{};
 +    double defaultScore{};
 +
 +    bool verbose{};
 +
 +    //A known serializer service: its type, service id and service pointer.
 +    //The pointers are stored as given, not copied.
 +    class psa_nanomsg_serializer_entry {
 +    public:
 +        psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
 +            serType{_serType}, svcId{_svcId}, svc{_svc} {
 +
 +        }
 +
 +        const char *serType;
 +        long svcId;
 +        pubsub_serializer_service_t *svc;
 +    };
 +    ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{}; //key = serializer service id
 +    ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> topicSenders{}; //key = scope/topic key
 +    ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; //key = scope/topic key
 +    ProtectedMap<const std::string, celix_properties_t *> discoveredEndpoints{}; //key = endpoint uuid
 +};
 +
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +
 +#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index d35de89,0000000..b27eb76
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@@ -1,319 -1,0 +1,319 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <iostream>
 +#include <mutex>
 +#include <memory.h>
 +#include <vector>
 +#include <string>
 +#include <sstream>
 +
 +#include <stdlib.h>
 +#include <assert.h>
 +
 +#include <sys/epoll.h>
 +#include <arpa/inet.h>
 +
 +#include <nanomsg/nn.h>
 +#include <nanomsg/bus.h>
 +
 +#include <pubsub_serializer.h>
 +#include <pubsub/subscriber.h>
 +#include <pubsub_constants.h>
 +#include <pubsub_endpoint.h>
 +#include <LogHelper.h>
 +
 +#include "pubsub_nanomsg_topic_receiver.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +#include "pubsub_nanomsg_common.h"
 +#include "pubsub_topology_manager.h"
 +
 +//TODO see if block and wakeup (reset) also works
 +#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout
 +
 +/*
 +#define L_DEBUG(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 +#define L_INFO(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
 +#define L_WARN(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 +#define L_ERROR(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 +*/
 +//#define L_DEBUG printf
 +//#define L_INFO printf
 +//#define L_WARN printf
 +//#define L_ERROR printf
 +
 +
 +/**
 + * Creates a topic receiver: opens a nanomsg bus socket with a receive timeout
 + * of PSA_NANOMSG_RECV_TIMEOUT, starts tracking subscriber services for the
 + * topic and starts the receive thread.
 + *
 + * @param _ctx             The bundle context.
 + * @param _scope           Scope of the topic.
 + * @param _topic           Name of the topic.
 + * @param _serializerSvcId Service id of the serializer in use.
 + * @param _serializer      The serializer service.
 + * @throws std::bad_alloc when the socket cannot be created or configured. In
 + *         that case no tracker or thread has been started and the socket is closed.
 + */
 +pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 +        const std::string &_scope,
 +        const std::string &_topic,
 +        long _serializerSvcId,
 +        pubsub_serializer_service_t *_serializer) : L{_ctx, "NANOMSG_topic_receiver"}, m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
 +    ctx = _ctx;
 +    serializer = _serializer;
 +
 +    m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
 +    if (m_nanoMsgSocket < 0) {
 +        L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: ", m_scope.c_str(), "/", m_topic.c_str());
 +        throw std::bad_alloc{};
 +    }
 +
 +    int timeout = PSA_NANOMSG_RECV_TIMEOUT;
 +    if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
 +                      sizeof (timeout)) < 0) {
 +        L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope, "/",m_topic, ", set sockopt RECV_TIMEO failed");
 +        //the destructor will not run for a failed construction, so clean up here
 +        nn_close(m_nanoMsgSocket);
 +        throw std::bad_alloc{};
 +    }
 +
 +    //NOTE(review): the result of this call is not used in this constructor
 +    auto subscriberFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(m_scope, m_topic);
 +
 +    auto opts = createOptions();
 +
 +    subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
 +    recvThread.running = true;
 +    //the filter string was strdup'ed by createOptions
 +    free ((void*)opts.filter.filter);
 +    recvThread.thread = std::thread([this]() {this->recvThread_exec();});
 +}
 +
 +/**
 + * Builds the service tracking options used to track subscriber services for
 + * this receiver's topic. Added/removed services are forwarded to
 + * addSubscriber/removeSubscriber of this receiver.
 + *
 + * @return The tracking options. opts.filter.filter is a strdup'ed string that
 + *         the caller has to free.
 + */
 +celix_service_tracking_options_t pubsub::nanomsg::topic_receiver::createOptions() {
 +    const std::string topicFilter = std::string{"("} + PUBSUB_SUBSCRIBER_TOPIC + "=" + m_topic + ")";
 +
 +    celix_service_tracking_options_t trackingOpts{};
 +    trackingOpts.callbackHandle = this;
 +    trackingOpts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
 +    trackingOpts.filter.ignoreServiceLanguage = true;
 +    trackingOpts.filter.filter = strdup(topicFilter.c_str()); //ownership goes to the caller
 +    trackingOpts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
 +            auto *self = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 +            self->addSubscriber(svc, props, svcOwner);
 +        };
 +    trackingOpts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
 +            auto *self = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 +            self->removeSubscriber(svc, props, svcOwner);
 +        };
 +    return trackingOpts;
 +}
 +
 +/**
 + * Destroys the topic receiver: stops and joins the receive thread, stops the
 + * subscriber tracker, destroys the serializer maps of all subscribers and
 + * closes the nanomsg socket.
 + */
 +pubsub::nanomsg::topic_receiver::~topic_receiver() {
 +    {
 +        std::lock_guard<std::mutex> _lock(recvThread.mutex);
 +        recvThread.running = false;
 +    }
 +    //joining a thread that was never started would throw from this destructor
 +    if (recvThread.thread.joinable()) {
 +        recvThread.thread.join();
 +    }
 +
 +    celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 +
 +    {
 +        std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +        for (auto &elem : subscribers.map) {
 +            serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
 +        }
 +        subscribers.map.clear();
 +    }
 +
 +    //nn_socket returns a negative value on failure; only close a valid socket
 +    if (m_nanoMsgSocket >= 0) {
 +        nn_close(m_nanoMsgSocket);
 +    }
 +}
 +
 +/** Returns a copy of the scope this receiver was created with. */
 +std::string pubsub::nanomsg::topic_receiver::scope() const {
 +    return m_scope;
 +}
 +
 +/** Returns a copy of the topic this receiver was created with. */
 +std::string pubsub::nanomsg::topic_receiver::topic() const {
 +    return m_topic;
 +}
 +
 +/** Returns the service id of the serializer this receiver was created with. */
 +long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
 +    return m_serializerSvcId;
 +}
 +
 +/**
 + * Appends the urls of all requested connections to one of the two output
 + * vectors, depending on whether the connection is currently connected.
 + *
 + * @param connectedUrls   Receives the urls of connected entries.
 + * @param unconnectedUrls Receives the urls of entries that are not connected.
 + */
 +void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
 +                                                 std::vector<std::string> &unconnectedUrls) {
 +    std::lock_guard<std::mutex> guard(requestedConnections.mutex);
 +    for (auto &requested : requestedConnections.map) {
 +        auto &connection = requested.second;
 +        auto &target = connection.isConnected() ? connectedUrls : unconnectedUrls;
 +        target.emplace_back(connection.getUrl());
 +    }
 +}
 +
 +
 +/**
 + * Requests a connection to the given nanomsg url. The url is remembered in
 + * requestedConnections (if not already present) and, when that entry is not
 + * yet connected, nn_connect is called. A failed connect is logged as a warning
 + * and leaves the entry unconnected.
 + *
 + * @param url The nanomsg url to connect to.
 + */
 +void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
 +    L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting to nanomsg url ", url);
 +
 +    std::lock_guard<std::mutex> guard(requestedConnections.mutex);
 +    auto pos = requestedConnections.map.find(url);
 +    if (pos == requestedConnections.map.end()) {
 +        //not requested before: remember it as a not yet connected entry
 +        pos = requestedConnections.map.emplace(
 +                std::piecewise_construct,
 +                std::forward_as_tuple(std::string(url)),
 +                std::forward_as_tuple(url, -1)).first;
 +    }
 +
 +    auto &connection = pos->second;
 +    if (connection.isConnected()) {
 +        return;
 +    }
 +
 +    const int connectionId = nn_connect(m_nanoMsgSocket, url);
 +    if (connectionId < 0) {
 +        L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, " (",strerror(errno), ")");
 +        return;
 +    }
 +    connection.setConnected(true);
 +    connection.setId(connectionId);
 +}
 +
 +/**
 + * Drops the requested connection for the given nanomsg url. When the entry is
 + * connected nn_shutdown is called first (a failure is logged as a warning);
 + * the entry is removed from requestedConnections in either case. Unknown urls
 + * are only reported on std::cerr.
 + *
 + * @param url The nanomsg url to disconnect from.
 + */
 +void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
 +    L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect from nanomsg url ", url);
 +
 +    std::lock_guard<std::mutex> guard(requestedConnections.mutex);
 +    auto pos = requestedConnections.map.find(url);
 +    if (pos == requestedConnections.map.end()) {
 +        std::cerr << "Disconnecting from unknown URL " << url << std::endl;
 +        return;
 +    }
 +
 +    auto &connection = pos->second;
 +    if (connection.isConnected()) {
 +        if (nn_shutdown(m_nanoMsgSocket, connection.getId()) != 0) {
 +            L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ", url, ", id: ", connection.getId(), " (",strerror(errno),")");
 +        } else {
 +            connection.setConnected(false);
 +        }
 +    }
 +    requestedConnections.map.erase(pos);
 +    std::cerr << "REMOVING connection " << url << std::endl;
 +}
 +
 +void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
 +                                                      const celix_bundle_t *bnd) {
 +    long bndId = celix_bundle_getId(bnd);
 +    std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
 +    if (subScope != m_scope) {
 +        //not the same scope. ignore
 +        return;
 +    }
 +
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    auto entry = subscribers.map.find(bndId);
 +    if (entry != subscribers.map.end()) {
 +        entry->second.usageCount += 1;
 +    } else {
 +        //new create entry
 +        subscribers.map.emplace(std::piecewise_construct,
 +                std::forward_as_tuple(bndId),
 +                std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
 +        entry = subscribers.map.find(bndId);
 +
 +        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
 +        if (rc != 0) {
 +            L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str());
 +            subscribers.map.erase(bndId);
 +        }
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
 +                                                         const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
 +    long bndId = celix_bundle_getId(bnd);
 +
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    auto entry = subscribers.map.find(bndId);
 +    if (entry != subscribers.map.end()) {
 +        entry->second.usageCount -= 1;
 +        if (entry->second.usageCount <= 0) {
 +            //remove entry
 +            int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
 +            if (rc != 0) {
 +                L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n");
 +            }
 +            subscribers.map.erase(bndId);
 +        }
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize) {
 +    pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
 +    pubsub_subscriber_t *svc = entry->svc;
 +
 +    if (msgSer!= NULL) {
 +        void *deserializedMsg = NULL;
 +        bool validVersion = celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr);
 +        if (validVersion) {
 +            celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
 +            if(status == CELIX_SUCCESS) {
 +                bool release = false;
-                 svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
++                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
 +                if (release) {
 +                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
 +                }
 +            } else {
 +                L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ", msgSer->msgName , "for scope/topic ", scope(), "/", topic());
 +            }
 +        }
 +    } else {
 +        L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ", hdr->type);
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize) {
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    for (auto entry : subscribers.map) {
 +        processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
 +    }
 +}
 +
 +struct Message {
 +    celix::pubsub::nanomsg::msg_header header;
 +    char payload[];
 +};
 +
 +void pubsub::nanomsg::topic_receiver::recvThread_exec() {
 +    while (recvThread.running) {
 +        Message *msg = nullptr;
 +        nn_iovec iov[2];
 +        iov[0].iov_base = &msg;
 +        iov[0].iov_len = NN_MSG;
 +
 +        nn_msghdr msgHdr;
 +        memset(&msgHdr, 0, sizeof(msgHdr));
 +
 +        msgHdr.msg_iov = iov;
 +        msgHdr.msg_iovlen = 1;
 +
 +        msgHdr.msg_control = nullptr;
 +        msgHdr.msg_controllen = 0;
 +
 +        errno = 0;
 +        int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
 +        if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(celix::pubsub::nanomsg::msg_header)) {
 +            processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
 +            nn_freemsg(msg);
 +        } else if (recvBytes >= 0) {
 +            L.ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (", recvBytes,") smaller than header\n");
 +        } else if (errno == EAGAIN || errno == ETIMEDOUT) {
 +            // no data: go to next cycle
 +        } else if (errno == EINTR) {
 +            L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
 +        } else {
 +            L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ", errno, " : ",  strerror(errno), "\n");
 +        }
 +    } // while
 +
 +}
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 154bd11,0000000..4190729
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@@ -1,267 -1,0 +1,266 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <memory.h>
 +#include <iostream>
 +#include <sstream>
 +#include <stdlib.h>
 +#include <utils.h>
 +#include <arpa/inet.h>
 +#include <zconf.h>
 +#include <LogHelper.h>
 +#include <nanomsg/nn.h>
 +#include <nanomsg/bus.h>
 +
 +
 +#include <pubsub_constants.h>
 +#include <pubsub_common.h>
 +#include "pubsub_nanomsg_topic_sender.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +#include "pubsub_nanomsg_common.h"
 +
 +#define FIRST_SEND_DELAY_IN_SECONDS                 2
 +#define NANOMSG_BIND_MAX_RETRY                      10
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper);
 +
 +pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
 +                                                         const char *_scope,
 +                                                         const char *_topic,
 +                                                         long _serializerSvcId,
 +                                                         pubsub_serializer_service_t *_ser,
 +                                                         const char *_bindIp,
 +                                                         unsigned int _basePort,
 +                                                         unsigned int _maxPort) :
 +        ctx{_ctx},
 +        L{ctx, "PSA_ZMQ_TS"},
 +        serializerSvcId {_serializerSvcId},
 +        serializer{_ser},
 +        scope{_scope},
 +        topic{_topic}{
 +
 +    scopeAndTopicFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(_scope, _topic);
 +
 +    //setting up nanomsg socket for nanomsg TopicSender
 +    int nnSock = nn_socket(AF_SP, NN_BUS);
 +    if (nnSock == -1) {
 +        perror("Error for nanomsg_socket");
 +    }
 +
 +    int rv = -1, retry=0;
 +    while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
 +        /* Randomized part due to same bundle publishing on different topics */
 +        unsigned int port = rand_range(_basePort,_maxPort);
 +        std::stringstream _url;
 +        _url << "tcp://" << _bindIp << ":" << port;
 +
 +        std::stringstream bindUrl;
 +        bindUrl << "tcp://0.0.0.0:" << port;
 +
 +        rv = nn_bind (nnSock, bindUrl.str().c_str());
 +        if (rv == -1) {
 +            perror("Error for nn_bind");
 +        } else {
 +            this->url = _url.str();
 +            nanomsg.socket = nnSock;
 +        }
 +        retry++;
 +    }
 +
 +    if (!url.empty()) {
 +
 +        //register publisher services using a service factory
 +        publisher.factory.handle = this;
 +        publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
 +            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
 +                    requestingBundle,
 +                    svcProperties);
 +        };
 +        publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
 +            return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
 +                    requestingBundle,
 +                    svcProperties);
 +        };
 +
 +        celix_properties_t *props = celix_properties_create();
 +        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str());
 +        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str());
 +
 +        celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
 +        opts.factory = &publisher.factory;
 +        opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
 +        opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
 +        opts.properties = props;
 +
 +        publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
 +    }
 +
 +}
 +
 +pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
 +    celix_bundleContext_unregisterService(ctx, publisher.svcId);
 +
 +    nn_close(nanomsg.socket);
 +    std::lock_guard<std::mutex> lock(boundedServices.mutex);
 +    for  (auto &it: boundedServices.map) {
 +            serializer->destroySerializerMap(serializer->handle, it.second.msgTypes);
 +    }
 +    boundedServices.map.clear();
 +
 +}
 +
 +long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
 +    return serializerSvcId;
 +}
 +
 +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
 +    return scope;
 +}
 +
 +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
 +    return topic;
 +}
 +
 +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
 +    return url;
 +}
 +
 +
 +void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
 +                                             const celix_properties_t *svcProperties __attribute__((unused))) {
 +    long bndId = celix_bundle_getId(requestingBundle);
 +    void *service{nullptr};
 +    std::lock_guard<std::mutex> lock(boundedServices.mutex);
 +    auto existingEntry = boundedServices.map.find(bndId);
 +    if (existingEntry != boundedServices.map.end()) {
 +        existingEntry->second.getCount += 1;
 +        service = &existingEntry->second.service;
 +    } else {
 +        auto entry = boundedServices.map.emplace(std::piecewise_construct,
 +                                    std::forward_as_tuple(bndId),
 +                                    std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, ctx));
 +        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);
 +
 +        if (rc == 0) {
 +            entry.first->second.service.handle = &entry.first->second;
 +            entry.first->second.service.localMsgTypeIdForMsgType = celix::pubsub::nanomsg::localMsgTypeIdForMsgType;
 +            entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) {
 +                return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg);
 +            };
-             entry.first->second.service.sendMultipart = nullptr; //not supported TODO remove
 +            service = &entry.first->second.service;
 +        } else {
 +            boundedServices.map.erase(bndId);
 +            L.ERROR("Error creating serializer map for NanoMsg TopicSender. Scope: ", scope, ", Topic: ", topic);
 +        }
 +    }
 +
 +    return service;
 +}
 +
 +void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
 +                                                                         const celix_properties_t */*svcProperties*/) {
 +    long bndId = celix_bundle_getId(requestingBundle);
 +
 +    std::lock_guard<std::mutex> lock(boundedServices.mutex);
 +    auto entry = boundedServices.map.find(bndId);
 +    if (entry != boundedServices.map.end()) {
 +        entry->second.getCount -= 1;
 +        if (entry->second.getCount == 0) {
 +            int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
 +            if (rc != 0) {
 +                L.ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
 +            }
 +            boundedServices.map.erase(bndId);
 +        }
 +    }
 +}
 +
 +int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) {
 +    int status;
 +    auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId));
 +
 +    if (msgSer != nullptr) {
 +        delay_first_send_for_late_joiners(L);
 +
 +        int major = 0, minor = 0;
 +
 +        celix::pubsub::nanomsg::msg_header msg_hdr{};
 +        msg_hdr.type = msgTypeId;
 +
 +        if (msgSer->msgVersion != nullptr) {
 +            version_getMajor(msgSer->msgVersion, &major);
 +            version_getMinor(msgSer->msgVersion, &minor);
 +            msg_hdr.major = (unsigned char) major;
 +            msg_hdr.minor = (unsigned char) minor;
 +        }
 +
 +        void *serializedOutput = nullptr;
 +        size_t serializedOutputLen = 0;
 +        status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
 +        if (status == CELIX_SUCCESS) {
 +            nn_iovec data[2];
 +
 +            nn_msghdr msg{};
 +            msg.msg_iov = data;
 +            msg.msg_iovlen = 2;
 +            msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr);
 +            msg.msg_iov[0].iov_len = sizeof(msg_hdr);
 +            msg.msg_iov[1].iov_base = serializedOutput;
 +            msg.msg_iov[1].iov_len = serializedOutputLen;
 +            msg.msg_control = nullptr;
 +            msg.msg_controllen = 0;
 +            errno = 0;
 +            int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 );
 +            free(serializedOutput);
 +            if (rc < 0) {
 +                L.WARN("[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, ", error: ",  strerror(errno));
 +            } else {
 +                L.INFO("[PSA_ZMQ_TS] Send message with size ",  rc, "\n");
 +                L.INFO("[PSA_ZMQ_TS] Send message ID ", msg_hdr.type,
 +                        " major: ", (int)msg_hdr.major,
 +                        " minor: ",  (int)msg_hdr.minor,"\n");
 +            }
 +        } else {
 +            L.WARN("[PSA_ZMQ_TS] Error serialize message of type ", msgSer->msgName,
 +                    " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
 +        }
 +    } else {
 +        status = CELIX_SERVICE_EXCEPTION;
 +        L.WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id ", msgTypeId,
 +                " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
 +    }
 +    return status;
 +}
 +
 +static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper) {
 +
 +    static bool firstSend = true;
 +
 +    if(firstSend){
 +        logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
 +        sleep(FIRST_SEND_DELAY_IN_SECONDS);
 +        firstSend = false;
 +    }
 +}
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +    double scaled = ((double)random())/((double)RAND_MAX);
 +    return (unsigned int)((max-min+1)*scaled + min);
 +}
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 2d36579,d7add8b..b7fc8bd
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@@ -576,8 -592,6 +592,7 @@@ celix_status_t pubsub_udpmcAdmin_execut
      celixThreadMutex_unlock(&psa->serializers.mutex);
      fprintf(out, "\n");
  
-     //TODO topic receivers/senders connection count
 +
      return status;
  }
  
@@@ -681,4 -695,4 +696,4 @@@ static celix_status_t udpmc_getIpAddres
  
      return status;
  }
--#endif
++#endif
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index cada021,469fa9c..c71341b
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@@ -22,19 -22,9 +22,8 @@@
  
  #include "celix_api.h"
  #include "log_helper.h"
- 
- #define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
- #define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"udpmc.socket_address"
- #define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socket_port"
- 
- #define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
- #define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
- #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
- #define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
- 
- #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
- #define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
+ #include "pubsub_psa_udpmc_constants.h"
  
 -
  typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
  
  pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 64d7a23,1b7c1db..b5ca4fd
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@@ -339,3 -345,7 +345,7 @@@ static unsigned int rand_range(unsigne
  long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender) {
      return sender->serializerSvcId;
  }
+ 
+ bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender) {
+     return sender->staticallyConfigured;
 -}
++}
diff --cc bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26,8c9e163..ea7653e
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@@ -370,6 -373,22 +373,22 @@@ celix_status_t pubsub_zmqAdmin_setupTop
              newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                  serType, NULL);
              celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
+ 
+             //if configured use a static discover url
+             const char *staticDiscUrl = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL);
+             if (staticDiscUrl != NULL) {
+                 celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+             }
+             celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+ 
+             //if url starts with ipc:// constrain discovery to host visibility, else use system visibility
+             const char *u = celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, "");
+             if (strncmp("ipc://", u, strlen("ipc://")) == 0) {
 -                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBLITY);
++                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBILITY);
+             } else {
 -                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY);
++                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+             }
+ 
              //if available also set container name
              const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
              if (cn != NULL) {
diff --cc bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index e4983b1,9d707f3..396ae6f
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@@ -38,26 -36,40 +38,43 @@@ extern "C" 
   * present a allocated scope string.
   * The caller is owner of the topic and scope output string.
   */
- celix_status_t pubsub_getPubSubInfoFromFilter(const char *filterstr, char **topic, char **scope);
+ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topic, char **scope);
  
- char *pubsub_getKeysBundleDir(bundle_context_pt ctx);
+ char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
  
- double
- pubsub_utils_matchPublisher(celix_bundle_context_t *ctx, long bundleId, const char *filter, const char *adminType,
-                             double sampleScore, double controlScore, double defaultScore, long *outSerializerSvcId);
+ double pubsub_utils_matchPublisher(
+         celix_bundle_context_t *ctx,
+         long bundleId,
+         const char *filter,
+         const char *adminType,
+         double sampleScore,
+         double controlScore,
+         double defaultScore,
+         celix_properties_t **outTopicProperties,
+         long *outSerializerSvcId);
  
- double pubsub_utils_matchSubscriber(celix_bundle_context_t *ctx, long svcProviderBundleId,
-                                     const celix_properties_t *svcProperties, const char *adminType, double sampleScore,
-                                     double controlScore, double defaultScore, long *outSerializerSvcId);
+ double pubsub_utils_matchSubscriber(
+         celix_bundle_context_t *ctx,
+         long svcProviderBundleId,
+         const celix_properties_t *svcProperties,
+         const char *adminType,
+         double sampleScore,
+         double controlScore,
+         double defaultScore,
+         celix_properties_t **outTopicProperties,
+         long *outSerializerSvcId);
  
- bool pubsub_utils_matchEndpoint(celix_bundle_context_t *ctx, const celix_properties_t *endpoint, const char *adminType,
-                                 long *outSerializerSvcId);
+ bool pubsub_utils_matchEndpoint(
+         celix_bundle_context_t *ctx,
+         const celix_properties_t *endpoint,
+         const char *adminType,
+         long *outSerializerSvcId);
  
  
- celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher);
+ celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher);
  
 +#ifdef __cplusplus
 +}
 +#endif
  
  #endif /* PUBSUB_UTILS_H_ */