You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2019/02/11 12:42:07 UTC

[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-460: Fixed signed/unsigned mismatch in pubsub which caused missing messages

This is an automated email from the ASF dual-hosted git repository.

erjanaltena pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
     new 855c5e4  CELIX-460: Fixed signed/unsigned mismatch in pubsub which caused missing messages
855c5e4 is described below

commit 855c5e4e7232e86544d43a05d6c9de5de9dc37b4
Author: Erjan Altena <er...@gmail.com>
AuthorDate: Mon Feb 11 13:41:57 2019 +0100

    CELIX-460: Fixed signed/unsigned mismatch in pubsub which caused missing messages
---
 .../pubsub_admin_zmq/src/pubsub_zmq_common.c       |  6 +--
 .../pubsub_admin_zmq/src/pubsub_zmq_common.h       | 12 +++---
 .../src/pubsub_zmq_topic_receiver.c                | 44 ++++++++++++----------
 .../src/pubsub_topology_manager.c                  |  4 +-
 4 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index e77398c..0854a93 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -60,12 +60,12 @@ void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *
     }
 }
 
-static int readInt(const unsigned char *data, int offset, int32_t *val) {
+static int readInt(const unsigned char *data, int offset, uint32_t *val) {
     *val = ((data[offset+0] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | (data[offset+3] << 0));
     return offset + 4;
 }
 
-static int readLong(const unsigned char *data, int offset, int64_t *val) {
+static int readLong(const unsigned char *data, int offset, uint64_t *val) {
     *val = (
             ((int64_t)data[offset+0] << 56) |
             ((int64_t)data[offset+1] << 48) |
@@ -134,4 +134,4 @@ void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned cha
     index += 16;
     index = writeLong(data, index, msgHeader->sendtimeSeconds);
     writeLong(data, index, msgHeader->sendTimeNanoseconds);
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
index 8a4c7ff..3e0e52f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
@@ -38,13 +38,13 @@
 
 
 struct pubsub_zmq_msg_header {
-    int32_t type; //msg type id (hash of fqn)
-    int8_t major;
-    int8_t minor;
-    int32_t seqNr;
+    uint32_t type; //msg type id (hash of fqn)
+    uint8_t major;
+    uint8_t minor;
+    uint32_t seqNr;
     unsigned char originUUID[16];
-    int64_t sendtimeSeconds; //seconds since epoch
-    int64_t sendTimeNanoseconds; //ns since epoch
+    uint64_t sendtimeSeconds; //seconds since epoch
+    uint64_t sendTimeNanoseconds; //ns since epoch
 };
 
 typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 3790bdb..b8a5cc7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -90,19 +90,19 @@ typedef struct psa_zmq_requested_connection_entry {
 } psa_zmq_requested_connection_entry_t;
 
 typedef struct psa_zmq_subscriber_metrics_entry_t {
-    int msgTypeId;
+    unsigned int msgTypeId;
     uuid_t origin;
 
-    long nrOfMessagesReceived;
-    long nrOfSerializationErrors;
+    unsigned long nrOfMessagesReceived;
+    unsigned long nrOfSerializationErrors;
     struct timespec lastMessageReceived;
     double averageTimeBetweenMessagesInSeconds;
     double averageSerializationTimeInSeconds;
     double averageDelayInSeconds;
     double maxDelayInSeconds;
     double minDelayInSeconds;
-    int lastSeqNr;
-    long nrOfMissingSeqNumbers;
+    unsigned int lastSeqNr;
+    unsigned long nrOfMissingSeqNumbers;
 } psa_zmq_subscriber_metrics_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
@@ -495,7 +495,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             }
         }
     } else {
-        L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id %i", hdr->type);
+        L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", hdr->type);
     }
 
     if (msgSer != NULL && monitor) {
@@ -673,19 +673,23 @@ pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topi
                 psa_zmq_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3);
                 result->msgTypes[i].typeId = metrics->msgTypeId;
                 pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId);
-                snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
-                uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
-                result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
-                result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
-                result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
-                result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
-                result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
-                result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
-                result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
-                result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
-                result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
-
-                k += 1;
+		if (msgSer) {
+                    snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+                    uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
+                    result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
+                    result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
+                    result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
+                    result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
+                    result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
+                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
+                    result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
+
+                    k += 1;
+                } else {
+                    L_WARN("[PSA_ZMQ]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId);
+                }
             }
             i +=1 ;
         }
@@ -741,4 +745,4 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         receiver->subscribers.allInitialized = allInitialized;
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index abc8687..be18131 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1124,7 +1124,7 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
                 const char *bndName = NULL;
                 celix_bundleContext_useBundle(manager->context, sm->msgMetrics->bndId, &bndName, fetchBundleName);
                 fprintf(os, "   |- Message '%s' from bundle '%s' (%li):\n", sm->msgMetrics[j].typeFqn, bndName, sm->msgMetrics->bndId);
-                fprintf(os, "      |- msg type = %i\n", sm->msgMetrics[j].typeId);
+                fprintf(os, "      |- msg type = 0x%X\n", sm->msgMetrics[j].typeId);
                 fprintf(os, "      |- send count = %li\n", sm->msgMetrics[j].nrOfMessagesSend);
                 fprintf(os, "      |- fail count = %li\n", sm->msgMetrics[j].nrOfMessagesSendFailed);
                 fprintf(os, "      |- serialization failed = %li\n", sm->msgMetrics[j].nrOfSerializationErrors);
@@ -1177,4 +1177,4 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLi
     } else { //default
         return pubsub_topologyManager_topology(manager, commandLine, os, errorStream);
     }
-}
\ No newline at end of file
+}