You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:04 UTC
[8/8] celix git commit: NanoMsg
NanoMsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7c141424
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7c141424
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7c141424
Branch: refs/heads/nanomsg
Commit: 7c141424d925afa98a83bc693649dc3500354965
Parents: cdefb0d
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Nov 27 21:02:18 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Nov 27 21:02:18 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 42 ++--
.../src/pubsub_nanomsg_admin.h | 2 +-
.../src/pubsub_nanomsg_topic_sender.cc | 240 ++++++++-----------
.../src/pubsub_nanomsg_topic_sender.h | 66 ++++-
4 files changed, 178 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index cf516ee..42ed632 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -99,20 +99,20 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
std::lock_guard<std::mutex> lock(topicSenders.mutex);
for (auto kv : topicSenders.map) {
auto *sender = kv.second;
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
}
}
{
std::lock_guard<std::mutex> lock(topicReceivers.mutex);
- for (auto kv: topicReceivers.map) {
+ for (auto &kv: topicReceivers.map) {
delete kv.second;
}
}
{
std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
- for (auto entry : discoveredEndpoints.map) {
+ for (auto &entry : discoveredEndpoints.map) {
auto *ep = entry.second;
celix_properties_destroy(ep);
}
@@ -252,10 +252,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
for (auto kv: topicSenders.map) {
auto *sender = kv.second;
- if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) {
char *key = kv.first;
topicSenders.map.erase(kv.first);
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
free(key);
}
}
@@ -263,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
{
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
- for (auto kv : topicReceivers.map){
+ for (auto &kv : topicReceivers.map){
auto *receiver = kv.second;
if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
auto key = kv.first;
@@ -322,7 +322,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- pubsub_nanomsg_topic_sender_t *sender = nullptr;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr;
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
sender = topicSenders.map.find(key)->second;
@@ -333,7 +333,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
serEntry = &kv->second;
}
if (serEntry != nullptr) {
- sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+ sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
basePort, maxPort);
}
if (sender != nullptr) {
@@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
nullptr);
- celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+ celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl());
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
if (cn != nullptr) {
@@ -378,10 +378,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
auto kv = topicSenders.map.find(key);
if (kv != topicSenders.map.end()) {
char *mapKey = kv->first;
- pubsub_nanomsg_topic_sender_t *sender = kv->second;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
free(mapKey);
//TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
} else {
L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
}
@@ -495,7 +495,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
- for (auto entry: topicReceivers.map) {
+ for (auto &entry: topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
connectEndpointToReceiver(receiver, endpoint);
}
@@ -541,7 +541,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto entry : topicReceivers.map) {
+ for (auto &entry : topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
disconnectEndpointFromReceiver(receiver, endpoint);
}
@@ -564,13 +564,13 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
for (auto kvts: topicSenders.map) {
- pubsub_nanomsg_topic_sender_t *sender = kvts.second;
- long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second;
+ long serSvcId = sender->getSerializerSvcId();
auto kvs = serializers.map.find(serSvcId);
const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType);
- const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
- const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
- const char *url = pubsub_nanoMsgTopicSender_url(sender);
+ const char *scope = sender->getScope();
+ const char *topic = sender->getTopic();
+ const char *url = sender->getUrl();
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- url = %s\n", url);
@@ -582,7 +582,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
fprintf(out, "\nTopic Receivers:\n");
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto entry : topicReceivers.map) {
+ for (auto &entry : topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
@@ -596,10 +596,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
fprintf(out, " |- serializer type = %s\n", serType);
- for (auto url : connected) {
+ for (auto &url : connected) {
fprintf(out, " |- connected url = %s\n", url.c_str());
}
- for (auto url : unconnected) {
+ for (auto &url : unconnected) {
fprintf(out, " |- unconnected url = %s\n", url.c_str());
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 689ae20..7c2e9a0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,7 +119,7 @@ private:
pubsub_serializer_service_t *svc;
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
- ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+ ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{};
ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index d5ed28f..1c75e71 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -28,11 +28,9 @@
#include <nanomsg/bus.h>
-#include <pubsub_serializer.h>
#include <pubsub_constants.h>
#include <pubsub/publisher.h>
#include <pubsub_common.h>
-#include <log_helper.h>
#include "pubsub_nanomsg_topic_sender.h"
#include "pubsub_psa_nanomsg_constants.h"
#include "pubsub_nanomsg_common.h"
@@ -41,69 +39,47 @@
#define NANOMSG_BIND_MAX_RETRY 10
#define L_DEBUG(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
#define L_INFO(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
#define L_WARN(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
#define L_ERROR(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-
-struct pubsub_nanomsg_topic_sender {
- celix_bundle_context_t *ctx;
- log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
-
- char *scope;
- char *topic;
- char scopeAndTopicFilter[5];
- char *url;
-
- struct {
- celix_thread_mutex_t mutex;
- int socket;
- } nanomsg;
-
- struct {
- long svcId;
- celix_service_factory_t factory;
- } publisher;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = bndId, value = psa_nanomsg_bounded_service_entry_t
- } boundedServices;
-};
+ logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
typedef struct psa_nanomsg_bounded_service_entry {
- pubsub_nanomsg_topic_sender_t *parent;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent;
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes;
int getCount;
} psa_nanomsg_bounded_service_entry_t;
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties);
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties);
+//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+// const celix_properties_t *svcProperties);
+//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+// const celix_properties_t *svcProperties);
static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender);
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender);
static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
- const char *scope, const char *topic,
- long serializerSvcId, pubsub_serializer_service_t *ser,
- const char *bindIP, unsigned int basePort,
- unsigned int maxPort) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender)));
- sender->ctx = ctx;
- sender->logHelper = logHelper;
- sender->serializerSvcId = serializerSvcId;
- sender->serializer = ser;
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
+ log_helper_t *_logHelper,
+ const char *_scope,
+ const char *_topic,
+ long _serializerSvcId,
+ pubsub_serializer_service_t *_ser,
+ const char *_bindIp,
+ unsigned int _basePort,
+ unsigned int _maxPort) :
+ ctx{_ctx},
+ logHelper{_logHelper},
+ serializerSvcId {_serializerSvcId},
+ serializer{_ser}{
+
+ psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter);
//setting up nanomsg socket for nanomsg TopicSender
{
@@ -116,10 +92,10 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
int rv = -1, retry=0;
while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
/* Randomized part due to same bundle publishing on different topics */
- unsigned int port = rand_range(basePort,maxPort);
- size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
- char *url = static_cast<char*>(calloc(len, sizeof(char*)));
- snprintf(url, len, "tcp://%s:%u", bindIP, port);
+ unsigned int port = rand_range(_basePort,_maxPort);
+ size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1;
+ char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
+ snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
@@ -127,165 +103,155 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
rv = nn_bind (socket, bindUrl);
if (rv == -1) {
perror("Error for nn_bind");
- free(url);
+ free(_url);
} else {
- sender->url = url;
- sender->nanomsg.socket = socket;
+ this->url = _url;
+ nanomsg.socket = socket;
}
retry++;
free(bindUrl);
}
}
- if (sender->url != NULL) {
- sender->scope = strndup(scope, 1024 * 1024);
- sender->topic = strndup(topic, 1024 * 1024);
+ if (url != NULL) {
+ scope = strndup(_scope, 1024 * 1024);
+ topic = strndup(_topic, 1024 * 1024);
- celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
- celixThreadMutex_create(&sender->nanomsg.mutex, NULL);
- sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadMutex_create(&boundedServices.mutex, NULL);
+ celixThreadMutex_create(&nanomsg.mutex, NULL);
+ boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
}
//register publisher services using a service factory
- if (sender->url != NULL) {
- sender->publisher.factory.handle = sender;
- sender->publisher.factory.getService = psa_nanomsg_getPublisherService;
- sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService;
+ if (url != NULL) {
+ publisher.factory.handle = this;
+ publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+ return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
+ requestingBundle,
+ svcProperties);
+ };
+ publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+ return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
+ requestingBundle,
+ svcProperties);
+ };
celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
- celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+ celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic);
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope);
celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
- opts.factory = &sender->publisher.factory;
+ opts.factory = &publisher.factory;
opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
opts.properties = props;
- sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- }
-
- if (sender->url == NULL) {
- free(sender);
- sender = NULL;
+ publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
}
- return sender;
}
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) {
- if (sender != NULL) {
- celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
+ celix_bundleContext_unregisterService(ctx, publisher.svcId);
- nn_close(sender->nanomsg.socket);
+ nn_close(nanomsg.socket);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
- free(entry);
- }
+ celixThreadMutex_lock(&boundedServices.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
+ if (entry != NULL) {
+ serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
+ free(entry);
}
- hashMap_destroy(sender->boundedServices.map, false, false);
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
-
- celixThreadMutex_destroy(&sender->boundedServices.mutex);
- celixThreadMutex_destroy(&sender->nanomsg.mutex);
-
- free(sender->scope);
- free(sender->topic);
- free(sender->url);
- free(sender);
}
-}
+ hashMap_destroy(boundedServices.map, false, false);
+ celixThreadMutex_unlock(&boundedServices.mutex);
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->serializerSvcId;
-}
+ celixThreadMutex_destroy(&boundedServices.mutex);
+ celixThreadMutex_destroy(&nanomsg.mutex);
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->scope;
+ free(scope);
+ free(topic);
+ free(url);
}
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->topic;
+long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
+ return serializerSvcId;
}
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->url;
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
+ return scope;
}
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
- //TODO subscriber count -> topic info
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
+ return topic;
}
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
- //TODO
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const {
+ return url;
}
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+
+void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
long bndId = celix_bundle_getId(requestingBundle);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+ celixThreadMutex_lock(&boundedServices.mutex);
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
if (entry != NULL) {
entry->getCount += 1;
} else {
entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry)));
entry->getCount = 1;
- entry->parent = sender;
+ entry->parent = this;
entry->bndId = bndId;
- int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType;
entry->service.send = psa_nanomsg_topicPublicationSend;
entry->service.sendMultipart = NULL; //not supported TODO remove
- hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ hashMap_put(boundedServices.map, (void*)bndId, entry);
} else {
- L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic);
+ L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic);
}
}
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
+ celixThreadMutex_unlock(&boundedServices.mutex);
return &entry->service;
}
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
long bndId = celix_bundle_getId(requestingBundle);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+ celixThreadMutex_lock(&boundedServices.mutex);
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
if (entry != NULL) {
entry->getCount -= 1;
}
if (entry != NULL && entry->getCount == 0) {
//free entry
- hashMap_remove(sender->boundedServices.map, (void*)bndId);
- int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ hashMap_remove(boundedServices.map, (void*)bndId);
+ int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
if (rc != 0) {
L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
}
free(entry);
}
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
+ celixThreadMutex_unlock(&boundedServices.mutex);
}
static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
int status = CELIX_SUCCESS;
psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
- pubsub_nanomsg_topic_sender_t *sender = bound->parent;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent;
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId));
@@ -329,27 +295,27 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
free(serializedOutput);
if (rc < 0) {
- L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
+ //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
} else {
- L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc);
- L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+ //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc);
+ //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
}
} else {
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+ //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
}
} else {
status = CELIX_SERVICE_EXCEPTION;
- L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+ //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
}
return status;
}
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) {
static bool firstSend = true;
if(firstSend){
- L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+ //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
sleep(FIRST_SEND_DELAY_IN_SECONDS);
firstSend = false;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index ec85c37..90ab6ce 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -20,23 +20,63 @@
#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
#include "celix_bundle_context.h"
+#include <log_helper.h>
+#include <pubsub_serializer.h>
-typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t;
+namespace pubsub {
+ namespace nanomsg {
+ class pubsub_nanomsg_topic_sender {
+ public:
+ pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope,
+ const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser,
+ const char *_bindIp, unsigned int _basePort, unsigned int _maxPort);
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
- const char *scope, const char *topic,
- long serializerSvcId, pubsub_serializer_service_t *ser,
- const char *bindIP, unsigned int basePort,
- unsigned int maxPort);
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender);
+ ~pubsub_nanomsg_topic_sender();
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender);
+ pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete;
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender);
+ const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete;
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+ long getSerializerSvcId() const ;
+ const char *getScope() const ;
+ const char *getTopic() const ;
+ const char *getUrl() const;
+
+ void* getPublisherService(const celix_bundle_t *requestingBundle,
+ const celix_properties_t *svcProperties __attribute__((unused)));
+ void ungetPublisherService(const celix_bundle_t *requestingBundle,
+ const celix_properties_t *svcProperties __attribute__((unused)));
+ int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
+ void delay_first_send_for_late_joiners() ;
+
+
+ //private:
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
+
+ char *scope{};
+ char *topic{};
+ char scopeAndTopicFilter[5];
+ char *url{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ int socket;
+ } nanomsg{};
+
+ struct {
+ long svcId;
+ celix_service_factory_t factory;
+ } publisher{};
+
+ struct {
+ celix_thread_mutex_t mutex{};
+ hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t
+ } boundedServices{};
+ };
+ }
+}
#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H