You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:02 UTC
[6/8] celix git commit: Nanomsg: moved charptr to std::string
Nanomsg: moved charptr to std::string
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/883abeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/883abeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/883abeed
Branch: refs/heads/nanomsg
Commit: 883abeed00cfa3c1509b026fea888550b863defc
Parents: 120895d
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 22:35:30 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 22:35:30 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 56 +++++++-----------
.../src/pubsub_nanomsg_admin.h | 4 +-
.../src/pubsub_nanomsg_common.cc | 8 +--
.../src/pubsub_nanomsg_common.h | 3 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 60 +++++++-------------
.../src/pubsub_nanomsg_topic_receiver.h | 26 ++++-----
6 files changed, 63 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 3e788ae..030441d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -159,7 +159,7 @@ void pubsub_nanomsg_admin::start() {
};
adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
auto me = static_cast<pubsub_nanomsg_admin*>(handle);
- return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint);
+ return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint);
};
adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
@@ -205,7 +205,7 @@ void pubsub_nanomsg_admin::start() {
celix_properties_t* shellProps = celix_properties_create();
celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
- celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+ celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA");
cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
}
@@ -275,10 +275,9 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
- char *key = kv.first;
+ auto key = kv.first;
topicReceivers.map.erase(key);
delete receiver;
- free(key);
}
}
}
@@ -338,8 +337,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
sender = topicSenders.map.find(key)->second;
if (sender == nullptr) {
- //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
- // (void *) serializerSvcId));
psa_nanomsg_serializer_entry_t *serEntry = nullptr;
auto kv = serializers.map.find(serializerSvcId);
if (kv != serializers.map.end()) {
@@ -403,12 +400,12 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
return status;
}
-celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic,
long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
celix_properties_t *newEndpoint = nullptr;
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ std::string key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str());
pubsub::nanomsg::topic_receiver * receiver = nullptr;
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
@@ -423,12 +420,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
auto serEntry = kvs->second;
receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
const char *serType = kvs->second->serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
@@ -438,11 +435,9 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
topicReceivers.map[key] = receiver;
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
- free(key);
}
} else {
- free(key);
- L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+ L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope.c_str(), topic.c_str());
}
}
if (receiver != nullptr && newEndpoint != nullptr) {
@@ -470,11 +465,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
auto entry = topicReceivers.map.find(key);
free(key);
if (entry != topicReceivers.map.end()) {
- char *receiverKey = entry->first;
+ auto receiverKey = entry->first;
pubsub::nanomsg::topic_receiver *receiver = entry->second;
topicReceivers.map.erase(receiverKey);
- free(receiverKey);
delete receiver;
}
@@ -487,22 +481,18 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+ std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
if (url == nullptr) {
-// const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr);
-// const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != nullptr && eTopic != nullptr &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ if ((eScope == scope) && (eTopic == topic)) {
receiver->connectTo(url);
}
}
@@ -537,20 +527,18 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nano
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
+ auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "");
const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
if (url == nullptr) {
L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != nullptr && eTopic != nullptr &&
- strncmp(eScope, scope, 1024 * 1024) == 0 &&
- strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ if ((eScope == scope) && (eTopic == topic)) {
receiver->disconnectFrom(url);
}
}
@@ -609,14 +597,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
- const char *scope = receiver->scope();
- const char *topic = receiver->topic();
+ auto scope = receiver->scope();
+ auto topic = receiver->topic();
std::vector<std::string> connected{};
std::vector<std::string> unconnected{};
receiver->listConnections(connected, unconnected);
- fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+ fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str());
fprintf(out, " |- serializer type = %s\n", serType);
for (auto url : connected) {
fprintf(out, " |- connected url = %s\n", url.c_str());
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 3e680b6..b33a3c0 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -74,7 +74,7 @@ private:
long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t teardownTopicSender(const char *scope, const char *topic);
- celix_status_t setupTopicReceiver(const char *scope, const char *topic,
+ celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic,
long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
@@ -117,7 +117,7 @@ private:
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
- ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
+ ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
index 2a2bcfe..3ecd19c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -41,15 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_he
return check;
}
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) {
- for (int i = 0; i < 5; ++i) {
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter) {
+ for (int i = 0; i < 5; ++i) { // 5 ??
filter[i] = '\0';
}
- if (scope != NULL && strnlen(scope, 3) >= 2) {
+ if (scope.size() >= 2) { //3 ??
filter[0] = scope[0];
filter[1] = scope[1];
}
- if (topic != NULL && strnlen(topic, 3) >= 2) {
+ if (topic.size() >= 2) { //3 ??
filter[2] = topic[0];
filter[3] = topic[1];
}
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 3d5d48d..276169f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -20,6 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_COMMON_H
#define CELIX_PUBSUB_ZMQ_COMMON_H
+#include <string>
#include <utils.h>
#include "version.h"
@@ -48,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
-void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter);
+void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter);
bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr);
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 2205ed2..db8469b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -65,8 +65,8 @@
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
- const char *_scope,
- const char *_topic,
+ const std::string &_scope,
+ const std::string &_topic,
long _serializerSvcId,
pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
ctx = _ctx;
@@ -76,33 +76,22 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
if (m_nanoMsgSocket < 0) {
- // TODO throw error or something
- //free(receiver);
- //receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope.c_str(), m_topic.c_str());
+ std::bad_alloc{};
} else {
int timeout = PSA_NANOMSG_RECV_TIMEOUT;
if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
sizeof (timeout)) < 0) {
- // TODO throw error or something
- //free(receiver);
- //receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope.c_str(), m_topic.c_str());
+ std::bad_alloc{};
}
- char subscribeFilter[5];
- psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
+ char subscriberFilter[5]; // 5 ??
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter);
- m_scope = strndup(m_scope, 1024 * 1024);
- m_topic = strndup(m_topic, 1024 * 1024);
-
- //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
- //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+ int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
char buf[size + 1];
- snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
+ snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str());
celix_service_tracking_options_t opts{};
opts.filter.ignoreServiceLanguage = true;
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
@@ -141,14 +130,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
}
nn_close(m_nanoMsgSocket);
- free((void*)m_scope);
- free((void*)m_topic);
}
-const char* pubsub::nanomsg::topic_receiver::scope() const {
+std::string pubsub::nanomsg::topic_receiver::scope() const {
return m_scope;
}
-const char* pubsub::nanomsg::topic_receiver::topic() const {
+
+std::string pubsub::nanomsg::topic_receiver::topic() const {
return m_topic;
}
@@ -170,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
+ L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -193,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
}
void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
+ L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -216,8 +204,8 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props,
const celix_bundle_t *bnd) {
long bndId = celix_bundle_getId(bnd);
- const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
+ std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ if (subScope != m_scope) {
//not the same scope. ignore
return;
}
@@ -232,16 +220,10 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
std::forward_as_tuple(bndId),
std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
entry = subscribers.map.find(bndId);
- if (entry == subscribers.map.end()) {
- std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
- }
int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
- if (rc == 0) {
- //hashMap_put(subscribers.map, (void*)bndId, entry);
- //subscribers.map[bndId] = *entry;
- } else {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
+ if (rc != 0) {
+ L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
subscribers.map.erase(bndId);
}
}
@@ -259,14 +241,14 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
//remove entry
int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
}
subscribers.map.erase(bndId);
}
}
}
-void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 09d62a9..2519e4a 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,18 +28,18 @@
#include "pubsub_nanomsg_common.h"
#include "pubsub/subscriber.h"
-typedef struct psa_nanomsg_subscriber_entry {
+struct psa_nanomsg_subscriber_entry {
psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
svc{_svc}, usageCount{_usageCount} {
}
pubsub_subscriber_t *svc{};
int usageCount;
hash_map_t *msgTypes{nullptr}; //map from serializer svc
-} psa_nanomsg_subscriber_entry_t;
+};
-typedef struct psa_zmq_requested_connection_entry {
+typedef struct psa_nanomsg_requested_connection_entry {
public:
- psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false):
+ psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false):
url{_url}, id{_id}, connected{_connected} {
}
bool isConnected() const {
@@ -73,23 +73,23 @@ namespace pubsub {
topic_receiver(celix_bundle_context_t
*ctx,
log_helper_t *logHelper,
- const char *scope,
- const char *topic,
+ const std::string &scope,
+ const std::string &topic,
long serializerSvcId, pubsub_serializer_service_t
*serializer);
topic_receiver(const topic_receiver &) = delete;
topic_receiver & operator=(const topic_receiver &) = delete;
~topic_receiver();
- const char* scope() const;
- const char* topic() const;
+ std::string scope() const;
+ std::string topic() const;
long serializerSvcId() const;
void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
void connectTo(const char *url);
void disconnectFrom(const char *url);
void recvThread_exec();
void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
- void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+ void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd);
@@ -98,8 +98,8 @@ namespace pubsub {
log_helper_t *logHelper{nullptr};
long m_serializerSvcId{0};
pubsub_serializer_service_t *serializer{nullptr};
- const char *m_scope{nullptr};
- const char *m_topic{nullptr};
+ const std::string m_scope{};
+ const std::string m_topic{};
char m_scopeAndTopicFilter[5];
int m_nanoMsgSocket{0};
@@ -113,14 +113,12 @@ namespace pubsub {
struct {
std::mutex mutex;
std::map<std::string, psa_nanomsg_requested_connection_entry_t> map;
- //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
} requestedConnections{};
long subscriberTrackerId{0};
struct {
std::mutex mutex;
- std::map<long, psa_nanomsg_subscriber_entry_t> map;
- //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ std::map<long, psa_nanomsg_subscriber_entry> map;
} subscribers{};
};
}