You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/01 10:46:36 UTC
[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-459:
Fixes some issues in the pubsub monitoring and update the zmq sender to use
zero copy constructions
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
new 93a8674 CELIX-459: Fixes some issues in the pubsub monitoring and update the zmq sender to use zero copy constructions
93a8674 is described below
commit 93a867477e30cb1f8f1b401db7aa2cab295adafb
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Feb 1 11:45:51 2019 +0100
CELIX-459: Fixes some issues in the pubsub monitoring and update the zmq sender to use zero copy constructions
---
.../publisher/private/src/pubsub_publisher.c | 3 +-
.../subscriber/private/src/pubsub_subscriber.c | 2 +-
.../pubsub_admin_zmq/src/pubsub_zmq_common.c | 2 +-
.../src/pubsub_zmq_topic_receiver.c | 4 +-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 57 ++++++++++++----------
.../src/pubsub_topology_manager.c | 16 ++++--
libs/framework/include/celix_bundle.h | 2 +
libs/framework/src/bundle.c | 10 ++++
8 files changed, 62 insertions(+), 34 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 5369a22..1eba05b 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -68,7 +68,7 @@ static void* send_thread(void* arg){
place->name = name;
place->description = desc;
- place->extra = "DONT PANIC";
+ place->extra = "extra value";
printf("TOPIC : %s\n",st_struct->topic);
unsigned int msgId = 0;
if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){
@@ -81,6 +81,7 @@ static void* send_thread(void* arg){
for(int i = 0; i < (nr_char-1); i++) {
place->data[i] = i%10 + '0';
}
+ place->data[nr_char-1] = '\0';
if(publish_svc->send) {
if(publish_svc->send(publish_svc->handle,msgId,place)==0){
printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 34a1c59..61cc288 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -56,7 +56,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
location_t place = (location_t)msg;
- printf("Recv (%s): [%f, %f] (%s, %s, %s, %s)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, place->data);
+ printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
return 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index dcb6ba1..e77398c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -76,7 +76,7 @@ static int readLong(const unsigned char *data, int offset, int64_t *val) {
((int64_t)data[offset+6] << 8 ) |
((int64_t)data[offset+7] << 0 )
);
- return offset + 4;
+ return offset + 8;
}
celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 3c26559..13ae1a1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -595,7 +595,9 @@ static void* psa_zmq_recvThread(void * data) {
zframe_t *filter = zmsg_pop(zmsg); //char[5] filter
zframe_t *header = zmsg_pop(zmsg); //pubsub_zmq_msg_header_t
zframe_t *payload = zmsg_pop(zmsg); //serialized payload
- if (header != NULL && payload != NULL) {
+ if (filter != NULL && strncmp(receiver->scopeAndTopicFilter, (char*)zframe_data(filter), zframe_size(filter)) != 0 ) {
+ L_ERROR("[PSA_ZMQ_TR] Invalid ZQM filter, Found '%4s'. Expected %s\n", (char*)zframe_data(filter), receiver->scopeAndTopicFilter);
+ } else if (header != NULL && payload != NULL) {
struct timespec receiveTime;
clock_gettime(CLOCK_REALTIME, &receiveTime);
pubsub_zmq_msg_header_t msgHeader;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index acb23be..1320114 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -468,16 +468,15 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
return result;
}
-//static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
-// free(msg);
-//}
+static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
+ free(msg);
+}
static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
int status = CELIX_SUCCESS;
psa_zmq_bounded_service_entry_t *bound = handle;
pubsub_zmq_topic_sender_t *sender = bound->parent;
bool monitor = sender->metricsEnabled;
- unsigned char hdr[48];
psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
@@ -522,37 +521,43 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
msg_hdr.seqNr = entry->seqNr++;
}
+ unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
errno = 0;
-
- zmsg_t *msg = zmsg_new();
- zmsg_addstr(msg, sender->scopeAndTopicFilter);
- zmsg_addmem(msg, &hdr, sizeof(pubsub_zmq_msg_header_t));
- zmsg_addmem(msg, serializedOutput, serializedOutputLen);
- int rc = zmsg_send(&msg, sender->zmq.socket);
- free(serializedOutput);
-
-// zmq_msg_t msg1; //filter
-// zmq_msg_t msg2; //header
-// zmq_msg_t msg3; //payload
-// zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
-// zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), NULL, bound);
-// zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
-// int rc = zmq_msg_send(&msg1, sender->zmq.socket, ZMQ_SNDMORE);
-// if (rc == 0) {
-// rc = zmq_msg_send(&msg2, sender->zmq.socket, ZMQ_SNDMORE);
-// }
-// if (rc == 0) {
-// rc = zmq_msg_send(&msg3, sender->zmq.socket, 0);
-// }
+ bool sendOk;
+
+// zmsg_t *msg = zmsg_new();
+// zmsg_addstr(msg, sender->scopeAndTopicFilter);
+// zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
+// zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+// int rc = zmsg_send(&msg, sender->zmq.socket);
+// sendOk = rc == 0;
+// free(serializedOutput);
+// free(hdr);
+
+ zmq_msg_t msg1; //filter
+ zmq_msg_t msg2; //header
+ zmq_msg_t msg3; //payload
+ zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
+ zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
+ zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+ void *socket = zsock_resolve(sender->zmq.socket);
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+ if (rc > 0) {
+ rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+ }
+ if (rc > 0) {
+ rc = zmq_msg_send(&msg3, socket, 0);
+ }
+ sendOk = rc > 0;
if (monitor) {
//unlock send
entry->sendLock = 0;
}
- if (rc == 0) {
+ if (sendOk) {
sendCountUpdate = 1;
} else {
sendErrorUpdate = 1;
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 7690ce5..1f1da9c 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -25,6 +25,7 @@
#include <pubsub_utils.h>
#include <assert.h>
#include <pubsub_admin_metrics.h>
+#include <uuid/uuid.h>
#include "hash_map.h"
#include "celix_array_list.h"
@@ -1090,6 +1091,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
return CELIX_SUCCESS;
}
+static void fetchBundleName(void *handle, const bundle_t *bundle) {
+ const char **out = handle;
+ *out = celix_bundle_getSymbolicName(bundle);
+}
+
static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *manager, char *commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
celix_array_list_t *psaMetrics = celix_arrayList_create();
celixThreadMutex_lock(&manager->psaMetrics.mutex);
@@ -1111,7 +1117,9 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
if (sm->msgMetrics[j].nrOfMessagesSend == 0 && sm->msgMetrics[j].nrOfMessagesSendFailed == 0 && sm->msgMetrics[j].nrOfSerializationErrors == 0) {
continue;
}
- fprintf(os, " |- Message & Bundle %s/%li:\n", sm->msgMetrics[j].typeFqn, sm->msgMetrics->bndId);
+ const char *bndName = NULL;
+ celix_bundleContext_useBundle(manager->context, sm->msgMetrics->bndId, &bndName, fetchBundleName);
+ fprintf(os, " |- Message '%s' from bundle '%s' (%li):\n", sm->msgMetrics[j].typeFqn, bndName, sm->msgMetrics->bndId);
fprintf(os, " |- msg type = %i\n", sm->msgMetrics[j].typeId);
fprintf(os, " |- send count = %li\n", sm->msgMetrics[j].nrOfMessagesSend);
fprintf(os, " |- fail count = %li\n", sm->msgMetrics[j].nrOfMessagesSendFailed);
@@ -1133,7 +1141,7 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
}
char uuidStr[UUID_STR_LEN+1];
uuid_unparse(rm->msgTypes[j].origins[m].originUUID, uuidStr);
- fprintf(os, " |- Message & Origin %s/%s:\n", rm->msgTypes[j].typeFqn, uuidStr);
+ fprintf(os, " |- Message '%s' from framework UUID %s:\n", rm->msgTypes[j].typeFqn, uuidStr);
fprintf(os, " |- msg type = %i\n", rm->msgTypes[j].typeId);
fprintf(os, " |- receive count = %li\n", rm->msgTypes[j].origins[m].nrOfMessagesReceived);
fprintf(os, " |- serialization error = %li\n", rm->msgTypes[j].origins[m].nrOfSerializationErrors);
@@ -1155,8 +1163,8 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLine, FILE *os, FILE *errorStream) {
pubsub_topology_manager_t *manager = handle;
- static const char * topCmd = "pstm topology";
- static const char * metricsCmd = "pstm metrics";
+ static const char * topCmd = "pstm t"; //"topology";
+ static const char * metricsCmd = "pstm m"; //"metrics"
if (strncmp(commandLine, topCmd, strlen(topCmd)) == 0) {
return pubsub_topologyManager_topology(manager, commandLine, os, errorStream);
diff --git a/libs/framework/include/celix_bundle.h b/libs/framework/include/celix_bundle.h
index 70b54b0..f0ef683 100644
--- a/libs/framework/include/celix_bundle.h
+++ b/libs/framework/include/celix_bundle.h
@@ -56,6 +56,8 @@ char* celix_bundle_getEntry(const celix_bundle_t* bnd, const char *path);
const char* celix_bundle_getGroup(const celix_bundle_t *bnd);
+const char* celix_bundle_getSymbolicName(const celix_bundle_t *bnd);
+
#ifdef __cplusplus
}
diff --git a/libs/framework/src/bundle.c b/libs/framework/src/bundle.c
index e4576ca..6e7a1d9 100644
--- a/libs/framework/src/bundle.c
+++ b/libs/framework/src/bundle.c
@@ -630,3 +630,13 @@ const char* celix_bundle_getGroup(const celix_bundle_t *bnd) {
}
return result;
}
+
+const char* celix_bundle_getSymbolicName(const celix_bundle_t *bnd) {
+ const char *result = NULL;
+ module_pt mod = NULL;
+ bundle_getCurrentModule((bundle_pt)bnd, &mod);
+ if (mod != NULL) {
+ module_getSymbolicName(mod, &result);
+ }
+ return result;
+}
\ No newline at end of file