You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:03 UTC
[7/8] celix git commit: Nanomsg
Nanomsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cdefb0d6
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cdefb0d6
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cdefb0d6
Branch: refs/heads/nanomsg
Commit: cdefb0d665b27a41f360599598ff489b322b4405
Parents: 883abee
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 26 20:23:11 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 26 20:23:11 2018 +0100
----------------------------------------------------------------------
.../log_service/loghelper_include/log_helper.h | 2 +-
bundles/log_service/src/log_helper.c | 2 +-
.../src/pubsub_nanomsg_admin.cc | 62 ++++++++------------
.../src/pubsub_nanomsg_admin.h | 10 ++--
.../src/pubsub_nanomsg_topic_receiver.cc | 10 ++--
.../src/pubsub_nanomsg_topic_sender.cc | 11 +---
6 files changed, 42 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 28e6877..af058eb 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -33,7 +33,7 @@ celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* log_he
celix_status_t logHelper_start(log_helper_pt loghelper);
celix_status_t logHelper_stop(log_helper_pt loghelper);
celix_status_t logHelper_destroy(log_helper_pt* loghelper);
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... );
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/src/log_helper.c
----------------------------------------------------------------------
diff --git a/bundles/log_service/src/log_helper.c b/bundles/log_service/src/log_helper.c
index 6570357..e9939ed 100644
--- a/bundles/log_service/src/log_helper.c
+++ b/bundles/log_service/src/log_helper.c
@@ -156,7 +156,7 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... )
{
celix_status_t status = CELIX_SUCCESS;
va_list listPointer;
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 030441d..cf516ee 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,20 +32,15 @@
#include "pubsub_utils.h"
#include "pubsub_nanomsg_admin.h"
#include "pubsub_psa_nanomsg_constants.h"
-/*
-//#define L_DEBUG(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-//#define L_INFO(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-//#define L_WARN(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-//#define L_ERROR(...) \
-// logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
+
+#define L_DEBUG(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
@@ -125,10 +120,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
{
std::lock_guard<std::mutex> lock(serializers.mutex);
- // todo: do not use pointer but type in map
- for(auto kv: serializers.map) {
- free(kv.second);
- }
+ serializers.map.clear();
}
free(ipAddress);
@@ -229,11 +221,14 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
std::lock_guard<std::mutex> lock(serializers.mutex);
auto it = serializers.map.find(svcId);
if (it == serializers.map.end()) {
- auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
- entry->serType = serType;
- entry->svcId = svcId;
- entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
- serializers.map[svcId] = entry;
+ serializers.map.emplace(std::piecewise_construct,
+ std::forward_as_tuple(svcId),
+ std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
+// auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
+// entry->serType = serType;
+// entry->svcId = svcId;
+// entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+// serializers.map[svcId] = entry;
}
}
}
@@ -250,18 +245,14 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> lock(serializers.mutex);
- psa_nanomsg_serializer_entry_t* entry = nullptr;
auto kvsm = serializers.map.find(svcId);
if (kvsm != serializers.map.end()) {
- entry = kvsm->second;
- }
- serializers.map.erase(svcId);
- if (entry != nullptr) {
+ auto &entry = kvsm->second;
{
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
for (auto kv: topicSenders.map) {
auto *sender = kv.second;
- if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
char *key = kv.first;
topicSenders.map.erase(kv.first);
pubsub_nanoMsgTopicSender_destroy(sender);
@@ -274,7 +265,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
for (auto kv : topicReceivers.map){
auto *receiver = kv.second;
- if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
+ if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
auto key = kv.first;
topicReceivers.map.erase(key);
delete receiver;
@@ -282,7 +273,6 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
}
}
- free(entry);
}
}
@@ -340,7 +330,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
psa_nanomsg_serializer_entry_t *serEntry = nullptr;
auto kv = serializers.map.find(serializerSvcId);
if (kv != serializers.map.end()) {
- serEntry = kv->second;
+ serEntry = &kv->second;
}
if (serEntry != nullptr) {
sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
@@ -418,13 +408,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
auto kvs = serializers.map.find(serializerSvcId);
if (kvs != serializers.map.end()) {
auto serEntry = kvs->second;
- receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry.svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = kvs->second->serType;
+ const char *serType = kvs->second.serType;
newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
//if available also set container name
@@ -577,7 +567,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub_nanomsg_topic_sender_t *sender = kvts.second;
long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
auto kvs = serializers.map.find(serSvcId);
- const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
+ const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType);
const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -596,7 +586,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
pubsub::nanomsg::topic_receiver *receiver = entry.second;
long serSvcId = receiver->serializerSvcId();
auto kv = serializers.map.find(serSvcId);
- const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+ const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType);
auto scope = receiver->scope();
auto topic = receiver->topic();
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b33a3c0..689ae20 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -43,8 +43,6 @@
#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
-//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
template <typename key, typename value>
struct ProtectedMap {
std::mutex mutex{};
@@ -111,11 +109,16 @@ private:
bool verbose{};
typedef struct psa_nanomsg_serializer_entry {
+ psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
+ serType{_serType}, svcId{_svcId}, svc{_svc} {
+
+ }
+
const char *serType;
long svcId;
pubsub_serializer_service_t *svc;
} psa_nanomsg_serializer_entry_t;
- ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+ ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
@@ -131,4 +134,3 @@ extern "C" {
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
-
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index db8469b..9f77a4c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -158,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+ L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -181,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
}
void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+ L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -316,13 +316,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() {
processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
nn_freemsg(msg);
} else if (recvBytes >= 0) {
- L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
+ L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
} else if (errno == EAGAIN || errno == ETIMEDOUT) {
//nop
} else if (errno == EINTR) {
- L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted");
+ L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
} else {
- L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno));
+ L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d: %s\n", errno, strerror(errno));
}
} // while
http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index ff0d4f7..d5ed28f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -37,9 +37,9 @@
#include "pubsub_psa_nanomsg_constants.h"
#include "pubsub_nanomsg_common.h"
-#define FIRST_SEND_DELAY_IN_SECONDS 2
+#define FIRST_SEND_DELAY_IN_SECONDS 2
#define NANOMSG_BIND_MAX_RETRY 10
-/*
+
#define L_DEBUG(...) \
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
#define L_INFO(...) \
@@ -48,11 +48,6 @@
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
#define L_ERROR(...) \
logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
struct pubsub_nanomsg_topic_sender {
celix_bundle_context_t *ctx;
@@ -349,7 +344,7 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
return status;
}
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) {
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
static bool firstSend = true;