You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2023/12/17 16:14:43 UTC
(celix) 02/02: Remove psa nanomsg from experimental
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/509-remove-pubsub
in repository https://gitbox.apache.org/repos/asf/celix.git
commit efad168a5d2a501cf72e3734b0242fb95bdf406c
Author: Pepijn Noltes <pn...@apache.org>
AuthorDate: Sun Dec 17 17:14:33 2023 +0100
Remove psa nanomsg from experimental
---
.github/workflows/ubuntu.yml | 1 -
documents/building/README.md | 4 +-
misc/experimental/bundles/CMakeLists.txt | 1 -
.../bundles/pubsub_admin_nanomsg/CMakeLists.txt | 46 --
.../bundles/pubsub_admin_nanomsg/src/LogHelper.h | 103 ----
.../src/psa_nanomsg_activator.cc | 87 ---
.../src/pubsub_nanomsg_admin.cc | 625 ---------------------
.../src/pubsub_nanomsg_admin.h | 154 -----
.../src/pubsub_nanomsg_common.cc | 56 --
.../src/pubsub_nanomsg_common.h | 56 --
.../src/pubsub_nanomsg_topic_receiver.cc | 319 -----------
.../src/pubsub_nanomsg_topic_receiver.h | 127 -----
.../src/pubsub_nanomsg_topic_sender.cc | 265 ---------
.../src/pubsub_nanomsg_topic_sender.h | 114 ----
.../src/pubsub_psa_nanomsg_constants.h | 39 --
15 files changed, 1 insertion(+), 1996 deletions(-)
diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml
index e1bd8ef0..df0d9f40 100644
--- a/.github/workflows/ubuntu.yml
+++ b/.github/workflows/ubuntu.yml
@@ -119,7 +119,6 @@ jobs:
cmake \
libffi-dev \
libxml2-dev \
- libczmq-dev \
libcpputest-dev \
rapidjson-dev \
libavahi-compat-libdnssd-dev \
diff --git a/documents/building/README.md b/documents/building/README.md
index 420150cb..c6435100 100644
--- a/documents/building/README.md
+++ b/documents/building/README.md
@@ -137,7 +137,6 @@ The following packages (libraries + headers) should be installed on your system:
* libffi (for libdfi)
* libxml2 (for remote services and bonjour shell)
* rapidjson (for C++ remote service discovery)
- * libczmq (for PubSubAdmin ZMQ)
For Ubuntu 22.04, use the following commands:
@@ -155,7 +154,6 @@ sudo apt-get install -yq --no-install-recommends \
libffi-dev \
libzip-dev \
libxml2-dev \
- libczmq-dev \
libcpputest-dev \
rapidjson-dev
```
@@ -163,7 +161,7 @@ sudo apt-get install -yq --no-install-recommends \
For OSX systems with brew installed, use the following commands:
```bash
brew update && \
-brew install lcov libffi libzip czmq rapidjson libxml2 cmake jansson && \
+brew install lcov libffi libzip rapidjson libxml2 cmake jansson && \
brew link --force libffi
```
diff --git a/misc/experimental/bundles/CMakeLists.txt b/misc/experimental/bundles/CMakeLists.txt
index b96038d7..fdd442fb 100644
--- a/misc/experimental/bundles/CMakeLists.txt
+++ b/misc/experimental/bundles/CMakeLists.txt
@@ -19,5 +19,4 @@ if (NOT APPLE)
#Note note sure if these bundles build on OSX
add_subdirectory(config_admin)
add_subdirectory(event_admin)
- add_subdirectory(pubsub_admin_nanomsg)
endif ()
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt b/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt
deleted file mode 100644
index 27e80830..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt
+++ /dev/null
@@ -1,46 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if (BUILD_PUBSUB_PSA_NANOMSG)
-
- find_package(NanoMsg REQUIRED)
-
- add_celix_bundle(celix_pubsub_admin_nanomsg
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg"
- VERSION "1.0.0"
- GROUP "Celix/PubSub"
- SOURCES
- src/psa_nanomsg_activator.cc
- src/pubsub_nanomsg_admin.cc
- src/pubsub_nanomsg_topic_sender.cc
- src/pubsub_nanomsg_topic_receiver.cc
- src/pubsub_nanomsg_common.cc
- )
-
- target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE
- Celix::pubsub_spi
- Celix::framework Celix::dfi Celix::log_helper
- NANOMSG::lib
- )
- target_include_directories(celix_pubsub_admin_nanomsg PRIVATE
- src
- )
-
- install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub)
- target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE Celix::shell_api)
- add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg)
-endif()
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h
deleted file mode 100644
index d5d2f0b8..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-#include <sstream>
-#include "log_helper.h"
-#include <mutex>
-namespace celix {
- namespace pubsub {
- namespace nanomsg {
- /*
- * Not that the loghelper is created in the firs log-call. This is because when a log-helper is started
- * during registration of a service with a service-factory a dead-lock can occur
- * This prevents it.
- */
- class LogHelper {
- public:
- LogHelper(bundle_context_t *_ctx, const std::string& _componentName ) : ctx{_ctx}, helperCreated{true}, componentName{_componentName}{
- }
-
- LogHelper(const LogHelper& ) = delete;
- LogHelper& operator=(const LogHelper&) = delete;
-
-
- ~LogHelper() {
- if (helperCreated && _logHelper) {
- logHelper_stop(_logHelper);
- logHelper_destroy(&_logHelper);
- }
- std::cerr << "Destroyed loghelper for " << componentName << std::endl;
- }
- template<typename... Args>
- void ERROR(Args... args) {
- auto ss = LOG_STREAM(args...);
- log_string(OSGI_LOGSERVICE_ERROR, ss.str());
- }
-
- template<typename... Args>
- void WARN(Args... args) {
- auto ss = LOG_STREAM(args...);
- log_string(OSGI_LOGSERVICE_WARNING, ss.str());
- }
-
- template<typename... Args>
- void INFO(Args... args) {
- auto ss = LOG_STREAM(args...);
- log_string(OSGI_LOGSERVICE_INFO, ss.str());
- }
-
- template<typename... Args>
- void DBG(Args... args) {
- auto ss = LOG_STREAM(args...);
- log_string(OSGI_LOGSERVICE_DEBUG, ss.str());
- }
-
- private:
- bundle_context_t *ctx;
- bool helperCreated{false};
- log_helper_t *_logHelper{};
- std::string componentName{};
- template<typename T>
- std::stringstream LOG_STREAM(T first) const {
- std::stringstream ss;
- ss << first;
- return ss;
- }
-
- template<typename T, typename... Args>
- std::stringstream LOG_STREAM(T first, Args... args) const {
- std::stringstream ss;
- ss << "[" << componentName << "] " << first << LOG_STREAM(args...).str();
- return ss;
- }
-
- void log_string(log_level_t level, const std::string& msg) {
- if (_logHelper == nullptr) {
- helperCreated = true;
- logHelper_create(ctx, &_logHelper);
- logHelper_start(_logHelper);
- }
- logHelper_log(_logHelper, level, msg.c_str());
- }
- };
-
- }
- }
-}
\ No newline at end of file
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
deleted file mode 100644
index 20c23920..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdlib.h>
-#include <new>
-#include <iostream>
-#include "celix_api.h"
-#include "pubsub_serializer.h"
-#include "LogHelper.h"
-#include "pubsub_admin.h"
-#include "pubsub_nanomsg_admin.h"
-
-namespace celix { namespace pubsub { namespace nanomsg {
- class Activator {
- public:
- Activator(celix_bundle_context_t *ctx) :
- context{ctx},
- L{context, std::string("PSA_NANOMSG_ACTIVATOR")},
- admin(context)
- {
- }
- Activator(const Activator&) = delete;
- Activator& operator=(const Activator&) = delete;
-
- ~Activator() = default;
-
- celix_status_t start() {
- admin.start();
- return CELIX_SUCCESS;
- }
-
- celix_status_t stop() {
- admin.stop();
- return CELIX_SUCCESS;
- };
-
- private:
- celix_bundle_context_t *context{};
- celix::pubsub::nanomsg::LogHelper L;
- pubsub_nanomsg_admin admin;
-
- };
-}}}
-
-celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) {
- celix_status_t status = CELIX_SUCCESS;
- auto data = new (std::nothrow) celix::pubsub::nanomsg::Activator{ctx};
- if (data != NULL) {
- *userData = data;
- } else {
- status = CELIX_ENOMEM;
- }
- return status;
-}
-
-celix_status_t celix_bundleActivator_start(void *userData, celix_bundle_context_t *) {
- auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData);
- return act->start();
-}
-
-celix_status_t celix_bundleActivator_stop(void *userData, celix_bundle_context_t *) {
- auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData);
- return act->stop();
-}
-
-
-celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_context_t *) {
- auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData);
- delete act;
- return CELIX_SUCCESS;
-}
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
deleted file mode 100644
index 064f3e73..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <string>
-#include <vector>
-#include <functional>
-#include <memory.h>
-#include <iostream>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <ifaddrs.h>
-#include <pubsub_endpoint.h>
-#include <algorithm>
-
-#include "pubsub_utils.h"
-#include "pubsub_nanomsg_admin.h"
-#include "pubsub_psa_nanomsg_constants.h"
-
-#include "celix_compiler.h"
-
-
-static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
-
-pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx):
- ctx{_ctx},
- L{ctx, "pubsub_nanomsg_admin"} {
- verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
- fwUUID = celix_bundleContext_getProperty(ctx, CELIX_FRAMEWORK_UUID, nullptr);
-
- char *ip = nullptr;
- const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
- if (confIp != NULL) {
- if (strchr(confIp, '/') != NULL) {
- // IP with subnet prefix specified
- ip = ipUtils_findIpBySubnet(confIp);
- if (ip == NULL) {
- L_WARN("[PSA_NANOMSG] Could not find interface for requested subnet %s", confIp);
- }
- } else {
- // IP address specified
- ip = strndup(confIp, 1024);
- }
- }
-
- if (ip == nullptr) {
- //TODO try to get ip from subnet (CIDR)
- }
-
- if (ip == nullptr) {
- //try to get ip from itf
- const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr);
- nanoMsg_getIpAddress(interface, &ip);
- }
-
- if (ip == nullptr) {
- L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")");
- ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
- }
-
- ipAddress = ip;
- if (verbose) {
- L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation.");
- }
-
-
- long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
- long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
- basePort = (unsigned int)_basePort;
- maxPort = (unsigned int)_maxPort;
- if (verbose) {
- L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till ", _maxPort);
- }
-
-
- defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
- qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
- qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
-}
-
-pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
- //note assuming al psa register services and service tracker are removed.
- {
-// std::lock_guard<std::mutex> lock(topicSenders.mutex);
-// for (auto &kv : topicSenders.map) {
-// auto &sender = kv.second;
-// delete (sender);
-// }
- }
-
- {
- std::lock_guard<std::mutex> lock(topicReceivers.mutex);
- for (auto &kv: topicReceivers.map) {
- delete kv.second;
- }
- }
-
- {
- std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
- for (auto &entry : discoveredEndpoints.map) {
- auto *ep = entry.second;
- celix_properties_destroy(ep);
- }
- }
-
- {
- std::lock_guard<std::mutex> lock(serializers.mutex);
- serializers.map.clear();
- }
-
- free(ipAddress);
-
-}
-
-void pubsub_nanomsg_admin::start() {
- adminService.handle = this;
- adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->matchPublisher(svcRequesterBndId, svcFilter, outTopicProperties, score, serializerSvcId);
- };
- adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->matchSubscriber(svcProviderBndId, svcProperties, outTopicProperties, score, serializerSvcId);
- };
- adminService.matchDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->matchEndpoint(endpoint, match);
- };
- adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->setupTopicSender(scope, topic, topicProperties, serializerSvcId, publisherEndpoint);
- };
- adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->teardownTopicSender(scope, topic);
- };
- adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->setupTopicReceiver(std::string(scope), std::string(topic), topicProperties, serializerSvcId, subscriberEndpoint);
- };
-
- adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->teardownTopicReceiver(scope, topic);
- };
- adminService.addDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->addEndpoint(endpoint);
- };
- adminService.removeDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->removeEndpoint(endpoint);
- };
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
-
- adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props);
-
-
- celix_service_tracking_options_t opts{};
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = this;
- opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- me->addSerializerSvc(svc, props);
- };
- opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- me->removeSerializerSvc(svc, props);
- };
- serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-
- //register shell command service
- cmdSvc.handle = this;
- cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
- auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->executeCommand(commandLine, outStream, errorStream);
- };
-
- celix_properties_t* shellProps = celix_properties_create();
- celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
- celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
- celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA");
- cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
-
-}
-
-void pubsub_nanomsg_admin::stop() {
- celix_bundleContext_unregisterService(ctx, adminSvcId);
- celix_bundleContext_unregisterService(ctx, cmdSvcId);
- celix_bundleContext_stopTracker(ctx, serializersTrackerId);
-}
-
-void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) {
- const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr);
- long svcId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
-
- if (serType == nullptr) {
- L.INFO("[PSA_NANOMSG] Ignoring serializer service without ", PUBSUB_SERIALIZER_TYPE_KEY, " property");
- return;
- }
-
- {
- std::lock_guard<std::mutex> lock(serializers.mutex);
- auto it = serializers.map.find(svcId);
- if (it == serializers.map.end()) {
- serializers.map.emplace(std::piecewise_construct,
- std::forward_as_tuple(svcId),
- std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
- }
- }
-}
-
-
-void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) {
- long svcId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- std::lock_guard<std::mutex> lock(serializers.mutex);
-
- auto kvsm = serializers.map.find(svcId);
- if (kvsm != serializers.map.end()) {
- auto &entry = kvsm->second;
- {
- std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
- for (auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) {
- auto &sender = it->second;
- if (entry.svcId == sender.getSerializerSvcId()) {
- it = topicSenders.map.erase(it);
- } else {
- ++it;
- }
- }
- }
-
- {
- std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
- for (auto iter = topicReceivers.map.begin(); iter != topicReceivers.map.end();) {
- auto *receiver = iter->second;
- if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
- auto key = iter->first;
- topicReceivers.map.erase(iter++);
- delete receiver;
- } else {
- ++iter;
- }
- }
- }
-
- }
-}
-
-celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties,
- double *outScore, long *outSerializerSvcId) {
- L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
- celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
- qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId);
- *outScore = score;
-
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::matchSubscriber(
- long svcProviderBndId,
- const celix_properties_t *svcProperties,
- celix_properties_t **outTopicProperties,
- double *outScore,
- long *outSerializerSvcId) {
- L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
- celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
- qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId);
- if (outScore != nullptr) {
- *outScore = score;
- }
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) {
- L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
- celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
- if (outMatch != nullptr) {
- *outMatch = match;
- }
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
- const celix_properties_t */*topicProperties*/,
- long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
- celix_status_t status = CELIX_SUCCESS;
-
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
-
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- std::lock_guard<std::mutex> serializerLock(serializers.mutex);
- std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- auto sender = topicSenders.map.find(key);
- if (sender == topicSenders.map.end()) {
- //psa_nanomsg_serializer_entry *serEntry = nullptr;
- auto kv = serializers.map.find(serializerSvcId);
- if (kv != serializers.map.end()) {
- auto &serEntry = kv->second;
- auto e = topicSenders.map.emplace(std::piecewise_construct,
- std::forward_as_tuple(key),
- std::forward_as_tuple(ctx, scope, topic, serializerSvcId, serEntry.svc, ipAddress,
- basePort, maxPort));
- celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
- PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr);
- celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, e.first->second.getUrl().c_str());
- //if available also set container name
- const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
- if (cn != nullptr) {
- celix_properties_set(newEndpoint, "container_name", cn);
- }
- if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
- *outPublisherEndpoint = newEndpoint;
- }
- } else {
- L.ERROR("[PSA NANOMSG] Error creating a TopicSender");
- }
- } else {
- L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic ", scope,"/", topic);
- }
- free(key);
-
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) {
- celix_status_t status = CELIX_SUCCESS;
-
- //1) Find and remove TopicSender from map
- //2) destroy topic sender
-
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- if (topicSenders.map.erase(key) == 0) {
- L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic ", scope, "/", topic, " Does not exists");
- }
- free(key);
-
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic,
- const celix_properties_t */*topicProperties*/,
- long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
-
- celix_properties_t *newEndpoint = nullptr;
-
- auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str());
- pubsub::nanomsg::topic_receiver * receiver = nullptr;
- {
- std::lock_guard<std::mutex> serializerLock(serializers.mutex);
- std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- auto trkv = topicReceivers.map.find(key);
- if (trkv != topicReceivers.map.end()) {
- receiver = trkv->second;
- }
- if (receiver == nullptr) {
- auto kvs = serializers.map.find(serializerSvcId);
- if (kvs != serializers.map.end()) {
- auto serEntry = kvs->second;
- receiver = new pubsub::nanomsg::topic_receiver(ctx, scope, topic, serializerSvcId, serEntry.svc);
- } else {
- L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender ", scope, "/", topic);
- }
- if (receiver != nullptr) {
- const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = kvs->second.serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
- serType, nullptr);
- //if available also set container name
- const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
- if (cn != nullptr) {
- celix_properties_set(newEndpoint, "container_name", cn);
- }
- topicReceivers.map[key] = receiver;
- } else {
- L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
- }
- } else {
- L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic ", scope, "/", topic);
- }
- }
- if (receiver != nullptr && newEndpoint != nullptr) {
- std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
- for (auto entry : discoveredEndpoints.map) {
- auto *endpoint = entry.second;
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
- if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- connectEndpointToReceiver(receiver, endpoint);
- }
- }
- }
-
- if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
- *outSubscriberEndpoint = newEndpoint;
- }
- free(key);
- celix_status_t status = CELIX_SUCCESS;
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- auto entry = topicReceivers.map.find(key);
- free(key);
- if (entry != topicReceivers.map.end()) {
- auto receiverKey = entry->first;
- pubsub::nanomsg::topic_receiver *receiver = entry->second;
- topicReceivers.map.erase(receiverKey);
-
- delete receiver;
- }
-
- celix_status_t status = CELIX_SUCCESS;
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
- const celix_properties_t *endpoint) {
- //note can be called with discoveredEndpoint.mutex lock
- celix_status_t status = CELIX_SUCCESS;
-
- auto scope = receiver->scope();
- auto topic = receiver->topic();
-
- std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
- std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
- const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
-
- if (url == nullptr) {
-// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
- status = CELIX_BUNDLE_EXCEPTION;
- } else {
- if ((eScope == scope) && (eTopic == topic)) {
- receiver->connectTo(url);
- }
- }
-
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) {
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
-
- if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
- for (auto &entry: topicReceivers.map) {
- pubsub::nanomsg::topic_receiver *receiver = entry.second;
- connectEndpointToReceiver(receiver, endpoint);
- }
- }
-
- std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
- celix_properties_t *cpy = celix_properties_copy(endpoint);
- //TODO : check if properties are never deleted before map.
- const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
- discoveredEndpoints.map[uuid] = cpy;
-
- celix_status_t status = CELIX_SUCCESS;
- return status;
-}
-
-
-celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
- const celix_properties_t *endpoint) {
- //note can be called with discoveredEndpoint.mutex lock
- celix_status_t status = CELIX_SUCCESS;
-
- auto scope = receiver->scope();
- auto topic = receiver->topic();
-
- auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
- auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
- const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
-
- if (url == nullptr) {
- L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
- status = CELIX_BUNDLE_EXCEPTION;
- } else {
- if ((eScope == scope) && (eTopic == topic)) {
- receiver->disconnectFrom(url);
- }
- }
-
- return status;
-}
-
-celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) {
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
-
- if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto &entry : topicReceivers.map) {
- pubsub::nanomsg::topic_receiver *receiver = entry.second;
- disconnectEndpointFromReceiver(receiver, endpoint);
- }
- }
- {
- std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
- const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
- discoveredEndpoints.map.erase(uuid);
- }
- return CELIX_SUCCESS;;
-}
-
-celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine CELIX_UNUSED, FILE *out,
- FILE *errStream CELIX_UNUSED) {
- celix_status_t status = CELIX_SUCCESS;
- fprintf(out, "\n");
- fprintf(out, "Topic Senders:\n");
- {
- std::lock_guard<std::mutex> serializerLock(serializers.mutex);
- std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- for (auto &senderEntry: topicSenders.map) {
- auto &sender = senderEntry.second;
- long serSvcId = sender.getSerializerSvcId();
- auto kvs = serializers.map.find(serSvcId);
- const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType);
- const auto scope = sender.getScope();
- const auto topic = sender.getTopic();
- const auto url = sender.getUrl();
- fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), topic.c_str());
- fprintf(out, " |- serializer type = %s\n", serType);
- fprintf(out, " |- url = %s\n", url.c_str());
- }
- }
-
- {
- fprintf(out, "\n");
- fprintf(out, "\nTopic Receivers:\n");
- std::lock_guard<std::mutex> serializerLock(serializers.mutex);
- std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto &entry : topicReceivers.map) {
- pubsub::nanomsg::topic_receiver *receiver = entry.second;
- long serSvcId = receiver->serializerSvcId();
- auto kv = serializers.map.find(serSvcId);
- const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType);
- auto scope = receiver->scope();
- auto topic = receiver->topic();
-
- std::vector<std::string> connected{};
- std::vector<std::string> unconnected{};
- receiver->listConnections(connected, unconnected);
-
- fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
- fprintf(out, " |- serializer type = %s\n", serType);
- for (auto &url : connected) {
- fprintf(out, " |- connected url = %s\n", url.c_str());
- }
- for (auto &url : unconnected) {
- fprintf(out, " |- unconnected url = %s\n", url.c_str());
- }
- }
- }
- fprintf(out, "\n");
-
- return status;
-}
-
-#ifndef ANDROID
-static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
- celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
- struct ifaddrs *ifaddr, *ifa;
- char host[NI_MAXHOST];
-
- if (getifaddrs(&ifaddr) != -1)
- {
- for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
- {
- if (ifa->ifa_addr == nullptr)
- continue;
-
- if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
- if (interface == nullptr) {
- *ip = strdup(host);
- status = CELIX_SUCCESS;
- }
- else if (strcmp(ifa->ifa_name, interface) == 0) {
- *ip = strdup(host);
- status = CELIX_SUCCESS;
- }
- }
- }
-
- freeifaddrs(ifaddr);
- }
-
- return status;
-}
-#endif
\ No newline at end of file
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
deleted file mode 100644
index 3785ce5e..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_NANOMSG_ADMIN_H
-#define CELIX_PUBSUB_NANOMSG_ADMIN_H
-
-#include <mutex>
-#include <map>
-#include <pubsub_admin.h>
-#include "celix_api.h"
-#include "celix_compiler.h"
-#include "pubsub_nanomsg_topic_receiver.h"
-#include <pubsub_serializer.h>
-#include "LogHelper.h"
-#include "command.h"
-#include "pubsub_nanomsg_topic_sender.h"
-#include "pubsub_nanomsg_topic_receiver.h"
-
-#define PUBSUB_NANOMSG_ADMIN_TYPE "nanomsg"
-#define PUBSUB_NANOMSG_URL_KEY "nanomsg.url"
-
-#define PUBSUB_NANOMSG_VERBOSE_KEY "PSA_NANOMSG_VERBOSE"
-#define PUBSUB_NANOMSG_VERBOSE_DEFAULT true
-
-#define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP"
-#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE"
-
-#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
-
-template <typename key, typename value>
-struct ProtectedMap {
- std::mutex mutex{};
- std::map<key, value> map{};
-};
-
-class pubsub_nanomsg_admin {
-public:
- pubsub_nanomsg_admin(celix_bundle_context_t *ctx);
- pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
- pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
- ~pubsub_nanomsg_admin();
- void start();
- void stop();
-
-private:
- void addSerializerSvc(void *svc, const celix_properties_t *props);
- void removeSerializerSvc(void */*svc*/, const celix_properties_t *props);
- celix_status_t matchPublisher(
- long svcRequesterBndId,
- const celix_filter_t *svcFilter,
- celix_properties_t **outTopicProperties,
- double *outScore,
- long *outsSerializerSvcId);
- celix_status_t matchSubscriber(
- long svcProviderBndId,
- const celix_properties_t *svcProperties,
- celix_properties_t **outTopicProperties,
- double *outScope,
- long *outSerializerSvcId);
- celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match);
-
- celix_status_t setupTopicSender(
- const char *scope,
- const char *topic,
- const celix_properties_t *topicProperties,
- long serializerSvcId,
- celix_properties_t **outPublisherEndpoint);
-
- celix_status_t teardownTopicSender(const char *scope, const char *topic);
-
- celix_status_t setupTopicReceiver(
- const std::string &scope,
- const std::string &topic,
- const celix_properties_t *topicProperties,
- long serializerSvcId,
- celix_properties_t **outSubscriberEndpoint);
-
- celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
-
- celix_status_t addEndpoint(const celix_properties_t *endpoint);
- celix_status_t removeEndpoint(const celix_properties_t *endpoint);
-
- celix_status_t executeCommand(char *commandLine CELIX_UNUSED, FILE *out,
- FILE *errStream CELIX_UNUSED);
-
- celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
- const celix_properties_t *endpoint);
-
- celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
- const celix_properties_t *endpoint);
- celix_bundle_context_t *ctx;
- celix::pubsub::nanomsg::LogHelper L;
- pubsub_admin_service_t adminService{};
- long adminSvcId = -1L;
- long cmdSvcId = -1L;
- command_service_t cmdSvc{};
- long serializersTrackerId = -1L;
-
- const char *fwUUID{};
-
- char* ipAddress{};
-
- unsigned int basePort{};
- unsigned int maxPort{};
-
- double qosSampleScore{};
- double qosControlScore{};
- double defaultScore{};
-
- bool verbose{};
-
- class psa_nanomsg_serializer_entry {
- public:
- psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
- serType{_serType}, svcId{_svcId}, svc{_svc} {
-
- }
-
- const char *serType;
- long svcId;
- pubsub_serializer_service_t *svc;
- };
- ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{};
- ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> topicSenders{};
- ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
- ProtectedMap<const std::string, celix_properties_t *> discoveredEndpoints{};
-};
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#ifdef __cplusplus
-}
-#endif
-
-
-#endif //CELIX_PUBSUB_NANOMSG_ADMIN_H
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
deleted file mode 100644
index ccafd436..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <memory.h>
-#include "pubsub_nanomsg_common.h"
-#include "celix_compiler.h"
-
-int celix::pubsub::nanomsg::localMsgTypeIdForMsgType(void *handle CELIX_UNUSED, const char *msgType,
- unsigned int *msgTypeId) {
- *msgTypeId = utils_stringHash(msgType);
- return 0;
-}
-
-bool celix::pubsub::nanomsg::checkVersion(version_pt msgVersion, const celix::pubsub::nanomsg::msg_header *hdr) {
- bool check=false;
- int major=0,minor=0;
-
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
- check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
-
-std::string celix::pubsub::nanomsg::setScopeAndTopicFilter(const std::string &scope, const std::string &topic) {
- std::string result("");
- if (scope.size() >= 2) { //3 ??
- result += scope[0];
- result += scope[1];
- }
- if (topic.size() >= 2) { //3 ??
- result += topic[0];
- result += topic[1];
- }
- return result;
-}
\ No newline at end of file
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
deleted file mode 100644
index 465669bf..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_NANOMSG_COMMON_H
-#define CELIX_PUBSUB_NANOMSG_COMMON_H
-
-#include <string>
-#include <sstream>
-#include <utils.h>
-
-#include "version.h"
-#include "log_helper.h"
-
-/*
- * NOTE zmq is used by first sending three frames:
- * 1) A subscription filter.
- * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
- *
- * 2) The pubsub_zmq_msg_header_t is send containing the type id and major/minor version
- *
- * 3) The actual payload
- */
-
-namespace celix { namespace pubsub { namespace nanomsg {
- struct msg_header {
- //header
- unsigned int type;
- unsigned char major;
- unsigned char minor;
- };
- int localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
- std::string setScopeAndTopicFilter(const std::string &scope, const std::string &topic);
-
- bool checkVersion(version_pt msgVersion, const celix::pubsub::nanomsg::msg_header *hdr);
-
-}}}
-
-
-
-#endif //CELIX_PUBSUB_NANOMSG_COMMON_H
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
deleted file mode 100644
index 443f2cee..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <iostream>
-#include <mutex>
-#include <memory.h>
-#include <vector>
-#include <string>
-#include <sstream>
-
-#include <stdlib.h>
-#include <assert.h>
-
-#include <sys/epoll.h>
-#include <arpa/inet.h>
-
-#include <nanomsg/nn.h>
-#include <nanomsg/bus.h>
-
-#include <pubsub_serializer.h>
-#include <pubsub/subscriber.h>
-#include <pubsub_constants.h>
-#include <pubsub_endpoint.h>
-#include <LogHelper.h>
-
-#include "pubsub_nanomsg_topic_receiver.h"
-#include "pubsub_psa_nanomsg_constants.h"
-#include "pubsub_nanomsg_common.h"
-#include "pubsub_topology_manager.h"
-
-//TODO see if block and wakeup (reset) also works
-#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout
-
-/*
-#define L_DEBUG(...) \
- logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-#define L_INFO(...) \
- logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-#define L_WARN(...) \
- logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-#define L_ERROR(...) \
- logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-//#define L_DEBUG printf
-//#define L_INFO printf
-//#define L_WARN printf
-//#define L_ERROR printf
-
-
-pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
- const std::string &_scope,
- const std::string &_topic,
- long _serializerSvcId,
- pubsub_serializer_service_t *_serializer) : L{_ctx, "NANOMSG_topic_receiver"}, m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
- ctx = _ctx;
- serializer = _serializer;
-
- m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
- if (m_nanoMsgSocket < 0) {
- L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: ", m_scope.c_str(), "/", m_topic.c_str());
- std::bad_alloc{};
- } else {
- int timeout = PSA_NANOMSG_RECV_TIMEOUT;
- if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
- sizeof (timeout)) < 0) {
- L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope, "/",m_topic, ", set sockopt RECV_TIMEO failed");
- std::bad_alloc{};
- }
-
- auto subscriberFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(m_scope, m_topic);
-
- auto opts = createOptions();
-
- subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- recvThread.running = true;
- free ((void*)opts.filter.filter);
- recvThread.thread = std::thread([this]() {this->recvThread_exec();});
- }
-}
-
-celix_service_tracking_options_t pubsub::nanomsg::topic_receiver::createOptions() {
- std::stringstream filter_str;
-
- filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" << m_topic << ")";
- celix_service_tracking_options_t opts{};
- opts.filter.ignoreServiceLanguage = true;
- opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
- opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory leak ??
- opts.callbackHandle = this;
- opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
- static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner);
- };
- opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
- static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner);
- };
- return opts;
-}
-
-pubsub::nanomsg::topic_receiver::~topic_receiver() {
-
- {
- std::lock_guard<std::mutex> _lock(recvThread.mutex);
- recvThread.running = false;
- }
- recvThread.thread.join();
-
- celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
-
- {
- std::lock_guard<std::mutex> _lock(subscribers.mutex);
- for (auto elem : subscribers.map) {
- serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
- }
- subscribers.map.clear();
- }
- nn_close(m_nanoMsgSocket);
-
-}
-
-std::string pubsub::nanomsg::topic_receiver::scope() const {
- return m_scope;
-}
-
-std::string pubsub::nanomsg::topic_receiver::topic() const {
- return m_topic;
-}
-
-long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
- return m_serializerSvcId;
-}
-
-void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
- std::vector<std::string> &unconnectedUrls) {
- std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- for (auto entry : requestedConnections.map) {
- if (entry.second.isConnected()) {
- connectedUrls.emplace_back(entry.second.getUrl());
- } else {
- unconnectedUrls.emplace_back(entry.second.getUrl());
- }
- }
-}
-
-
-void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting to nanomsg url ", url);
-
- std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- auto entry = requestedConnections.map.find(url);
- if (entry == requestedConnections.map.end()) {
- requestedConnections.map.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(std::string(url)),
- std::forward_as_tuple(url, -1));
- entry = requestedConnections.map.find(url);
- }
- if (!entry->second.isConnected()) {
- int connection_id = nn_connect(m_nanoMsgSocket, url);
- if (connection_id >= 0) {
- entry->second.setConnected(true);
- entry->second.setId(connection_id);
- } else {
- L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, " (",strerror(errno), ")");
- }
- }
-}
-
-void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect from nanomsg url ", url);
-
- std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- auto entry = requestedConnections.map.find(url);
- if (entry != requestedConnections.map.end()) {
- if (entry->second.isConnected()) {
- if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
- entry->second.setConnected(false);
- } else {
- L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ", url, ", id: ", entry->second.getId(), " (",strerror(errno),")");
- }
- }
- requestedConnections.map.erase(url);
- std::cerr << "REMOVING connection " << url << std::endl;
- } else {
- std::cerr << "Disconnecting from unknown URL " << url << std::endl;
- }
-}
-
-void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
- const celix_bundle_t *bnd) {
- long bndId = celix_bundle_getId(bnd);
- std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
- if (subScope != m_scope) {
- //not the same scope. ignore
- return;
- }
-
- std::lock_guard<std::mutex> _lock(subscribers.mutex);
- auto entry = subscribers.map.find(bndId);
- if (entry != subscribers.map.end()) {
- entry->second.usageCount += 1;
- } else {
- //new create entry
- subscribers.map.emplace(std::piecewise_construct,
- std::forward_as_tuple(bndId),
- std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
- entry = subscribers.map.find(bndId);
-
- int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
- if (rc != 0) {
- L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str());
- subscribers.map.erase(bndId);
- }
- }
-}
-
-void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
- const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- long bndId = celix_bundle_getId(bnd);
-
- std::lock_guard<std::mutex> _lock(subscribers.mutex);
- auto entry = subscribers.map.find(bndId);
- if (entry != subscribers.map.end()) {
- entry->second.usageCount -= 1;
- if (entry->second.usageCount <= 0) {
- //remove entry
- int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
- if (rc != 0) {
- L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n");
- }
- subscribers.map.erase(bndId);
- }
- }
-}
-
-void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize) {
- pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
- pubsub_subscriber_t *svc = entry->svc;
-
- if (msgSer!= NULL) {
- void *deserializedMsg = NULL;
- bool validVersion = celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr);
- if (validVersion) {
- celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
- if (status == CELIX_SUCCESS) {
- bool release = false;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
- if (release) {
- msgSer->freeMsg(msgSer->handle, deserializedMsg);
- }
- } else {
- L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ", msgSer->msgName , "for scope/topic ", scope(), "/", topic());
- }
- }
- } else {
- L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ", hdr->type);
- }
-}
-
-void pubsub::nanomsg::topic_receiver::processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize) {
- std::lock_guard<std::mutex> _lock(subscribers.mutex);
- for (auto entry : subscribers.map) {
- processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
- }
-}
-
-struct Message {
- celix::pubsub::nanomsg::msg_header header;
- char payload[];
-};
-
-void pubsub::nanomsg::topic_receiver::recvThread_exec() {
- while (recvThread.running) {
- Message *msg = nullptr;
- nn_iovec iov[2];
- iov[0].iov_base = &msg;
- iov[0].iov_len = NN_MSG;
-
- nn_msghdr msgHdr;
- memset(&msgHdr, 0, sizeof(msgHdr));
-
- msgHdr.msg_iov = iov;
- msgHdr.msg_iovlen = 1;
-
- msgHdr.msg_control = nullptr;
- msgHdr.msg_controllen = 0;
-
- errno = 0;
- int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
- if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(celix::pubsub::nanomsg::msg_header)) {
- processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
- nn_freemsg(msg);
- } else if (recvBytes >= 0) {
- L.ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (", recvBytes,") smaller than header\n");
- } else if (errno == EAGAIN || errno == ETIMEDOUT) {
- // no data: go to next cycle
- } else if (errno == EINTR) {
- L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
- } else {
- L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ", errno, " : ", strerror(errno), "\n");
- }
- } // while
-
-}
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
deleted file mode 100644
index e93ea30f..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <vector>
-#include <thread>
-#include <mutex>
-#include <map>
-#include "pubsub_serializer.h"
-#include "LogHelper.h"
-#include "celix_bundle_context.h"
-#include "pubsub_nanomsg_common.h"
-#include "pubsub/subscriber.h"
-
-struct psa_nanomsg_subscriber_entry {
- psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
- svc{_svc}, usageCount{_usageCount} {
- }
- pubsub_subscriber_t *svc{};
- int usageCount;
- hash_map_t *msgTypes{nullptr}; //map from serializer svc
-};
-
-typedef struct psa_nanomsg_requested_connection_entry {
-public:
- psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false):
- url{_url}, id{_id}, connected{_connected} {
- }
- bool isConnected() const {
- return connected;
- }
-
- int getId() const {
- return id;
- }
-
- void setId(int _id) {
- id = _id;
- }
- void setConnected(bool c) {
- connected = c;
- }
-
- const std::string &getUrl() const {
- return url;
- }
-private:
- std::string url;
- int id;
- bool connected;
-} psa_nanomsg_requested_connection_entry_t;
-
-namespace pubsub {
- namespace nanomsg {
- class topic_receiver {
- public:
- topic_receiver(celix_bundle_context_t
- *ctx,
- const std::string &scope,
- const std::string &topic,
- long serializerSvcId, pubsub_serializer_service_t
- *serializer);
- topic_receiver(const topic_receiver &) = delete;
- topic_receiver & operator=(const topic_receiver &) = delete;
- ~topic_receiver();
-
- std::string scope() const;
- std::string topic() const;
- long serializerSvcId() const;
- void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
- void connectTo(const char *url);
- void disconnectFrom(const char *url);
- void recvThread_exec();
- void processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize);
- void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize);
- void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
- void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
- celix_service_tracking_options_t createOptions();
-
- private:
- celix_bundle_context_t *ctx{nullptr};
- celix::pubsub::nanomsg::LogHelper L;
- long m_serializerSvcId{0};
- pubsub_serializer_service_t *serializer{nullptr};
- const std::string m_scope{};
- const std::string m_topic{};
-
- int m_nanoMsgSocket{0};
-
- struct {
- std::thread thread;
- std::mutex mutex;
- bool running;
- } recvThread{};
-
- struct {
- std::mutex mutex;
- std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
- } requestedConnections{};
-
- long subscriberTrackerId{0};
- struct {
- std::mutex mutex;
- std::map<long, psa_nanomsg_subscriber_entry> map;
- } subscribers{};
- };
- }
-}
-
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
deleted file mode 100644
index 452ea785..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <memory.h>
-#include <iostream>
-#include <sstream>
-#include <stdlib.h>
-#include <utils.h>
-#include <arpa/inet.h>
-#include <LogHelper.h>
-#include <nanomsg/nn.h>
-#include <nanomsg/bus.h>
-
-
-#include <pubsub_constants.h>
-#include "pubsub_nanomsg_topic_sender.h"
-#include "pubsub_psa_nanomsg_constants.h"
-#include "pubsub_nanomsg_common.h"
-#include "celix_compiler.h"
-
-#define FIRST_SEND_DELAY_IN_SECONDS 2
-#define NANOMSG_BIND_MAX_RETRY 10
-
-static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper);
-
-pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
- const char *_scope,
- const char *_topic,
- long _serializerSvcId,
- pubsub_serializer_service_t *_ser,
- const char *_bindIp,
- unsigned int _basePort,
- unsigned int _maxPort) :
- ctx{_ctx},
- L{ctx, "PSA_NANOMSG_TS"},
- serializerSvcId {_serializerSvcId},
- serializer{_ser},
- scope{_scope},
- topic{_topic}{
-
- scopeAndTopicFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(_scope, _topic);
-
- //setting up nanomsg socket for nanomsg TopicSender
- int nnSock = nn_socket(AF_SP, NN_BUS);
- if (nnSock == -1) {
- perror("Error for nanomsg_socket");
- }
-
- int rv = -1, retry=0;
- while (rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
- /* Randomized part due to same bundle publishing on different topics */
- unsigned int port = rand_range(_basePort,_maxPort);
- std::stringstream _url;
- _url << "tcp://" << _bindIp << ":" << port;
-
- std::stringstream bindUrl;
- bindUrl << "tcp://0.0.0.0:" << port;
-
- rv = nn_bind (nnSock, bindUrl.str().c_str());
- if (rv == -1) {
- perror("Error for nn_bind");
- } else {
- this->url = _url.str();
- nanomsg.socket = nnSock;
- }
- retry++;
- }
-
- if (!url.empty()) {
-
- //register publisher services using a service factory
- publisher.factory.handle = this;
- publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
- return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
- requestingBundle,
- svcProperties);
- };
- publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
- return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
- requestingBundle,
- svcProperties);
- };
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str());
- celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str());
-
- celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
- opts.factory = &publisher.factory;
- opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
- opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
- opts.properties = props;
-
- publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
- }
-
-}
-
-pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
- celix_bundleContext_unregisterService(ctx, publisher.svcId);
-
- nn_close(nanomsg.socket);
- std::lock_guard<std::mutex> lock(boundedServices.mutex);
- for (auto &it: boundedServices.map) {
- serializer->destroySerializerMap(serializer->handle, it.second.msgTypes);
- }
- boundedServices.map.clear();
-
-}
-
-long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
- return serializerSvcId;
-}
-
-const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
- return scope;
-}
-
-const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
- return topic;
-}
-
-const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const {
- return url;
-}
-
-
-void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties CELIX_UNUSED) {
- long bndId = celix_bundle_getId(requestingBundle);
- void *service{nullptr};
- std::lock_guard<std::mutex> lock(boundedServices.mutex);
- auto existingEntry = boundedServices.map.find(bndId);
- if (existingEntry != boundedServices.map.end()) {
- existingEntry->second.getCount += 1;
- service = &existingEntry->second.service;
- } else {
- auto entry = boundedServices.map.emplace(std::piecewise_construct,
- std::forward_as_tuple(bndId),
- std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, ctx));
- int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);
-
- if (rc == 0) {
- entry.first->second.service.handle = &entry.first->second;
- entry.first->second.service.localMsgTypeIdForMsgType = celix::pubsub::nanomsg::localMsgTypeIdForMsgType;
- entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) {
- return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg);
- };
- service = &entry.first->second.service;
- } else {
- boundedServices.map.erase(bndId);
- L.ERROR("Error creating serializer map for NanoMsg TopicSender. Scope: ", scope, ", Topic: ", topic);
- }
- }
-
- return service;
-}
-
-void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
- const celix_properties_t */*svcProperties*/) {
- long bndId = celix_bundle_getId(requestingBundle);
-
- std::lock_guard<std::mutex> lock(boundedServices.mutex);
- auto entry = boundedServices.map.find(bndId);
- if (entry != boundedServices.map.end()) {
- entry->second.getCount -= 1;
- if (entry->second.getCount == 0) {
- int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
- if (rc != 0) {
- L.ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
- }
- boundedServices.map.erase(bndId);
- }
- }
-}
-
-int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) {
- int status;
- auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId));
-
- if (msgSer != nullptr) {
- delay_first_send_for_late_joiners(L);
-
- int major = 0, minor = 0;
-
- celix::pubsub::nanomsg::msg_header msg_hdr{};
- msg_hdr.type = msgTypeId;
-
- if (msgSer->msgVersion != nullptr) {
- version_getMajor(msgSer->msgVersion, &major);
- version_getMinor(msgSer->msgVersion, &minor);
- msg_hdr.major = (unsigned char) major;
- msg_hdr.minor = (unsigned char) minor;
- }
-
- void *serializedOutput = nullptr;
- size_t serializedOutputLen = 0;
- status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
- if (status == CELIX_SUCCESS) {
- nn_iovec data[2];
-
- nn_msghdr msg{};
- msg.msg_iov = data;
- msg.msg_iovlen = 2;
- msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr);
- msg.msg_iov[0].iov_len = sizeof(msg_hdr);
- msg.msg_iov[1].iov_base = serializedOutput;
- msg.msg_iov[1].iov_len = serializedOutputLen;
- msg.msg_control = nullptr;
- msg.msg_controllen = 0;
- errno = 0;
- int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 );
- free(serializedOutput);
- if (rc < 0) {
- L.WARN("[PSA_NANOMSG_TS] Error sending zmsg, rc: ", rc, ", error: ", strerror(errno));
- } else {
- L.INFO("[PSA_NANOMSG_TS] Send message with size ", rc, "\n");
- L.INFO("[PSA_NANOMSG_TS] Send message ID ", msg_hdr.type,
- " major: ", (int)msg_hdr.major,
- " minor: ", (int)msg_hdr.minor,"\n");
- }
- } else {
- L.WARN("[PSA_NANOMSG_TS] Error serialize message of type ", msgSer->msgName,
- " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
- }
- } else {
- status = CELIX_SERVICE_EXCEPTION;
- L.WARN("[PSA_NANOMSG_TS] Error cannot serialize message with msg type id ", msgTypeId,
- " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
- }
- return status;
-}
-
-static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper) {
-
- static bool firstSend = true;
-
- if (firstSend) {
- logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
- sleep(FIRST_SEND_DELAY_IN_SECONDS);
- firstSend = false;
- }
-}
-
-static unsigned int rand_range(unsigned int min, unsigned int max) {
- double scaled = ((double)random())/((double)RAND_MAX);
- return (unsigned int)((max-min+1)*scaled + min);
-}
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
deleted file mode 100644
index 3a29ad4e..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H
-#define CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H
-
-#include <mutex>
-#include <map>
-#include "celix_bundle_context.h"
-#include "celix_compiler.h"
-#include <log_helper.h>
-#include <pubsub_serializer.h>
-#include <pubsub/publisher.h>
-
-namespace pubsub {
- namespace nanomsg {
-
- class bounded_service_entry {
- public:
- bounded_service_entry(
- std::string &_scope,
- std::string &_topic,
- long _bndId,
- int _nanoMsgSocket,
- celix_bundle_context_t *_context) : scope{_scope}, topic{_topic}, bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket}, L{_context, "nanomsg_bounded_service_entry"} {
-
- }
- bounded_service_entry(const bounded_service_entry&) = delete;
- bounded_service_entry &operator=(const bounded_service_entry&) = delete;
- int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
-
- pubsub_publisher_t service{};
- std::string scope;
- std::string topic;
- long bndId{};
- hash_map_t *msgTypes{};
- int getCount{1};
- int nanoMsgSocket{};
- celix::pubsub::nanomsg::LogHelper L;
- } ;
-
-
- class pubsub_nanomsg_topic_sender {
- public:
- pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
- const char *_scope,
- const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser,
- const char *_bindIp, unsigned int _basePort, unsigned int _maxPort);
-
- ~pubsub_nanomsg_topic_sender();
-
- pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete;
-
- const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete;
-
- long getSerializerSvcId() const ;
- const std::string &getScope() const ;
- const std::string &getTopic() const ;
- const std::string &getUrl() const;
-
- void* getPublisherService(const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties CELIX_UNUSED);
- void ungetPublisherService(const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties CELIX_UNUSED);
- int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
- void delay_first_send_for_late_joiners() ;
-
- //private:
- celix_bundle_context_t *ctx;
- celix::pubsub::nanomsg::LogHelper L;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
-
- std::string scope{};
- std::string topic{};
- std::string scopeAndTopicFilter{};
- std::string url{};
-
- struct {
- std::mutex mutex;
- int socket;
- } nanomsg{};
-
- struct {
- long svcId;
- celix_service_factory_t factory;
- } publisher{};
-
- struct {
- std::mutex mutex{};
- std::map<long, bounded_service_entry> map{};
- //hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t
- } boundedServices{};
- };
- }
-}
-
-#endif //CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H
diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
deleted file mode 100644
index 1700ee42..00000000
--- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef PUBSUB_PSA_NANOMSG_CONSTANTS_H_
-#define PUBSUB_PSA_NANOMSG_CONSTANTS_H_
-
-
-#define PSA_NANOMSG_BASE_PORT "PSA_NANOMSG_BASE_PORT"
-#define PSA_NANOMSG_MAX_PORT "PSA_NANOMSG_MAX_PORT"
-
-#define PSA_NANOMSG_DEFAULT_BASE_PORT 5501
-#define PSA_NANOMSG_DEFAULT_MAX_PORT 6000
-
-#define PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE 30
-#define PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE 70
-#define PSA_NANOMSG_DEFAULT_SCORE 30
-
-#define PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY "PSA_NANOMSG_QOS_SAMPLE_SCORE"
-#define PSA_NANOMSG_QOS_CONTROL_SCORE_KEY "PSA_NANOMSG_QOS_CONTROL_SCORE"
-#define PSA_NANOMSG_DEFAULT_SCORE_KEY "PSA_NANOMSG_DEFAULT_SCORE"
-
-
-#endif /* PUBSUB_PSA_NANOMSG_CONSTANTS_H_ */