You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:59 UTC
[3/8] celix git commit: nanomsg Topic receiver to class
nanomsg Topic receiver to class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0abbf432
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0abbf432
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0abbf432
Branch: refs/heads/nanomsg
Commit: 0abbf4323838b5823b6f275ab418438727dfe289
Parents: c19a5bd
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 20:34:12 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:10:41 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_common.h | 4 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 90 ++++++--------------
.../src/pubsub_nanomsg_topic_receiver.h | 42 +++++----
3 files changed, 47 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 28293a8..3d5d48d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -37,14 +37,14 @@
*/
-struct pubsub_zmq_msg_header {
+struct pubsub_nanomsg_msg_header {
//header
unsigned int type;
unsigned char major;
unsigned char minor;
};
-typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 889d79d..88886c6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -17,6 +17,7 @@
*under the License.
*/
+#include <iostream>
#include <mutex>
#include <memory.h>
#include <vector>
@@ -61,34 +62,6 @@
#define L_WARN printf
#define L_ERROR printf
-struct pubsub_nanomsg_topic_receiver {
- celix_bundle_context_t *ctx;
- log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
- char *scope;
- char *topic;
- char scopeAndTopicFilter[5];
-
- int nanoMsgSocket;
-
- struct {
- celix_thread_t thread;
- std::mutex mutex;
- bool running;
- } recvThread;
-
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
- } requestedConnections;
-
- long subscriberTrackerId;
- struct {
- std::mutex mutex;
- hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
- } subscribers;
-};
typedef struct psa_zmq_requested_connection_entry {
char *url;
@@ -96,23 +69,14 @@ typedef struct psa_zmq_requested_connection_entry {
int id;
} psa_nanomsg_requested_connection_entry_t;
-typedef struct psa_zmq_subscriber_entry {
- int usageCount;
- hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
-} psa_nanomsg_subscriber_entry_t;
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+ const celix_bundle_t *owner);
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
const celix_bundle_t *owner);
-static void* psa_nanomsg_recvThread(void *data);
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-// log_helper_t *logHelper, const char *scope,
-// const char *topic, long serializerSvcId,
-// pubsub_serializer_service_t *serializer) {
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
const char *_scope,
@@ -149,6 +113,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_topic = strndup(m_topic, 1024 * 1024);
subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -159,15 +124,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = this;
- opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+ opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
recvThread.running = true;
- celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
- std::stringstream namestream;
- namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
- celixThread_setName(&recvThread.thread, namestream.str().c_str());
+
+ recvThread.thread = std::thread([this]() {this->recvThread_exec();});
}
}
@@ -177,7 +140,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
std::lock_guard<std::mutex> _lock(recvThread.mutex);
recvThread.running = false;
}
- celixThread_join(recvThread.thread, NULL);
+ recvThread.thread.join();
celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
@@ -285,12 +248,13 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
}
}
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+ const celix_bundle_t *bnd) {
+ auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
- if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+ if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
//not the same scope. ignore
return;
}
@@ -309,7 +273,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
if (rc == 0) {
hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
free(entry);
}
}
@@ -317,7 +281,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+ auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
long bndId = celix_bundle_getId(bnd);
@@ -331,13 +295,13 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
hashMap_remove(receiver->subscribers.map, (void*)bndId);
int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
}
free(entry);
}
}
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
pubsub_subscriber_t *svc = entry->svc;
@@ -353,7 +317,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
} else {
- L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+ //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, scope, topic);
}
}
} else {
@@ -361,13 +325,13 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
}
}
-static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
- std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+ std::lock_guard<std::mutex> _lock(subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize);
+ processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
}
}
}
@@ -377,12 +341,11 @@ struct Message {
char payload[];
};
-static void* psa_nanomsg_recvThread(void *data) {
- pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
+void pubsub::nanomsg::topic_receiver::recvThread_exec() {
bool running{};
{
- std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
- running = receiver->recvThread.running;
+ std::lock_guard<std::mutex> _lock(recvThread.mutex);
+ running = recvThread.running;
}
while (running) {
Message *msg = nullptr;
@@ -400,9 +363,9 @@ static void* psa_nanomsg_recvThread(void *data) {
msgHdr.msg_controllen = 0;
errno = 0;
- int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+ int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
- processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+ processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
nn_freemsg(msg);
} else if (recvBytes >= 0) {
L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
@@ -415,5 +378,4 @@ static void* psa_nanomsg_recvThread(void *data) {
}
} // while
- return NULL;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 6cd216b..f977917 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -16,15 +16,24 @@
*specific language governing permissions and limitations
*under the License.
*/
-#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
-#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#pragma once
#include <string>
#include <vector>
+#include <thread>
+#include <mutex>
#include "pubsub_serializer.h"
#include "log_helper.h"
#include "celix_bundle_context.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub/subscriber.h"
+
+typedef struct psa_zmq_subscriber_entry {
+ int usageCount;
+ hash_map_t *msgTypes; //map from serializer svc
+ pubsub_subscriber_t *svc;
+} psa_nanomsg_subscriber_entry_t;
+
-//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
namespace pubsub {
namespace nanomsg {
class topic_receiver {
@@ -46,7 +55,10 @@ namespace pubsub {
void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
void connectTo(const char *url);
void disconnectFrom(const char *url);
- private:
+ void recvThread_exec();
+ void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
+ void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+ //private:
celix_bundle_context_t *ctx{nullptr};
log_helper_t *logHelper{nullptr};
long m_serializerSvcId{0};
@@ -58,7 +70,7 @@ namespace pubsub {
int m_nanoMsgSocket{0};
struct {
- celix_thread_t thread;
+ std::thread thread;
std::mutex mutex;
bool running;
} recvThread{};
@@ -74,22 +86,6 @@ namespace pubsub {
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
} subscribers{};
};
- }}
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-// log_helper_t *logHelper, const char *scope,
-// const char *topic, long serializerSvcId,
-// pubsub_serializer_service_t *serializer);
-//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-// std::vector<std::string> &connectedUrls,
-// std::vector<std::string> &unconnectedUrls);
-
-//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+ }
+}
-#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H