You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:58 UTC
[2/8] celix git commit: nanomsg topicreceiver changed to class
nanomsg topicreceiver changed to class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c19a5bd8
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c19a5bd8
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c19a5bd8
Branch: refs/heads/nanomsg
Commit: c19a5bd85b5c514794c6b38a1080b5382b8ca1ad
Parents: 95633eb
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 19 20:10:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 19 20:10:50 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_admin.cc | 44 ++---
.../src/pubsub_nanomsg_admin.h | 6 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 162 +++++++++----------
.../src/pubsub_nanomsg_topic_receiver.h | 77 +++++++--
4 files changed, 169 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 6c15ec8..3e788ae 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -111,7 +111,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
{
std::lock_guard<std::mutex> lock(topicReceivers.mutex);
for (auto kv: topicReceivers.map) {
- pubsub_nanoMsgTopicReceiver_destroy(kv.second);
+ delete kv.second;
}
}
@@ -274,10 +274,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
- if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+ if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
char *key = kv.first;
topicReceivers.map.erase(key);
- pubsub_nanoMsgTopicReceiver_destroy(receiver);
+ delete receiver;
free(key);
}
}
@@ -409,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+ pubsub::nanomsg::topic_receiver * receiver = nullptr;
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
@@ -421,7 +421,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
auto kvs = serializers.map.find(serializerSvcId);
if (kvs != serializers.map.end()) {
auto serEntry = kvs->second;
- receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
@@ -471,24 +471,24 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co
free(key);
if (entry != topicReceivers.map.end()) {
char *receiverKey = entry->first;
- pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+ pubsub::nanomsg::topic_receiver *receiver = entry->second;
topicReceivers.map.erase(receiverKey);
free(receiverKey);
- pubsub_nanoMsgTopicReceiver_destroy(receiver);
+ delete receiver;
}
celix_status_t status = CELIX_SUCCESS;
return status;
}
-celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -503,7 +503,7 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_to
if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
- pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
+ receiver->connectTo(url);
}
}
@@ -516,7 +516,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
for (auto entry: topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
connectEndpointToReceiver(receiver, endpoint);
}
}
@@ -532,13 +532,13 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo
}
-celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
@@ -551,7 +551,7 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanom
if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
- pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
+ receiver->disconnectFrom(url);
}
}
@@ -564,7 +564,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
for (auto entry : topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
disconnectEndpointFromReceiver(receiver, endpoint);
}
}
@@ -605,16 +605,16 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
for (auto entry : topicReceivers.map) {
- pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
- long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+ pubsub::nanomsg::topic_receiver *receiver = entry.second;
+ long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
- const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
- const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+ const char *scope = receiver->scope();
+ const char *topic = receiver->topic();
std::vector<std::string> connected{};
std::vector<std::string> unconnected{};
- pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+ receiver->listConnections(connected, unconnected);
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index c34a310..3e680b6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -84,10 +84,10 @@ private:
celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
FILE *errStream __attribute__((unused)));
- celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint);
- celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
const celix_properties_t *endpoint);
celix_bundle_context_t *ctx;
log_helper_t *log;
@@ -117,7 +117,7 @@ private:
} psa_nanomsg_serializer_entry_t;
ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
- ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+ ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
};
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 42f6423..889d79d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -109,92 +109,96 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc
static void* psa_nanomsg_recvThread(void *data);
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
- log_helper_t *logHelper, const char *scope,
- const char *topic, long serializerSvcId,
- pubsub_serializer_service_t *serializer) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
- receiver->ctx = ctx;
- receiver->logHelper = logHelper;
- receiver->serializerSvcId = serializerSvcId;
- receiver->serializer = serializer;
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
-
-
- receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
- if (receiver->nanoMsgSocket < 0) {
- free(receiver);
- receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+// log_helper_t *logHelper, const char *scope,
+// const char *topic, long serializerSvcId,
+// pubsub_serializer_service_t *serializer) {
+pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
+ log_helper_t *_logHelper,
+ const char *_scope,
+ const char *_topic,
+ long _serializerSvcId,
+ pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
+ //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+ ctx = _ctx;
+ logHelper = _logHelper;
+ serializer = _serializer;
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter);
+
+ m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+ if (m_nanoMsgSocket < 0) {
+ // TODO throw error or something
+ //free(receiver);
+ //receiver = NULL;
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic);
} else {
int timeout = PSA_NANOMSG_RECV_TIMEOUT;
- if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
+ if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
sizeof (timeout)) < 0) {
- free(receiver);
- receiver = NULL;
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+ // TODO throw error or something
+ //free(receiver);
+ //receiver = NULL;
+ L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic);
}
char subscribeFilter[5];
- psa_nanomsg_setScopeAndTopicFilter(scope, topic, subscribeFilter);
+ psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
//zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
- receiver->scope = strndup(scope, 1024 * 1024);
- receiver->topic = strndup(topic, 1024 * 1024);
+ m_scope = strndup(m_scope, 1024 * 1024);
+ m_topic = strndup(m_topic, 1024 * 1024);
- receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
char buf[size + 1];
- snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
celix_service_tracking_options_t opts{};
opts.filter.ignoreServiceLanguage = true;
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
- opts.callbackHandle = receiver;
+ opts.callbackHandle = this;
opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
- receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- receiver->recvThread.running = true;
- celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+ subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ recvThread.running = true;
+ celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
std::stringstream namestream;
- namestream << "NANOMSG TR " << scope << "/" << topic;
- celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+ namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
+ celixThread_setName(&recvThread.thread, namestream.str().c_str());
}
- return receiver;
}
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
- if (receiver != NULL) {
+pubsub::nanomsg::topic_receiver::~topic_receiver() {
{
- std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
- receiver->recvThread.running = false;
+ std::lock_guard<std::mutex> _lock(recvThread.mutex);
+ recvThread.running = false;
}
- celixThread_join(receiver->recvThread.thread, NULL);
+ celixThread_join(recvThread.thread, NULL);
- celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+ celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
hash_map_iterator_t iter=hash_map_iterator_t();
{
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- iter = hashMapIterator_construct(receiver->subscribers.map);
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ iter = hashMapIterator_construct(subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
- receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
free(entry);
}
}
- hashMap_destroy(receiver->subscribers.map, false, false);
+ hashMap_destroy(subscribers.map, false, false);
}
{
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ iter = hashMapIterator_construct(requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
@@ -202,37 +206,34 @@ void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiv
free(entry);
}
}
- hashMap_destroy(receiver->requestedConnections.map, false, false);
+ hashMap_destroy(requestedConnections.map, false, false);
}
//celixThreadMutex_destroy(&receiver->subscribers.mutex);
//celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
//celixThreadMutex_destroy(&receiver->recvThread.mutex);
- nn_close(receiver->nanoMsgSocket);
+ nn_close(m_nanoMsgSocket);
- free(receiver->scope);
- free(receiver->topic);
- }
- free(receiver);
+ free((void*)m_scope);
+ free((void*)m_topic);
}
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->scope;
+const char* pubsub::nanomsg::topic_receiver::scope() const {
+ return m_scope;
}
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->topic;
+const char* pubsub::nanomsg::topic_receiver::topic() const {
+ return m_topic;
}
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
- return receiver->serializerSvcId;
+long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
+ return m_serializerSvcId;
}
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
- std::vector<std::string> &connectedUrls,
+void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls,
std::vector<std::string> &unconnectedUrls) {
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter));
if (entry->connected) {
@@ -244,21 +245,19 @@ void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t
}
-void pubsub_nanoMsgTopicReceiver_connectTo(
- pubsub_nanomsg_topic_receiver_t *receiver,
- const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
+ L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url);
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url));
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url));
if (entry == NULL) {
entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry)));
entry->url = strndup(url, 1024*1024);
entry->connected = false;
- hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+ hashMap_put(requestedConnections.map, (void*)entry->url, entry);
}
if (!entry->connected) {
- int connection_id = nn_connect(receiver->nanoMsgSocket, url);
+ int connection_id = nn_connect(m_nanoMsgSocket, url);
if (connection_id >= 0) {
entry->connected = true;
entry->id = connection_id;
@@ -268,13 +267,13 @@ void pubsub_nanoMsgTopicReceiver_connectTo(
}
}
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
+ L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url);
- std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex);
- psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url));
+ std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+ psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url));
if (entry != NULL && entry->connected) {
- if (nn_shutdown(receiver->nanoMsgSocket, entry->id) == 0) {
+ if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
entry->connected = false;
} else {
L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno));
@@ -287,7 +286,7 @@ void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t
}
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -318,7 +317,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
@@ -338,7 +337,7 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
}
}
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
@@ -362,7 +361,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t
}
}
-static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -377,8 +376,9 @@ struct Message {
pubsub_nanmosg_msg_header_t header;
char payload[];
};
+
static void* psa_nanomsg_recvThread(void *data) {
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+ pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
bool running{};
{
std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index d584db8..6cd216b 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -24,23 +24,72 @@
#include "log_helper.h"
#include "celix_bundle_context.h"
-typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+namespace pubsub {
+ namespace nanomsg {
+ class topic_receiver {
+ public:
+ topic_receiver(celix_bundle_context_t
+ *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId, pubsub_serializer_service_t
+ *serializer);
+ topic_receiver(const topic_receiver &) = delete;
+ topic_receiver & operator=(const topic_receiver &) = delete;
+ ~topic_receiver();
-pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
- log_helper_t *logHelper, const char *scope,
- const char *topic, long serializerSvcId,
- pubsub_serializer_service_t *serializer);
-void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+ const char* scope() const;
+ const char* topic() const;
+ long serializerSvcId() const;
+ void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
+ void connectTo(const char *url);
+ void disconnectFrom(const char *url);
+ private:
+ celix_bundle_context_t *ctx{nullptr};
+ log_helper_t *logHelper{nullptr};
+ long m_serializerSvcId{0};
+ pubsub_serializer_service_t *serializer{nullptr};
+ const char *m_scope{nullptr};
+ const char *m_topic{nullptr};
+ char m_scopeAndTopicFilter[5];
-const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+ int m_nanoMsgSocket{0};
-long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
- std::vector<std::string> &connectedUrls,
- std::vector<std::string> &unconnectedUrls);
+ struct {
+ celix_thread_t thread;
+ std::mutex mutex;
+ bool running;
+ } recvThread{};
-void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+ struct {
+ std::mutex mutex;
+ hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+ } requestedConnections{};
+
+ long subscriberTrackerId{0};
+ struct {
+ std::mutex mutex;
+ hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ } subscribers{};
+ };
+ }}
+//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+// log_helper_t *logHelper, const char *scope,
+// const char *topic, long serializerSvcId,
+// pubsub_serializer_service_t *serializer);
+//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+// std::vector<std::string> &connectedUrls,
+// std::vector<std::string> &unconnectedUrls);
+
+//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H