You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/01 10:46:36 UTC

[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-459: Fixes some issues in the pubsub monitoring and update the zmq sender to use zero copy constructions

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
     new 93a8674  CELIX-459: Fixes some issues in the pubsub monitoring and update the zmq sender to use zero copy constructions
93a8674 is described below

commit 93a867477e30cb1f8f1b401db7aa2cab295adafb
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Feb 1 11:45:51 2019 +0100

    CELIX-459: Fixes some issues in the pubsub monitoring and update the zmq sender to use zero copy constructions
---
 .../publisher/private/src/pubsub_publisher.c       |  3 +-
 .../subscriber/private/src/pubsub_subscriber.c     |  2 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_common.c       |  2 +-
 .../src/pubsub_zmq_topic_receiver.c                |  4 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 57 ++++++++++++----------
 .../src/pubsub_topology_manager.c                  | 16 ++++--
 libs/framework/include/celix_bundle.h              |  2 +
 libs/framework/src/bundle.c                        | 10 ++++
 8 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 5369a22..1eba05b 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -68,7 +68,7 @@ static void* send_thread(void* arg){
 
 	place->name = name;
 	place->description = desc;
-	place->extra = "DONT PANIC";
+	place->extra = "extra value";
 	printf("TOPIC : %s\n",st_struct->topic);
 	unsigned int msgId = 0;
 	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){
@@ -81,6 +81,7 @@ static void* send_thread(void* arg){
 			for(int i = 0; i < (nr_char-1); i++) {
 				place->data[i] = i%10 + '0';
 			}
+			place->data[nr_char-1] = '\0';
 			if(publish_svc->send) {
 				if(publish_svc->send(publish_svc->handle,msgId,place)==0){
 					printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 34a1c59..61cc288 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -56,7 +56,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
 int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
 
 	location_t place = (location_t)msg;
-	printf("Recv (%s): [%f, %f] (%s, %s, %s, %s)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, place->data);
+	printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
 
 	return 0;
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index dcb6ba1..e77398c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -76,7 +76,7 @@ static int readLong(const unsigned char *data, int offset, int64_t *val) {
             ((int64_t)data[offset+6] << 8 ) |
             ((int64_t)data[offset+7] << 0 )
     );
-    return offset + 4;
+    return offset + 8;
 }
 
 celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 3c26559..13ae1a1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -595,7 +595,9 @@ static void* psa_zmq_recvThread(void * data) {
                 zframe_t *filter = zmsg_pop(zmsg); //char[5] filter
                 zframe_t *header = zmsg_pop(zmsg); //pubsub_zmq_msg_header_t
                 zframe_t *payload = zmsg_pop(zmsg); //serialized payload
-                if (header != NULL && payload != NULL) {
+                if (filter != NULL && strncmp(receiver->scopeAndTopicFilter, (char*)zframe_data(filter), zframe_size(filter)) != 0 ) {
+                    L_ERROR("[PSA_ZMQ_TR] Invalid ZQM filter, Found '%4s'. Expected %s\n", (char*)zframe_data(filter), receiver->scopeAndTopicFilter);
+                } else if (header != NULL && payload != NULL) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
                     pubsub_zmq_msg_header_t msgHeader;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index acb23be..1320114 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -468,16 +468,15 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
     return result;
 }
 
-//static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
-//    free(msg);
-//}
+static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) { // Release callback passed to zmq_msg_init_data() for heap-allocated frame buffers; hint is ignored.
+    free(msg); // msg is the buffer pointer that was handed to zmq_msg_init_data()
+}
 
 static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
     int status = CELIX_SUCCESS;
     psa_zmq_bounded_service_entry_t *bound = handle;
     pubsub_zmq_topic_sender_t *sender = bound->parent;
     bool monitor = sender->metricsEnabled;
-    unsigned char hdr[48];
 
     psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
 
@@ -522,37 +521,43 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 }
                 msg_hdr.seqNr = entry->seqNr++;
             }
+            unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
             psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
 
             errno = 0;
-
-            zmsg_t *msg = zmsg_new();
-            zmsg_addstr(msg, sender->scopeAndTopicFilter);
-            zmsg_addmem(msg, &hdr, sizeof(pubsub_zmq_msg_header_t));
-            zmsg_addmem(msg, serializedOutput, serializedOutputLen);
-            int rc = zmsg_send(&msg, sender->zmq.socket);
-            free(serializedOutput);
-
-//            zmq_msg_t msg1; //filter
-//            zmq_msg_t msg2; //header
-//            zmq_msg_t msg3; //payload
-//            zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
-//            zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), NULL, bound);
-//            zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
-//            int rc = zmq_msg_send(&msg1, sender->zmq.socket, ZMQ_SNDMORE);
-//            if (rc == 0) {
-//                rc = zmq_msg_send(&msg2, sender->zmq.socket, ZMQ_SNDMORE);
-//            }
-//            if (rc == 0) {
-//                rc = zmq_msg_send(&msg3, sender->zmq.socket, 0);
-//            }
+            bool sendOk;
+
+//            zmsg_t *msg = zmsg_new();
+//            zmsg_addstr(msg, sender->scopeAndTopicFilter);
+//            zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
+//            zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+//            int rc = zmsg_send(&msg, sender->zmq.socket);
+//            sendOk = rc == 0;
+//            free(serializedOutput);
+//            free(hdr);
+
+            zmq_msg_t msg1; //filter
+            zmq_msg_t msg2; //header
+            zmq_msg_t msg3; //payload
+            zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
+            zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
+            zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+            void *socket = zsock_resolve(sender->zmq.socket);
+            int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+            if (rc > 0) {
+                rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+            }
+            if (rc > 0) {
+                rc = zmq_msg_send(&msg3, socket, 0);
+            }
+            sendOk = rc > 0;
 
 
             if (monitor) {
                 //unlock send
                 entry->sendLock = 0;
             }
-            if (rc == 0) {
+            if (sendOk) {
                 sendCountUpdate = 1;
             } else {
                 sendErrorUpdate = 1;
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 7690ce5..1f1da9c 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -25,6 +25,7 @@
 #include <pubsub_utils.h>
 #include <assert.h>
 #include <pubsub_admin_metrics.h>
+#include <uuid/uuid.h>
 
 #include "hash_map.h"
 #include "celix_array_list.h"
@@ -1090,6 +1091,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
     return CELIX_SUCCESS;
 }
 
+static void fetchBundleName(void *handle, const bundle_t *bundle) { // Callback given to celix_bundleContext_useBundle(): writes the bundle's symbolic name through handle.
+    const char **out = handle; // handle is the caller's `const char *` slot (the metrics printer passes &bndName)
+    *out = celix_bundle_getSymbolicName(bundle); // may store NULL; NOTE(review): lifetime of the returned string after the callback returns is not shown here - confirm
+}
+
 static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *manager, char *commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
     celix_array_list_t *psaMetrics = celix_arrayList_create();
     celixThreadMutex_lock(&manager->psaMetrics.mutex);
@@ -1111,7 +1117,9 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
                 if (sm->msgMetrics[j].nrOfMessagesSend == 0 && sm->msgMetrics[j].nrOfMessagesSendFailed == 0 && sm->msgMetrics[j].nrOfSerializationErrors == 0) {
                     continue;
                 }
-                fprintf(os, "   |- Message & Bundle %s/%li:\n", sm->msgMetrics[j].typeFqn, sm->msgMetrics->bndId);
+                const char *bndName = NULL;
+                celix_bundleContext_useBundle(manager->context, sm->msgMetrics->bndId, &bndName, fetchBundleName);
+                fprintf(os, "   |- Message '%s' from bundle '%s' (%li):\n", sm->msgMetrics[j].typeFqn, bndName, sm->msgMetrics->bndId);
                 fprintf(os, "      |- msg type = %i\n", sm->msgMetrics[j].typeId);
                 fprintf(os, "      |- send count = %li\n", sm->msgMetrics[j].nrOfMessagesSend);
                 fprintf(os, "      |- fail count = %li\n", sm->msgMetrics[j].nrOfMessagesSendFailed);
@@ -1133,7 +1141,7 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
                     }
                     char uuidStr[UUID_STR_LEN+1];
                     uuid_unparse(rm->msgTypes[j].origins[m].originUUID, uuidStr);
-                    fprintf(os, "   |- Message & Origin %s/%s:\n", rm->msgTypes[j].typeFqn, uuidStr);
+                    fprintf(os, "   |- Message '%s' from framework UUID %s:\n", rm->msgTypes[j].typeFqn, uuidStr);
                     fprintf(os, "      |- msg type = %i\n", rm->msgTypes[j].typeId);
                     fprintf(os, "      |- receive count = %li\n", rm->msgTypes[j].origins[m].nrOfMessagesReceived);
                     fprintf(os, "      |- serialization error = %li\n", rm->msgTypes[j].origins[m].nrOfSerializationErrors);
@@ -1155,8 +1163,8 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
 celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLine, FILE *os, FILE *errorStream) {
     pubsub_topology_manager_t *manager = handle;
 
-    static const char * topCmd = "pstm topology";
-    static const char * metricsCmd = "pstm metrics";
+    static const char * topCmd = "pstm t"; //"topology";
+    static const char * metricsCmd = "pstm m"; //"metrics"
 
     if (strncmp(commandLine, topCmd, strlen(topCmd)) == 0) {
         return pubsub_topologyManager_topology(manager, commandLine, os, errorStream);
diff --git a/libs/framework/include/celix_bundle.h b/libs/framework/include/celix_bundle.h
index 70b54b0..f0ef683 100644
--- a/libs/framework/include/celix_bundle.h
+++ b/libs/framework/include/celix_bundle.h
@@ -56,6 +56,8 @@ char* celix_bundle_getEntry(const celix_bundle_t* bnd, const char *path);
 
 const char* celix_bundle_getGroup(const celix_bundle_t *bnd);
 
+const char* celix_bundle_getSymbolicName(const celix_bundle_t *bnd);
+
 
 #ifdef __cplusplus
 }
diff --git a/libs/framework/src/bundle.c b/libs/framework/src/bundle.c
index e4576ca..6e7a1d9 100644
--- a/libs/framework/src/bundle.c
+++ b/libs/framework/src/bundle.c
@@ -630,3 +630,13 @@ const char* celix_bundle_getGroup(const celix_bundle_t *bnd) {
 	}
 	return result;
 }
+
+const char* celix_bundle_getSymbolicName(const celix_bundle_t *bnd) { // Returns the symbolic name of bnd's current module, or NULL when no module or name is obtained.
+	const char *result = NULL; // stays NULL unless module_getSymbolicName() fills it in
+	module_pt mod = NULL;
+	bundle_getCurrentModule((bundle_pt)bnd, &mod); // cast drops const for the older bundle_pt API; the returned status is not checked
+	if (mod != NULL) { // only query the name when a current module was found
+		module_getSymbolicName(mod, &result); // status ignored; result is returned as-is
+	}
+	return result; // NOTE(review): presumably owned by the module, caller should not free - confirm
+}
\ No newline at end of file