You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:57 UTC
[1/8] celix git commit: Removed celix-maps from nanomsg admin
Repository: celix
Updated Branches:
refs/heads/nanomsg 3009e6470 -> 7c141424d
Removed celix-maps from nanomsg admin
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95633eb9
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95633eb9
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95633eb9
Branch: refs/heads/nanomsg
Commit: 95633eb954a09232867277d7cd0d71c30d49a012
Parents: 3009e64
Author: Erjan Altena <er...@gmail.com>
Authored: Sat Nov 3 19:40:27 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Sat Nov 3 19:40:27 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 148 +++++++------------
.../src/pubsub_nanomsg_admin.h | 34 ++---
2 files changed, 67 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 9fe91d9..6c15ec8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,8 +32,6 @@
#include "pubsub_utils.h"
#include "pubsub_nanomsg_admin.h"
#include "pubsub_psa_nanomsg_constants.h"
-#include "pubsub_nanomsg_topic_sender.h"
-#include "pubsub_nanomsg_topic_receiver.h"
/*
//#define L_DEBUG(...) \
// logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -50,11 +48,6 @@
#define L_ERROR printf
-typedef struct psa_nanomsg_serializer_entry {
- const char *serType;
- long svcId;
- pubsub_serializer_service_t *svc;
-} psa_nanomsg_serializer_entry_t;
static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
@@ -103,41 +96,29 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
-
- //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
-
- topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
-
- topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
-
- discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
}
pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
//note assuming al psa register services and service tracker are removed.
{
std::lock_guard<std::mutex> lock(topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter));
+ for (auto kv : topicSenders.map) {
+ auto *sender = kv.second;
pubsub_nanoMsgTopicSender_destroy(sender);
}
}
{
std::lock_guard<std::mutex> lock(topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
- pubsub_nanoMsgTopicReceiver_destroy(recv);
+ for (auto kv: topicReceivers.map) {
+ pubsub_nanoMsgTopicReceiver_destroy(kv.second);
}
}
{
std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ for (auto entry : discoveredEndpoints.map) {
+ auto *ep = entry.second;
celix_properties_destroy(ep);
}
}
@@ -150,12 +131,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
}
}
- hashMap_destroy(topicSenders.map, true, false);
-
- hashMap_destroy(topicReceivers.map, true, false);
-
- hashMap_destroy(discoveredEndpoints.map, false, false);
-
free(ipAddress);
}
@@ -276,21 +251,19 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> lock(serializers.mutex);
psa_nanomsg_serializer_entry_t* entry = nullptr;
- auto kv = serializers.map.find(svcId);
- if (kv != serializers.map.end()) {
- entry = kv->second;
+ auto kvsm = serializers.map.find(svcId);
+ if (kvsm != serializers.map.end()) {
+ entry = kvsm->second;
}
serializers.map.erase(svcId);
if (entry != nullptr) {
{
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry));
+ for (auto kv: topicSenders.map) {
+ auto *sender = kv.second;
if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
- char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry));
- hashMapIterator_remove(&iter);
+ char *key = kv.first;
+ topicSenders.map.erase(kv.first);
pubsub_nanoMsgTopicSender_destroy(sender);
free(key);
}
@@ -299,13 +272,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
{
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+ for (auto kv : topicReceivers.map){
+ auto *receiver = kv.second;
if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
- char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
- hashMapIterator_remove(&iter);
+ char *key = kv.first;
+ topicReceivers.map.erase(key);
pubsub_nanoMsgTopicReceiver_destroy(receiver);
free(key);
}
@@ -365,7 +336,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
pubsub_nanomsg_topic_sender_t *sender = nullptr;
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key));
+ sender = topicSenders.map.find(key)->second;
if (sender == nullptr) {
//auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
// (void *) serializerSvcId));
@@ -389,7 +360,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
if (cn != nullptr) {
celix_properties_set(newEndpoint, "container_name", cn);
}
- hashMap_put(topicSenders.map, key, sender);
+ topicSenders.map[key] = sender;
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
free(key);
@@ -417,10 +388,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
- if (entry != nullptr) {
- char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, key));
+ auto kv = topicSenders.map.find(key);
+ if (kv != topicSenders.map.end()) {
+ char *mapKey = kv->first;
+ pubsub_nanomsg_topic_sender_t *sender = kv->second;
free(mapKey);
//TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
pubsub_nanoMsgTopicSender_destroy(sender);
@@ -442,18 +413,21 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key));
+ auto trkv = topicReceivers.map.find(key);
+ if (trkv != topicReceivers.map.end()) {
+ receiver = trkv->second;
+ }
if (receiver == nullptr) {
- auto kv = serializers.map.find(serializerSvcId);
- if (kv != serializers.map.end()) {
- auto serEntry = kv->second;
+ auto kvs = serializers.map.find(serializerSvcId);
+ if (kvs != serializers.map.end()) {
+ auto serEntry = kvs->second;
receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = kv->second->serType;
+ const char *serType = kvs->second->serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
@@ -461,7 +435,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
if (cn != nullptr) {
celix_properties_set(newEndpoint, "container_name", cn);
}
- hashMap_put(topicReceivers.map, key, receiver);
+ topicReceivers.map[key] = receiver;
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
free(key);
@@ -473,9 +447,8 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
}
if (receiver != nullptr && newEndpoint != nullptr) {
std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ for (auto entry : discoveredEndpoints.map) {
+ auto *endpoint = entry.second;
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
connectEndpointToReceiver(receiver, endpoint);
@@ -494,12 +467,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) {
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
+ auto entry = topicReceivers.map.find(key);
free(key);
- if (entry != nullptr) {
- char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
- hashMap_remove(topicReceivers.map, receiverKey);
+ if (entry != topicReceivers.map.end()) {
+ char *receiverKey = entry->first;
+ pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+ topicReceivers.map.erase(receiverKey);
free(receiverKey);
pubsub_nanoMsgTopicReceiver_destroy(receiver);
@@ -542,17 +515,17 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+ for (auto entry: topicReceivers.map) {
+ pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
connectEndpointToReceiver(receiver, endpoint);
}
}
std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
celix_properties_t *cpy = celix_properties_copy(endpoint);
+ //TODO : check if properties are never deleted before map.
const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
- hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
+ discoveredEndpoints.map[uuid] = cpy;
celix_status_t status = CELIX_SUCCESS;
return status;
@@ -590,24 +563,17 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+ for (auto entry : topicReceivers.map) {
+ pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
disconnectEndpointFromReceiver(receiver, endpoint);
}
}
- celix_properties_t *found = nullptr;
{
std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
- found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
- }
- if (found != nullptr) {
- celix_properties_destroy(found);
+ discoveredEndpoints.map.erase(uuid);
}
-
- celix_status_t status = CELIX_SUCCESS;
- return status;
+ return CELIX_SUCCESS;;
}
celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out,
@@ -619,15 +585,11 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
- &iter));
+ for (auto kvts: topicSenders.map) {
+ pubsub_nanomsg_topic_sender_t *sender = kvts.second;
long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
- auto kv = serializers.map.find(serSvcId);
- //psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
- // serializers.map, (void *) serSvcId));
- const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+ auto kvs = serializers.map.find(serSvcId);
+ const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -640,12 +602,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
{
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
- std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+ std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
- &iter));
+ for (auto entry : topicReceivers.map) {
+ pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
auto kv = serializers.map.find(serSvcId);
const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 98314b3..c34a310 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -29,6 +29,8 @@
#include <pubsub_serializer.h>
#include "../../../shell/shell/include/command.h"
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_nanomsg_topic_receiver.h"
#define PUBSUB_NANOMSG_ADMIN_TYPE "zmq"
#define PUBSUB_NANOMSG_URL_KEY "zmq.url"
@@ -42,6 +44,13 @@
#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+
+template <typename key, typename value>
+struct ProtectedMap {
+ std::mutex mutex{};
+ std::map<key, value> map{};
+};
+
class pubsub_nanomsg_admin {
public:
pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
@@ -106,27 +115,10 @@ private:
long svcId;
pubsub_serializer_service_t *svc;
} psa_nanomsg_serializer_entry_t;
- struct {
- std::mutex mutex;
- std::map<long, psa_nanomsg_serializer_entry_t*> map;
- //hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
- } serializers{};
-
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
- } topicSenders{};
-
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
- } topicReceivers{};
-
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
- } discoveredEndpoints{};
-
+ ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+ ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+ ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+ ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
#ifdef __cplusplus
[5/8] celix git commit: subscriber.map now std::map
Posted by er...@apache.org.
subscriber.map now std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/120895dd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/120895dd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/120895dd
Branch: refs/heads/nanomsg
Commit: 120895dd93c2c995d98327925fe88f939d345d12
Parents: 8658738
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 21:53:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 21:53:50 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_topic_receiver.cc | 100 +++++++------------
.../src/pubsub_nanomsg_topic_receiver.h | 12 ++-
2 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 8acf6b1..2205ed2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,13 +63,6 @@
#define L_ERROR printf
-
-
-
-//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-// const celix_bundle_t *owner);
-
-
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
const char *_scope,
@@ -103,8 +96,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_scope = strndup(m_scope, 1024 * 1024);
m_topic = strndup(m_topic, 1024 * 1024);
- subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
+ //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
//requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -139,38 +132,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
- hash_map_iterator_t iter=hash_map_iterator_t();
{
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- iter = hashMapIterator_construct(subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
- free(entry);
- }
+ for(auto elem : subscribers.map) {
+ serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
}
- hashMap_destroy(subscribers.map, false, false);
+ subscribers.map.clear();
}
-
-
-// {
-// std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-// iter = hashMapIterator_construct(requestedConnections.map);
-// while (hashMapIterator_hasNext(&iter)) {
-// psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-// if (entry != NULL) {
-// free(entry->url);
-// free(entry);
-// }
-// }
-// hashMap_destroy(requestedConnections.map, false, false);
-// }
-
- //celixThreadMutex_destroy(&receiver->subscribers.mutex);
- //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
- //celixThreadMutex_destroy(&receiver->recvThread.mutex);
-
nn_close(m_nanoMsgSocket);
free((void*)m_scope);
@@ -211,6 +179,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
std::piecewise_construct,
std::forward_as_tuple(std::string(url)),
std::forward_as_tuple(url, -1));
+ entry = requestedConnections.map.find(url);
}
if (!entry->second.isConnected()) {
int connection_id = nn_connect(m_nanoMsgSocket, url);
@@ -254,21 +223,26 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
}
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
- if (entry != NULL) {
- entry->usageCount += 1;
+ auto entry = subscribers.map.find(bndId);
+ if (entry != subscribers.map.end()) {
+ entry->second.usageCount += 1;
} else {
//new create entry
- entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
- entry->usageCount = 1;
- entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+ subscribers.map.emplace(std::piecewise_construct,
+ std::forward_as_tuple(bndId),
+ std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
+ entry = subscribers.map.find(bndId);
+ if (entry == subscribers.map.end()) {
+ std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
+ }
- int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
if (rc == 0) {
- hashMap_put(subscribers.map, (void*)bndId, entry);
+ //hashMap_put(subscribers.map, (void*)bndId, entry);
+ //subscribers.map[bndId] = *entry;
} else {
L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
- free(entry);
+ subscribers.map.erase(bndId);
}
}
}
@@ -278,18 +252,17 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
long bndId = celix_bundle_getId(bnd);
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
- if (entry != NULL) {
- entry->usageCount -= 1;
- }
- if (entry != NULL && entry->usageCount <= 0) {
- //remove entry
- hashMap_remove(subscribers.map, (void*)bndId);
- int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
- if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ auto entry = subscribers.map.find(bndId);
+ if (entry != subscribers.map.end()) {
+ entry->second.usageCount -= 1;
+ if (entry->second.usageCount <= 0) {
+ //remove entry
+ int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
+ if (rc != 0) {
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ }
+ subscribers.map.erase(bndId);
}
- free(entry);
}
}
@@ -319,12 +292,13 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
- }
+ //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
+ //while (hashMapIterator_hasNext(&iter)) {
+ for (auto entry : subscribers.map) {
+ //psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
+ //if (entry != NULL) {
+ processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
+ //}
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 3398fb1..09d62a9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,10 +28,13 @@
#include "pubsub_nanomsg_common.h"
#include "pubsub/subscriber.h"
-typedef struct psa_zmq_subscriber_entry {
+typedef struct psa_nanomsg_subscriber_entry {
+ psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
+ svc{_svc}, usageCount{_usageCount} {
+ }
+ pubsub_subscriber_t *svc{};
int usageCount;
- hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
+ hash_map_t *msgTypes{nullptr}; //map from serializer svc
} psa_nanomsg_subscriber_entry_t;
typedef struct psa_zmq_requested_connection_entry {
@@ -116,7 +119,8 @@ namespace pubsub {
long subscriberTrackerId{0};
struct {
std::mutex mutex;
- hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ std::map<long, psa_nanomsg_subscriber_entry_t> map;
+ //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
} subscribers{};
};
}
[6/8] celix git commit: Nanomsg: moved charptr to std::string
Posted by er...@apache.org.
Nanomsg: moved charptr to std::string
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/883abeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/883abeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/883abeed
Branch: refs/heads/nanomsg
Commit: 883abeed00cfa3c1509b026fea888550b863defc
Parents: 120895d
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 22:35:30 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 22:35:30 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 56 +++++++-----------
.../src/pubsub_nanomsg_admin.h | 4 +-
.../src/pubsub_nanomsg_common.cc | 8 +--
.../src/pubsub_nanomsg_common.h | 3 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 60 +++++++-------------
.../src/pubsub_nanomsg_topic_receiver.h | 26 ++++-----
6 files changed, 63 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 3e788ae..030441d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -159,7 +159,7 @@ void pubsub_nanomsg_admin::start() {
};
adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint);
+ return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint);
};
adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
@@ -205,7 +205,7 @@ void pubsub_nanomsg_admin::start() {
celix_properties_t* shellProps = celix_properties_create();
celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
- celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+ celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA");
cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
}
@@ -275,10 +275,9 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
- char *key = kv.first;
+ auto key = kv.first;
topicReceivers.map.erase(key);
delete receiver;
- free(key);
}
}
}
@@ -338,8 +337,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
sender = topicSenders.map.find(key)->second;
if (sender == nullptr) {
- //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
- // (void *) serializerSvcId));
psa_nanomsg_serializer_entry_t *serEntry = nullptr;
auto kv = serializers.map.find(serializerSvcId);
if (kv != serializers.map.end()) {
@@ -403,12 +400,12 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
return status;
}
-celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic,
long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
celix_properties_t *newEndpoint = nullptr;
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ std::string key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str());
pubsub::nanomsg::topic_receiver * receiver = nullptr;
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
@@ -423,12 +420,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
auto serEntry = kvs->second;
receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
const char *serType = kvs->second->serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
@@ -438,11 +435,9 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
topicReceivers.map[key] = receiver;
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
- free(key);
}
} else {
- free(key);
- L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+ L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope.c_str(), topic.c_str());
}
}
if (receiver != nullptr && newEndpoint != nullptr) {
@@ -470,11 +465,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
auto entry = topicReceivers.map.find(key);
free(key);
if (entry != topicReceivers.map.end()) {
- char *receiverKey = entry->first;
+ auto receiverKey = entry->first;
pubsub::nanomsg::topic_receiver *receiver = entry->second;
topicReceivers.map.erase(receiverKey);
- free(receiverKey);
delete receiver;
}
@@ -487,22 +481,18 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+ std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
if (url == nullptr) {
-// const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr);
-// const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != nullptr && eTopic != nullptr &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ if ((eScope == scope) && (eTopic == topic)) {
receiver->connectTo(url);
}
}
@@ -537,20 +527,18 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nano
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+ auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
if (url == nullptr) {
L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != nullptr && eTopic != nullptr &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ if ((eScope == scope) && (eTopic == topic)) {
receiver->disconnectFrom(url);
}
}
@@ -609,14 +597,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
std::vector<std::string> connected{};
std::vector<std::string> unconnected{};
receiver->listConnections(connected, unconnected);
- fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+ fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
fprintf(out, " |- serializer type = %s\n", serType);
for (auto url : connected) {
fprintf(out, " |- connected url = %s\n", url.c_str());
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 3e680b6..b33a3c0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -74,7 +74,7 @@ private:
long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t teardownTopicSender(const char *scope, const char *topic);
- celix_status_t setupTopicReceiver(const char *scope, const char *topic,
+ celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic,
long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
@@ -117,7 +117,7 @@ private:
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
- ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
+ ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
index 2a2bcfe..3ecd19c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -41,15 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_he
return check;
}
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) {
- for (int i = 0; i < 5; ++i) {
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter) {
+ for (int i = 0; i < 5; ++i) { // 5 ??
filter[i] = '\0';
}
- if (scope != NULL && strnlen(scope, 3) >= 2) {
+ if (scope.size() >= 2) { //3 ??
filter[0] = scope[0];
filter[1] = scope[1];
}
- if (topic != NULL && strnlen(topic, 3) >= 2) {
+ if (topic.size() >= 2) { //3 ??
filter[2] = topic[0];
filter[3] = topic[1];
}
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 3d5d48d..276169f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -20,6 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_COMMON_H
#define CELIX_PUBSUB_ZMQ_COMMON_H
+#include <string>
#include <utils.h>
#include "version.h"
@@ -48,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter);
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter);
bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr);
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 2205ed2..db8469b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -65,8 +65,8 @@
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
- const char *_scope,
- const char *_topic,
+ const std::string &_scope,
+ const std::string &_topic,
long _serializerSvcId,
pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
ctx = _ctx;
@@ -76,33 +76,22 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
if (m_nanoMsgSocket < 0) {
- // TODO throw error or something
- //free(receiver);
- //receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope.c_str(), m_topic.c_str());
+ std::bad_alloc{};
} else {
int timeout = PSA_NANOMSG_RECV_TIMEOUT;
if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
sizeof (timeout)) < 0) {
- // TODO throw error or something
- //free(receiver);
- //receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope.c_str(), m_topic.c_str());
+ std::bad_alloc{};
}
- char subscribeFilter[5];
- psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
+ char subscriberFilter[5]; // 5 ??
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter);
- m_scope = strndup(m_scope, 1024 * 1024);
- m_topic = strndup(m_topic, 1024 * 1024);
-
- //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
- //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+ int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
char buf[size + 1];
- snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+ snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
celix_service_tracking_options_t opts{};
opts.filter.ignoreServiceLanguage = true;
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
@@ -141,14 +130,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
}
nn_close(m_nanoMsgSocket);
- free((void*)m_scope);
- free((void*)m_topic);
}
-const char* pubsub::nanomsg::topic_receiver::scope() const {
+std::string pubsub::nanomsg::topic_receiver::scope() const {
return m_scope;
}
-const char* pubsub::nanomsg::topic_receiver::topic() const {
+
+std::string pubsub::nanomsg::topic_receiver::topic() const {
return m_topic;
}
@@ -170,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
+ L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -193,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
}
void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
+ L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -216,8 +204,8 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
const celix_bundle_t *bnd) {
long bndId = celix_bundle_getId(bnd);
- const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
+ std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ if (subScope != m_scope) {
//not the same scope. ignore
return;
}
@@ -232,16 +220,10 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
std::forward_as_tuple(bndId),
std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
entry = subscribers.map.find(bndId);
- if (entry == subscribers.map.end()) {
- std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
- }
int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
- if (rc == 0) {
- //hashMap_put(subscribers.map, (void*)bndId, entry);
- //subscribers.map[bndId] = *entry;
- } else {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
+ if (rc != 0) {
+ L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
subscribers.map.erase(bndId);
}
}
@@ -259,14 +241,14 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
//remove entry
int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
}
subscribers.map.erase(bndId);
}
}
}
-void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 09d62a9..2519e4a 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,18 +28,18 @@
#include "pubsub_nanomsg_common.h"
#include "pubsub/subscriber.h"
-typedef struct psa_nanomsg_subscriber_entry {
+struct psa_nanomsg_subscriber_entry {
psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
svc{_svc}, usageCount{_usageCount} {
}
pubsub_subscriber_t *svc{};
int usageCount;
hash_map_t *msgTypes{nullptr}; //map from serializer svc
-} psa_nanomsg_subscriber_entry_t;
+};
-typedef struct psa_zmq_requested_connection_entry {
+typedef struct psa_nanomsg_requested_connection_entry {
public:
- psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false):
+ psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false):
url{_url}, id{_id}, connected{_connected} {
}
bool isConnected() const {
@@ -73,23 +73,23 @@ namespace pubsub {
topic_receiver(celix_bundle_context_t
*ctx,
log_helper_t *logHelper,
- const char *scope,
- const char *topic,
+ const std::string &scope,
+ const std::string &topic,
long serializerSvcId, pubsub_serializer_service_t
*serializer);
topic_receiver(const topic_receiver &) = delete;
topic_receiver & operator=(const topic_receiver &) = delete;
~topic_receiver();
- const char* scope() const;
- const char* topic() const;
+ std::string scope() const;
+ std::string topic() const;
long serializerSvcId() const;
void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
void connectTo(const char *url);
void disconnectFrom(const char *url);
void recvThread_exec();
void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
- void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+ void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
@@ -98,8 +98,8 @@ namespace pubsub {
log_helper_t *logHelper{nullptr};
long m_serializerSvcId{0};
pubsub_serializer_service_t *serializer{nullptr};
- const char *m_scope{nullptr};
- const char *m_topic{nullptr};
+ const std::string m_scope{};
+ const std::string m_topic{};
char m_scopeAndTopicFilter[5];
int m_nanoMsgSocket{0};
@@ -113,14 +113,12 @@ namespace pubsub {
struct {
std::mutex mutex;
std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
- //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
} requestedConnections{};
long subscriberTrackerId{0};
struct {
std::mutex mutex;
- std::map<long, psa_nanomsg_subscriber_entry_t> map;
- //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ std::map<long, psa_nanomsg_subscriber_entry> map;
} subscribers{};
};
}
[3/8] celix git commit: nanomsg Topic receiver to class
Posted by er...@apache.org.
nanomsg Topic receiver to class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0abbf432
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0abbf432
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0abbf432
Branch: refs/heads/nanomsg
Commit: 0abbf4323838b5823b6f275ab418438727dfe289
Parents: c19a5bd
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 20:34:12 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:10:41 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_common.h | 4 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 90 ++++++--------------
.../src/pubsub_nanomsg_topic_receiver.h | 42 +++++----
3 files changed, 47 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 28293a8..3d5d48d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -37,14 +37,14 @@
*/
-struct pubsub_zmq_msg_header {
+struct pubsub_nanomsg_msg_header {
//header
unsigned int type;
unsigned char major;
unsigned char minor;
};
-typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 889d79d..88886c6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -17,6 +17,7 @@
*under the License.
*/
+#include <iostream>
#include <mutex>
#include <memory.h>
#include <vector>
@@ -61,34 +62,6 @@
#define L_WARN printf
#define L_ERROR printf
-struct pubsub_nanomsg_topic_receiver {
- celix_bundle_context_t *ctx;
- log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
- char *scope;
- char *topic;
- char scopeAndTopicFilter[5];
-
- int nanoMsgSocket;
-
- struct {
- celix_thread_t thread;
- std::mutex mutex;
- bool running;
- } recvThread;
-
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
- } requestedConnections;
-
- long subscriberTrackerId;
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
- } subscribers;
-};
typedef struct psa_zmq_requested_connection_entry {
char *url;
@@ -96,23 +69,14 @@ typedef struct psa_zmq_requested_connection_entry {
int id;
} psa_nanomsg_requested_connection_entry_t;
-typedef struct psa_zmq_subscriber_entry {
- int usageCount;
- hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
-} psa_nanomsg_subscriber_entry_t;
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+ const celix_bundle_t *owner);
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
const celix_bundle_t *owner);
-static void* psa_nanomsg_recvThread(void *data);
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-// log_helper_t *logHelper, const char *scope,
-// const char *topic, long serializerSvcId,
-// pubsub_serializer_service_t *serializer) {
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
const char *_scope,
@@ -149,6 +113,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_topic = strndup(m_topic, 1024 * 1024);
subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -159,15 +124,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = this;
- opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+ opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
recvThread.running = true;
- celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
- std::stringstream namestream;
- namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
- celixThread_setName(&recvThread.thread, namestream.str().c_str());
+
+ recvThread.thread = std::thread([this]() {this->recvThread_exec();});
}
}
@@ -177,7 +140,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
std::lock_guard<std::mutex> _lock(recvThread.mutex);
recvThread.running = false;
}
- celixThread_join(recvThread.thread, NULL);
+ recvThread.thread.join();
celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
@@ -285,12 +248,13 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
}
}
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+ const celix_bundle_t *bnd) {
+ auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+ if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
//not the same scope. ignore
return;
}
@@ -309,7 +273,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
if (rc == 0) {
hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
free(entry);
}
}
@@ -317,7 +281,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+ auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
@@ -331,13 +295,13 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
}
free(entry);
}
}
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
@@ -353,7 +317,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
} else {
- L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+ //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, scope, topic);
}
}
} else {
@@ -361,13 +325,13 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
}
}
-static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize);
+ processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
}
}
}
@@ -377,12 +341,11 @@ struct Message {
char payload[];
};
-static void* psa_nanomsg_recvThread(void *data) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
+void pubsub::nanomsg::topic_receiver::recvThread_exec() {
bool running{};
{
- std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
- running = receiver->recvThread.running;
+ std::lock_guard<std::mutex> _lock(recvThread.mutex);
+ running = recvThread.running;
}
while (running) {
Message *msg = nullptr;
@@ -400,9 +363,9 @@ static void* psa_nanomsg_recvThread(void *data) {
msgHdr.msg_controllen = 0;
errno = 0;
- int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+ int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
- processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+ processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
nn_freemsg(msg);
} else if (recvBytes >= 0) {
L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
@@ -415,5 +378,4 @@ static void* psa_nanomsg_recvThread(void *data) {
}
} // while
- return NULL;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 6cd216b..f977917 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -16,15 +16,24 @@
*specific language governing permissions and limitations
*under the License.
*/
-#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
-#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#pragma once
#include <string>
#include <vector>
+#include <thread>
+#include <mutex>
#include "pubsub_serializer.h"
#include "log_helper.h"
#include "celix_bundle_context.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub/subscriber.h"
+
+typedef struct psa_zmq_subscriber_entry {
+ int usageCount;
+ hash_map_t *msgTypes; //map from serializer svc
+ pubsub_subscriber_t *svc;
+} psa_nanomsg_subscriber_entry_t;
+
-//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
namespace pubsub {
namespace nanomsg {
class topic_receiver {
@@ -46,7 +55,10 @@ namespace pubsub {
void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
void connectTo(const char *url);
void disconnectFrom(const char *url);
- private:
+ void recvThread_exec();
+ void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
+ void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+ //private:
celix_bundle_context_t *ctx{nullptr};
log_helper_t *logHelper{nullptr};
long m_serializerSvcId{0};
@@ -58,7 +70,7 @@ namespace pubsub {
int m_nanoMsgSocket{0};
struct {
- celix_thread_t thread;
+ std::thread thread;
std::mutex mutex;
bool running;
} recvThread{};
@@ -74,22 +86,6 @@ namespace pubsub {
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
} subscribers{};
};
- }}
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-// log_helper_t *logHelper, const char *scope,
-// const char *topic, long serializerSvcId,
-// pubsub_serializer_service_t *serializer);
-//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-// std::vector<std::string> &connectedUrls,
-// std::vector<std::string> &unconnectedUrls);
-
-//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+ }
+}
-#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
[2/8] celix git commit: nanomsg topicreceiver changed to class
Posted by er...@apache.org.
nanomsg topicreceiver changed to class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c19a5bd8
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c19a5bd8
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c19a5bd8
Branch: refs/heads/nanomsg
Commit: c19a5bd85b5c514794c6b38a1080b5382b8ca1ad
Parents: 95633eb
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 19 20:10:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 19 20:10:50 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 44 ++---
.../src/pubsub_nanomsg_admin.h | 6 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 162 +++++++++----------
.../src/pubsub_nanomsg_topic_receiver.h | 77 +++++++--
4 files changed, 169 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 6c15ec8..3e788ae 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -111,7 +111,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
{
std::lock_guard<std::mutex> lock(topicReceivers.mutex);
for (auto kv: topicReceivers.map) {
- pubsub_nanoMsgTopicReceiver_destroy(kv.second);
+ delete kv.second;
}
}
@@ -274,10 +274,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
- if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+ if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
char *key = kv.first;
topicReceivers.map.erase(key);
- pubsub_nanoMsgTopicReceiver_destroy(receiver);
+ delete receiver;
free(key);
}
}
@@ -409,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+ pubsub::nanomsg::topic_receiver * receiver = nullptr;
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
@@ -421,7 +421,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
auto kvs = serializers.map.find(serializerSvcId);
if (kvs != serializers.map.end()) {
auto serEntry = kvs->second;
- receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
@@ -471,24 +471,24 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
free(key);
if (entry != topicReceivers.map.end()) {
char *receiverKey = entry->first;
- pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+ pubsub::nanomsg::topic_receiver *receiver = entry->second;
topicReceivers.map.erase(receiverKey);
free(receiverKey);
- pubsub_nanoMsgTopicReceiver_destroy(receiver);
+ delete receiver;
}
celix_status_t status = CELIX_SUCCESS;
return status;
}
-celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -503,7 +503,7 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_to
if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
- pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
+ receiver->connectTo(url);
}
}
@@ -516,7 +516,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
for (auto entry: topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
connectEndpointToReceiver(receiver, endpoint);
}
}
@@ -532,13 +532,13 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
}
-celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -551,7 +551,7 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanom
if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
- pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
+ receiver->disconnectFrom(url);
}
}
@@ -564,7 +564,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
for (auto entry : topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
disconnectEndpointFromReceiver(receiver, endpoint);
}
}
@@ -605,16 +605,16 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
for (auto entry : topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
- long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
+ long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
std::vector<std::string> connected{};
std::vector<std::string> unconnected{};
- pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+ receiver->listConnections(connected, unconnected);
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index c34a310..3e680b6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -84,10 +84,10 @@ private:
celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
FILE *errStream __attribute__((unused)));
- celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint);
- celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint);
celix_bundle_context_t *ctx;
log_helper_t *log;
@@ -117,7 +117,7 @@ private:
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
- ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+ ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 42f6423..889d79d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -109,92 +109,96 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc
static void* psa_nanomsg_recvThread(void *data);
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
- log_helper_t *logHelper, const char *scope,
- const char *topic, long serializerSvcId,
- pubsub_serializer_service_t *serializer) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
- receiver->ctx = ctx;
- receiver->logHelper = logHelper;
- receiver->serializerSvcId = serializerSvcId;
- receiver->serializer = serializer;
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
-
-
- receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
- if (receiver->nanoMsgSocket < 0) {
- free(receiver);
- receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+// log_helper_t *logHelper, const char *scope,
+// const char *topic, long serializerSvcId,
+// pubsub_serializer_service_t *serializer) {
+pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
+ log_helper_t *_logHelper,
+ const char *_scope,
+ const char *_topic,
+ long _serializerSvcId,
+ pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
+ //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+ ctx = _ctx;
+ logHelper = _logHelper;
+ serializer = _serializer;
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter);
+
+ m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+ if (m_nanoMsgSocket < 0) {
+ // TODO throw error or something
+ //free(receiver);
+ //receiver = NULL;
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
} else {
int timeout = PSA_NANOMSG_RECV_TIMEOUT;
- if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
+ if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
sizeof (timeout)) < 0) {
- free(receiver);
- receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+ // TODO throw error or something
+ //free(receiver);
+ //receiver = NULL;
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
}
char subscribeFilter[5];
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, subscribeFilter);
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
//zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
- receiver->scope = strndup(scope, 1024 * 1024);
- receiver->topic = strndup(topic, 1024 * 1024);
+ m_scope = strndup(m_scope, 1024 * 1024);
+ m_topic = strndup(m_topic, 1024 * 1024);
- receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
char buf[size + 1];
- snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
celix_service_tracking_options_t opts{};
opts.filter.ignoreServiceLanguage = true;
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
- opts.callbackHandle = receiver;
+ opts.callbackHandle = this;
opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
- receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- receiver->recvThread.running = true;
- celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+ subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ recvThread.running = true;
+ celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
std::stringstream namestream;
- namestream << "NANOMSG TR " << scope << "/" << topic;
- celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+ namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
+ celixThread_setName(&recvThread.thread, namestream.str().c_str());
}
- return receiver;
}
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
- if (receiver != NULL) {
+pubsub::nanomsg::topic_receiver::~topic_receiver() {
{
- std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
- receiver->recvThread.running = false;
+ std::lock_guard<std::mutex> _lock(recvThread.mutex);
+ recvThread.running = false;
}
- celixThread_join(receiver->recvThread.thread, NULL);
+ celixThread_join(recvThread.thread, NULL);
- celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+ celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
hash_map_iterator_t iter=hash_map_iterator_t();
{
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- iter = hashMapIterator_construct(receiver->subscribers.map);
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ iter = hashMapIterator_construct(subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
- receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
free(entry);
}
}
- hashMap_destroy(receiver->subscribers.map, false, false);
+ hashMap_destroy(subscribers.map, false, false);
}
{
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ iter = hashMapIterator_construct(requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
@@ -202,37 +206,34 @@ void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiv
free(entry);
}
}
- hashMap_destroy(receiver->requestedConnections.map, false, false);
+ hashMap_destroy(requestedConnections.map, false, false);
}
//celixThreadMutex_destroy(&receiver->subscribers.mutex);
//celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
//celixThreadMutex_destroy(&receiver->recvThread.mutex);
- nn_close(receiver->nanoMsgSocket);
+ nn_close(m_nanoMsgSocket);
- free(receiver->scope);
- free(receiver->topic);
- }
- free(receiver);
+ free((void*)m_scope);
+ free((void*)m_topic);
}
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->scope;
+const char* pubsub::nanomsg::topic_receiver::scope() const {
+ return m_scope;
}
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->topic;
+const char* pubsub::nanomsg::topic_receiver::topic() const {
+ return m_topic;
}
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->serializerSvcId;
+long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
+ return m_serializerSvcId;
}
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
- std::vector<std::string> &connectedUrls,
+void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
std::vector<std::string> &unconnectedUrls) {
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
if (entry->connected) {
@@ -244,21 +245,19 @@ void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t
}
-void pubsub_nanoMsgTopicReceiver_connectTo(
- pubsub_nanomsg_topic_receiver_t *receiver,
- const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
+ L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url));
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
if (entry == NULL) {
entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
entry->url = strndup(url, 1024*1024);
entry->connected = false;
- hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+ hashMap_put(requestedConnections.map, (void*)entry->url, entry);
}
if (!entry->connected) {
- int connection_id = nn_connect(receiver->nanoMsgSocket, url);
+ int connection_id = nn_connect(m_nanoMsgSocket, url);
if (connection_id >= 0) {
entry->connected = true;
entry->id = connection_id;
@@ -268,13 +267,13 @@ void pubsub_nanoMsgTopicReceiver_connectTo(
}
}
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
+ L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url));
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
if (entry != NULL && entry->connected) {
- if (nn_shutdown(receiver->nanoMsgSocket, entry->id) == 0) {
+ if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
entry->connected = false;
} else {
L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
@@ -287,7 +286,7 @@ void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t
}
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -318,7 +317,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
@@ -338,7 +337,7 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
}
}
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
@@ -362,7 +361,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t
}
}
-static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -377,8 +376,9 @@ struct Message {
pubsub_nanmosg_msg_header_t header;
char payload[];
};
+
static void* psa_nanomsg_recvThread(void *data) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
bool running{};
{
std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index d584db8..6cd216b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -24,23 +24,72 @@
#include "log_helper.h"
#include "celix_bundle_context.h"
-typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+namespace pubsub {
+ namespace nanomsg {
+ class topic_receiver {
+ public:
+ topic_receiver(celix_bundle_context_t
+ *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId, pubsub_serializer_service_t
+ *serializer);
+ topic_receiver(const topic_receiver &) = delete;
+ topic_receiver & operator=(const topic_receiver &) = delete;
+ ~topic_receiver();
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
- log_helper_t *logHelper, const char *scope,
- const char *topic, long serializerSvcId,
- pubsub_serializer_service_t *serializer);
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+ const char* scope() const;
+ const char* topic() const;
+ long serializerSvcId() const;
+ void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
+ void connectTo(const char *url);
+ void disconnectFrom(const char *url);
+ private:
+ celix_bundle_context_t *ctx{nullptr};
+ log_helper_t *logHelper{nullptr};
+ long m_serializerSvcId{0};
+ pubsub_serializer_service_t *serializer{nullptr};
+ const char *m_scope{nullptr};
+ const char *m_topic{nullptr};
+ char m_scopeAndTopicFilter[5];
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+ int m_nanoMsgSocket{0};
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
- std::vector<std::string> &connectedUrls,
- std::vector<std::string> &unconnectedUrls);
+ struct {
+ celix_thread_t thread;
+ std::mutex mutex;
+ bool running;
+ } recvThread{};
-void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+ struct {
+ std::mutex mutex;
+ hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+ } requestedConnections{};
+
+ long subscriberTrackerId{0};
+ struct {
+ std::mutex mutex;
+ hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ } subscribers{};
+ };
+ }}
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+// log_helper_t *logHelper, const char *scope,
+// const char *topic, long serializerSvcId,
+// pubsub_serializer_service_t *serializer);
+//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+// std::vector<std::string> &connectedUrls,
+// std::vector<std::string> &unconnectedUrls);
+
+//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
[8/8] celix git commit: NanoMsg
Posted by er...@apache.org.
NanoMsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7c141424
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7c141424
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7c141424
Branch: refs/heads/nanomsg
Commit: 7c141424d925afa98a83bc693649dc3500354965
Parents: cdefb0d
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Nov 27 21:02:18 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Nov 27 21:02:18 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 42 ++--
.../src/pubsub_nanomsg_admin.h | 2 +-
.../src/pubsub_nanomsg_topic_sender.cc | 240 ++++++++-----------
.../src/pubsub_nanomsg_topic_sender.h | 66 ++++-
4 files changed, 178 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index cf516ee..42ed632 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -99,20 +99,20 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
std::lock_guard<std::mutex> lock(topicSenders.mutex);
for (auto kv : topicSenders.map) {
auto *sender = kv.second;
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
}
}
{
std::lock_guard<std::mutex> lock(topicReceivers.mutex);
- for (auto kv: topicReceivers.map) {
+ for (auto &kv: topicReceivers.map) {
delete kv.second;
}
}
{
std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
- for (auto entry : discoveredEndpoints.map) {
+ for (auto &entry : discoveredEndpoints.map) {
auto *ep = entry.second;
celix_properties_destroy(ep);
}
@@ -252,10 +252,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
for (auto kv: topicSenders.map) {
auto *sender = kv.second;
- if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) {
char *key = kv.first;
topicSenders.map.erase(kv.first);
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
free(key);
}
}
@@ -263,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
{
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
- for (auto kv : topicReceivers.map){
+ for (auto &kv : topicReceivers.map){
auto *receiver = kv.second;
if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
auto key = kv.first;
@@ -322,7 +322,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- pubsub_nanomsg_topic_sender_t *sender = nullptr;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr;
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
sender = topicSenders.map.find(key)->second;
@@ -333,7 +333,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
serEntry = &kv->second;
}
if (serEntry != nullptr) {
- sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
+ sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
basePort, maxPort);
}
if (sender != nullptr) {
@@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
nullptr);
- celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+ celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl());
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
if (cn != nullptr) {
@@ -378,10 +378,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
auto kv = topicSenders.map.find(key);
if (kv != topicSenders.map.end()) {
char *mapKey = kv->first;
- pubsub_nanomsg_topic_sender_t *sender = kv->second;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
free(mapKey);
//TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
- pubsub_nanoMsgTopicSender_destroy(sender);
+ delete (sender);
} else {
L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
}
@@ -495,7 +495,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
- for (auto entry: topicReceivers.map) {
+ for (auto &entry: topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
connectEndpointToReceiver(receiver, endpoint);
}
@@ -541,7 +541,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto entry : topicReceivers.map) {
+ for (auto &entry : topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
disconnectEndpointFromReceiver(receiver, endpoint);
}
@@ -564,13 +564,13 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
for (auto kvts: topicSenders.map) {
- pubsub_nanomsg_topic_sender_t *sender = kvts.second;
- long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second;
+ long serSvcId = sender->getSerializerSvcId();
auto kvs = serializers.map.find(serSvcId);
const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType);
- const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
- const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
- const char *url = pubsub_nanoMsgTopicSender_url(sender);
+ const char *scope = sender->getScope();
+ const char *topic = sender->getTopic();
+ const char *url = sender->getUrl();
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- url = %s\n", url);
@@ -582,7 +582,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
fprintf(out, "\nTopic Receivers:\n");
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
- for (auto entry : topicReceivers.map) {
+ for (auto &entry : topicReceivers.map) {
pubsub::nanomsg::topic_receiver *receiver = entry.second;
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
@@ -596,10 +596,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
fprintf(out, " |- serializer type = %s\n", serType);
- for (auto url : connected) {
+ for (auto &url : connected) {
fprintf(out, " |- connected url = %s\n", url.c_str());
}
- for (auto url : unconnected) {
+ for (auto &url : unconnected) {
fprintf(out, " |- unconnected url = %s\n", url.c_str());
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 689ae20..7c2e9a0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,7 +119,7 @@ private:
pubsub_serializer_service_t *svc;
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
- ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+ ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{};
ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index d5ed28f..1c75e71 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -28,11 +28,9 @@
#include <nanomsg/bus.h>
-#include <pubsub_serializer.h>
#include <pubsub_constants.h>
#include <pubsub/publisher.h>
#include <pubsub_common.h>
-#include <log_helper.h>
#include "pubsub_nanomsg_topic_sender.h"
#include "pubsub_psa_nanomsg_constants.h"
#include "pubsub_nanomsg_common.h"
@@ -41,69 +39,47 @@
#define NANOMSG_BIND_MAX_RETRY 10
#define L_DEBUG(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
#define L_INFO(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
#define L_WARN(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+ logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
#define L_ERROR(...) \
- logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-
-struct pubsub_nanomsg_topic_sender {
- celix_bundle_context_t *ctx;
- log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
-
- char *scope;
- char *topic;
- char scopeAndTopicFilter[5];
- char *url;
-
- struct {
- celix_thread_mutex_t mutex;
- int socket;
- } nanomsg;
-
- struct {
- long svcId;
- celix_service_factory_t factory;
- } publisher;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = bndId, value = psa_nanomsg_bounded_service_entry_t
- } boundedServices;
-};
+ logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
typedef struct psa_nanomsg_bounded_service_entry {
- pubsub_nanomsg_topic_sender_t *parent;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent;
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes;
int getCount;
} psa_nanomsg_bounded_service_entry_t;
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties);
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
- const celix_properties_t *svcProperties);
+//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+// const celix_properties_t *svcProperties);
+//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+// const celix_properties_t *svcProperties);
static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender);
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender);
static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
- const char *scope, const char *topic,
- long serializerSvcId, pubsub_serializer_service_t *ser,
- const char *bindIP, unsigned int basePort,
- unsigned int maxPort) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender)));
- sender->ctx = ctx;
- sender->logHelper = logHelper;
- sender->serializerSvcId = serializerSvcId;
- sender->serializer = ser;
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
+ log_helper_t *_logHelper,
+ const char *_scope,
+ const char *_topic,
+ long _serializerSvcId,
+ pubsub_serializer_service_t *_ser,
+ const char *_bindIp,
+ unsigned int _basePort,
+ unsigned int _maxPort) :
+ ctx{_ctx},
+ logHelper{_logHelper},
+ serializerSvcId {_serializerSvcId},
+ serializer{_ser}{
+
+ psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter);
//setting up nanomsg socket for nanomsg TopicSender
{
@@ -116,10 +92,10 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
int rv = -1, retry=0;
while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
/* Randomized part due to same bundle publishing on different topics */
- unsigned int port = rand_range(basePort,maxPort);
- size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
- char *url = static_cast<char*>(calloc(len, sizeof(char*)));
- snprintf(url, len, "tcp://%s:%u", bindIP, port);
+ unsigned int port = rand_range(_basePort,_maxPort);
+ size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1;
+ char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
+ snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
@@ -127,165 +103,155 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
rv = nn_bind (socket, bindUrl);
if (rv == -1) {
perror("Error for nn_bind");
- free(url);
+ free(_url);
} else {
- sender->url = url;
- sender->nanomsg.socket = socket;
+ this->url = _url;
+ nanomsg.socket = socket;
}
retry++;
free(bindUrl);
}
}
- if (sender->url != NULL) {
- sender->scope = strndup(scope, 1024 * 1024);
- sender->topic = strndup(topic, 1024 * 1024);
+ if (url != NULL) {
+ scope = strndup(_scope, 1024 * 1024);
+ topic = strndup(_topic, 1024 * 1024);
- celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
- celixThreadMutex_create(&sender->nanomsg.mutex, NULL);
- sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadMutex_create(&boundedServices.mutex, NULL);
+ celixThreadMutex_create(&nanomsg.mutex, NULL);
+ boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
}
//register publisher services using a service factory
- if (sender->url != NULL) {
- sender->publisher.factory.handle = sender;
- sender->publisher.factory.getService = psa_nanomsg_getPublisherService;
- sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService;
+ if (url != NULL) {
+ publisher.factory.handle = this;
+ publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+ return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
+ requestingBundle,
+ svcProperties);
+ };
+ publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
+ return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
+ requestingBundle,
+ svcProperties);
+ };
celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
- celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+ celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic);
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope);
celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
- opts.factory = &sender->publisher.factory;
+ opts.factory = &publisher.factory;
opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
opts.properties = props;
- sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- }
-
- if (sender->url == NULL) {
- free(sender);
- sender = NULL;
+ publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
}
- return sender;
}
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) {
- if (sender != NULL) {
- celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
+ celix_bundleContext_unregisterService(ctx, publisher.svcId);
- nn_close(sender->nanomsg.socket);
+ nn_close(nanomsg.socket);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
- free(entry);
- }
+ celixThreadMutex_lock(&boundedServices.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
+ if (entry != NULL) {
+ serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
+ free(entry);
}
- hashMap_destroy(sender->boundedServices.map, false, false);
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
-
- celixThreadMutex_destroy(&sender->boundedServices.mutex);
- celixThreadMutex_destroy(&sender->nanomsg.mutex);
-
- free(sender->scope);
- free(sender->topic);
- free(sender->url);
- free(sender);
}
-}
+ hashMap_destroy(boundedServices.map, false, false);
+ celixThreadMutex_unlock(&boundedServices.mutex);
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->serializerSvcId;
-}
+ celixThreadMutex_destroy(&boundedServices.mutex);
+ celixThreadMutex_destroy(&nanomsg.mutex);
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->scope;
+ free(scope);
+ free(topic);
+ free(url);
}
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->topic;
+long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
+ return serializerSvcId;
}
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) {
- return sender->url;
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
+ return scope;
}
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
- //TODO subscriber count -> topic info
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
+ return topic;
}
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) {
- //TODO
+const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const {
+ return url;
}
-static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+
+void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
long bndId = celix_bundle_getId(requestingBundle);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+ celixThreadMutex_lock(&boundedServices.mutex);
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
if (entry != NULL) {
entry->getCount += 1;
} else {
entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry)));
entry->getCount = 1;
- entry->parent = sender;
+ entry->parent = this;
entry->bndId = bndId;
- int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType;
entry->service.send = psa_nanomsg_topicPublicationSend;
entry->service.sendMultipart = NULL; //not supported TODO remove
- hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ hashMap_put(boundedServices.map, (void*)bndId, entry);
} else {
- L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic);
+ L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic);
}
}
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
+ celixThreadMutex_unlock(&boundedServices.mutex);
return &entry->service;
}
-static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle);
long bndId = celix_bundle_getId(requestingBundle);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId));
+ celixThreadMutex_lock(&boundedServices.mutex);
+ psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId));
if (entry != NULL) {
entry->getCount -= 1;
}
if (entry != NULL && entry->getCount == 0) {
//free entry
- hashMap_remove(sender->boundedServices.map, (void*)bndId);
- int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ hashMap_remove(boundedServices.map, (void*)bndId);
+ int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
if (rc != 0) {
L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
}
free(entry);
}
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
+ celixThreadMutex_unlock(&boundedServices.mutex);
}
static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
int status = CELIX_SUCCESS;
psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
- pubsub_nanomsg_topic_sender_t *sender = bound->parent;
+ pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent;
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId));
@@ -329,27 +295,27 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
free(serializedOutput);
if (rc < 0) {
- L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
+ //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno));
} else {
- L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc);
- L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+ //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc);
+ //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
}
} else {
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+ //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
}
} else {
status = CELIX_SERVICE_EXCEPTION;
- L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+ //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
}
return status;
}
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
+static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) {
static bool firstSend = true;
if(firstSend){
- L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+ //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
sleep(FIRST_SEND_DELAY_IN_SECONDS);
firstSend = false;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index ec85c37..90ab6ce 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -20,23 +20,63 @@
#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
#include "celix_bundle_context.h"
+#include <log_helper.h>
+#include <pubsub_serializer.h>
-typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t;
+namespace pubsub {
+ namespace nanomsg {
+ class pubsub_nanomsg_topic_sender {
+ public:
+ pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope,
+ const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser,
+ const char *_bindIp, unsigned int _basePort, unsigned int _maxPort);
-pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper,
- const char *scope, const char *topic,
- long serializerSvcId, pubsub_serializer_service_t *ser,
- const char *bindIP, unsigned int basePort,
- unsigned int maxPort);
-void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender);
+ ~pubsub_nanomsg_topic_sender();
-const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender);
-const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender);
+ pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete;
-long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender);
+ const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete;
-void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint);
+ long getSerializerSvcId() const ;
+ const char *getScope() const ;
+ const char *getTopic() const ;
+ const char *getUrl() const;
+
+ void* getPublisherService(const celix_bundle_t *requestingBundle,
+ const celix_properties_t *svcProperties __attribute__((unused)));
+ void ungetPublisherService(const celix_bundle_t *requestingBundle,
+ const celix_properties_t *svcProperties __attribute__((unused)));
+ int topicPublicationSend(unsigned int msgTypeId, const void *inMsg);
+ void delay_first_send_for_late_joiners() ;
+
+
+ //private:
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
+
+ char *scope{};
+ char *topic{};
+ char scopeAndTopicFilter[5];
+ char *url{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ int socket;
+ } nanomsg{};
+
+ struct {
+ long svcId;
+ celix_service_factory_t factory;
+ } publisher{};
+
+ struct {
+ celix_thread_mutex_t mutex{};
+ hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t
+ } boundedServices{};
+ };
+ }
+}
#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
[4/8] celix git commit: nanomsg celix-map replaced by std::map
Posted by er...@apache.org.
nanomsg celix-map replaced by std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8658738d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8658738d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8658738d
Branch: refs/heads/nanomsg
Commit: 8658738d5e9a905eb0642ea605d36e50dc24730a
Parents: 0abbf43
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 21:28:32 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:28:32 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_topic_receiver.cc | 128 +++++++++----------
.../src/pubsub_nanomsg_topic_receiver.h | 37 +++++-
2 files changed, 95 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 88886c6..8acf6b1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,18 +63,11 @@
#define L_ERROR printf
-typedef struct psa_zmq_requested_connection_entry {
- char *url;
- bool connected;
- int id;
-} psa_nanomsg_requested_connection_entry_t;
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
+//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+// const celix_bundle_t *owner);
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
@@ -83,7 +76,6 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
const char *_topic,
long _serializerSvcId,
pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
- //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
ctx = _ctx;
logHelper = _logHelper;
serializer = _serializer;
@@ -107,14 +99,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
char subscribeFilter[5];
psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
- //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
m_scope = strndup(m_scope, 1024 * 1024);
m_topic = strndup(m_topic, 1024 * 1024);
subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
- requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
char buf[size + 1];
@@ -124,8 +115,12 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = this;
- opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
- opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
+ opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
+ static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner);
+ };
+ opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) {
+ static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner);
+ };
subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
recvThread.running = true;
@@ -159,18 +154,18 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
}
- {
- std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- iter = hashMapIterator_construct(requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- free(entry->url);
- free(entry);
- }
- }
- hashMap_destroy(requestedConnections.map, false, false);
- }
+// {
+// std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+// iter = hashMapIterator_construct(requestedConnections.map);
+// while (hashMapIterator_hasNext(&iter)) {
+// psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
+// if (entry != NULL) {
+// free(entry->url);
+// free(entry);
+// }
+// }
+// hashMap_destroy(requestedConnections.map, false, false);
+// }
//celixThreadMutex_destroy(&receiver->subscribers.mutex);
//celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
@@ -196,13 +191,11 @@ long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
std::vector<std::string> &unconnectedUrls) {
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
- if (entry->connected) {
- connectedUrls.push_back(std::string(entry->url));
+ for (auto entry : requestedConnections.map) {
+ if (entry.second.isConnected()) {
+ connectedUrls.push_back(std::string(entry.second.getUrl()));
} else {
- unconnectedUrls.push_back(std::string(entry->url));
+ unconnectedUrls.push_back(std::string(entry.second.getUrl()));
}
}
}
@@ -212,18 +205,18 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
- if (entry == NULL) {
- entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
- entry->url = strndup(url, 1024*1024);
- entry->connected = false;
- hashMap_put(requestedConnections.map, (void*)entry->url, entry);
+ auto entry = requestedConnections.map.find(url);
+ if (entry == requestedConnections.map.end()) {
+ requestedConnections.map.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(std::string(url)),
+ std::forward_as_tuple(url, -1));
}
- if (!entry->connected) {
+ if (!entry->second.isConnected()) {
int connection_id = nn_connect(m_nanoMsgSocket, url);
if (connection_id >= 0) {
- entry->connected = true;
- entry->id = connection_id;
+ entry->second.setConnected(true);
+ entry->second.setId(connection_id);
} else {
L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", url, strerror(errno));
}
@@ -234,33 +227,34 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
- if (entry != NULL && entry->connected) {
- if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
- entry->connected = false;
- } else {
- L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
+ auto entry = requestedConnections.map.find(url);
+ if (entry != requestedConnections.map.end()) {
+ if (entry->second.isConnected()) {
+ if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
+ entry->second.setConnected(false);
+ } else {
+ L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->second.getId(),
+ strerror(errno));
+ }
}
- }
- if (entry != NULL) {
- free(entry->url);
- free(entry);
+ requestedConnections.map.erase(url);
+ std::cerr << "REMOVING connection " << url << std::endl;
+ } else {
+ std::cerr << "Disconnecting from unknown URL " << url << std::endl;
}
}
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
const celix_bundle_t *bnd) {
- auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
+ if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
//not the same scope. ignore
return;
}
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
if (entry != NULL) {
entry->usageCount += 1;
} else {
@@ -269,33 +263,31 @@ static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, c
entry->usageCount = 1;
entry->svc = static_cast<pubsub_subscriber_t*>(svc);
- int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
- hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+ hashMap_put(subscribers.map, (void*)bndId, entry);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
free(entry);
}
}
}
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
+void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
long bndId = celix_bundle_getId(bnd);
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
if (entry != NULL) {
entry->usageCount -= 1;
}
if (entry != NULL && entry->usageCount <= 0) {
//remove entry
- hashMap_remove(receiver->subscribers.map, (void*)bndId);
- int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ hashMap_remove(subscribers.map, (void*)bndId);
+ int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
}
free(entry);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index f977917..3398fb1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -21,6 +21,7 @@
#include <vector>
#include <thread>
#include <mutex>
+#include <map>
#include "pubsub_serializer.h"
#include "log_helper.h"
#include "celix_bundle_context.h"
@@ -33,6 +34,34 @@ typedef struct psa_zmq_subscriber_entry {
pubsub_subscriber_t *svc;
} psa_nanomsg_subscriber_entry_t;
+typedef struct psa_zmq_requested_connection_entry {
+public:
+ psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false):
+ url{_url}, id{_id}, connected{_connected} {
+ }
+ bool isConnected() const {
+ return connected;
+ }
+
+ int getId() const {
+ return id;
+ }
+
+ void setId(int _id) {
+ id = _id;
+ }
+ void setConnected(bool c) {
+ connected = c;
+ }
+
+ const std::string &getUrl() const {
+ return url;
+ }
+private:
+ std::string url;
+ int id;
+ bool connected;
+} psa_nanomsg_requested_connection_entry_t;
namespace pubsub {
namespace nanomsg {
@@ -58,7 +87,10 @@ namespace pubsub {
void recvThread_exec();
void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
- //private:
+ void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+ void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
+
+ private:
celix_bundle_context_t *ctx{nullptr};
log_helper_t *logHelper{nullptr};
long m_serializerSvcId{0};
@@ -77,7 +109,8 @@ namespace pubsub {
struct {
std::mutex mutex;
- hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+ std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
+ //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
} requestedConnections{};
long subscriberTrackerId{0};
[7/8] celix git commit: Nanomsg
Posted by er...@apache.org.
Nanomsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cdefb0d6
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cdefb0d6
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cdefb0d6
Branch: refs/heads/nanomsg
Commit: cdefb0d665b27a41f360599598ff489b322b4405
Parents: 883abee
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 26 20:23:11 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 26 20:23:11 2018 +0100
----------------------------------------------------------------------
.../log_service/loghelper_include/log_helper.h | 2 +-
bundles/log_service/src/log_helper.c | 2 +-
.../src/pubsub_nanomsg_admin.cc | 62 ++++++++------------
.../src/pubsub_nanomsg_admin.h | 10 ++--
.../src/pubsub_nanomsg_topic_receiver.cc | 10 ++--
.../src/pubsub_nanomsg_topic_sender.cc | 11 +---
6 files changed, 42 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 28e6877..af058eb 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -33,7 +33,7 @@ celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* log_he
celix_status_t logHelper_start(log_helper_pt loghelper);
celix_status_t logHelper_stop(log_helper_pt loghelper);
celix_status_t logHelper_destroy(log_helper_pt* loghelper);
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... );
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/src/log_helper.c
----------------------------------------------------------------------
diff --git a/bundles/log_service/src/log_helper.c b/bundles/log_service/src/log_helper.c
index 6570357..e9939ed 100644
--- a/bundles/log_service/src/log_helper.c
+++ b/bundles/log_service/src/log_helper.c
@@ -156,7 +156,7 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... )
{
celix_status_t status = CELIX_SUCCESS;
va_list listPointer;
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 030441d..cf516ee 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,20 +32,15 @@
#include "pubsub_utils.h"
#include "pubsub_nanomsg_admin.h"
#include "pubsub_psa_nanomsg_constants.h"
-/*
-//#define L_DEBUG(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-//#define L_INFO(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-//#define L_WARN(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-//#define L_ERROR(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
+
+#define L_DEBUG(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
@@ -125,10 +120,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
{
std::lock_guard<std::mutex> lock(serializers.mutex);
- // todo: do not use pointer but type in map
- for(auto kv: serializers.map) {
- free(kv.second);
- }
+ serializers.map.clear();
}
free(ipAddress);
@@ -229,11 +221,14 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
std::lock_guard<std::mutex> lock(serializers.mutex);
auto it = serializers.map.find(svcId);
if (it == serializers.map.end()) {
- auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
- entry->serType = serType;
- entry->svcId = svcId;
- entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
- serializers.map[svcId] = entry;
+ serializers.map.emplace(std::piecewise_construct,
+ std::forward_as_tuple(svcId),
+ std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
+// auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
+// entry->serType = serType;
+// entry->svcId = svcId;
+// entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+// serializers.map[svcId] = entry;
}
}
}
@@ -250,18 +245,14 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> lock(serializers.mutex);
- psa_nanomsg_serializer_entry_t* entry = nullptr;
auto kvsm = serializers.map.find(svcId);
if (kvsm != serializers.map.end()) {
- entry = kvsm->second;
- }
- serializers.map.erase(svcId);
- if (entry != nullptr) {
+ auto &entry = kvsm->second;
{
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
for (auto kv: topicSenders.map) {
auto *sender = kv.second;
- if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
char *key = kv.first;
topicSenders.map.erase(kv.first);
pubsub_nanoMsgTopicSender_destroy(sender);
@@ -274,7 +265,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
- if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
+ if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
auto key = kv.first;
topicReceivers.map.erase(key);
delete receiver;
@@ -282,7 +273,6 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
}
}
- free(entry);
}
}
@@ -340,7 +330,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
psa_nanomsg_serializer_entry_t *serEntry = nullptr;
auto kv = serializers.map.find(serializerSvcId);
if (kv != serializers.map.end()) {
- serEntry = kv->second;
+ serEntry = &kv->second;
}
if (serEntry != nullptr) {
sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
@@ -418,13 +408,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
auto kvs = serializers.map.find(serializerSvcId);
if (kvs != serializers.map.end()) {
auto serEntry = kvs->second;
- receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry.svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = kvs->second->serType;
+ const char *serType = kvs->second.serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
@@ -577,7 +567,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub_nanomsg_topic_sender_t *sender = kvts.second;
long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
auto kvs = serializers.map.find(serSvcId);
- const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
+ const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType);
const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -596,7 +586,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub::nanomsg::topic_receiver *receiver = entry.second;
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
- const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+ const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType);
auto scope = receiver->scope();
auto topic = receiver->topic();
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b33a3c0..689ae20 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -43,8 +43,6 @@
#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
-//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
template <typename key, typename value>
struct ProtectedMap {
std::mutex mutex{};
@@ -111,11 +109,16 @@ private:
bool verbose{};
typedef struct psa_nanomsg_serializer_entry {
+ psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
+ serType{_serType}, svcId{_svcId}, svc{_svc} {
+
+ }
+
const char *serType;
long svcId;
pubsub_serializer_service_t *svc;
} psa_nanomsg_serializer_entry_t;
- ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+ ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
@@ -131,4 +134,3 @@ extern "C" {
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
-
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index db8469b..9f77a4c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -158,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+ L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -181,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
}
void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+ L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -316,13 +316,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() {
processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
nn_freemsg(msg);
} else if (recvBytes >= 0) {
- L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
+ L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
} else if (errno == EAGAIN || errno == ETIMEDOUT) {
//nop
} else if (errno == EINTR) {
- L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted");
+ L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
} else {
- L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno));
+ L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d: %s\n", errno, strerror(errno));
}
} // while
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index ff0d4f7..d5ed28f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -37,9 +37,9 @@
#include "pubsub_psa_nanomsg_constants.h"
#include "pubsub_nanomsg_common.h"
-#define FIRST_SEND_DELAY_IN_SECONDS 2
+#define FIRST_SEND_DELAY_IN_SECONDS 2
#define NANOMSG_BIND_MAX_RETRY 10
-/*
+
#define L_DEBUG(...) \
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
#define L_INFO(...) \
@@ -48,11 +48,6 @@
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
#define L_ERROR(...) \
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
struct pubsub_nanomsg_topic_sender {
celix_bundle_context_t *ctx;
@@ -349,7 +344,7 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
return status;
}
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) {
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
static bool firstSend = true;