You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2019/02/11 12:42:07 UTC
[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-460:
Fixed signed/unsigned mismatch in pubsub which caused missing messages
This is an automated email from the ASF dual-hosted git repository.
erjanaltena pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
new 855c5e4 CELIX-460: Fixed signed/unsigned mismatch in pubsub which caused missing messages
855c5e4 is described below
commit 855c5e4e7232e86544d43a05d6c9de5de9dc37b4
Author: Erjan Altena <er...@gmail.com>
AuthorDate: Mon Feb 11 13:41:57 2019 +0100
CELIX-460: Fixed signed/unsigned mismatch in pubsub which caused missing messages
---
.../pubsub_admin_zmq/src/pubsub_zmq_common.c | 6 +--
.../pubsub_admin_zmq/src/pubsub_zmq_common.h | 12 +++---
.../src/pubsub_zmq_topic_receiver.c | 44 ++++++++++++----------
.../src/pubsub_topology_manager.c | 4 +-
4 files changed, 35 insertions(+), 31 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index e77398c..0854a93 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -60,12 +60,12 @@ void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *
}
}
-static int readInt(const unsigned char *data, int offset, int32_t *val) {
+static int readInt(const unsigned char *data, int offset, uint32_t *val) {
*val = ((data[offset+0] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | (data[offset+3] << 0));
return offset + 4;
}
-static int readLong(const unsigned char *data, int offset, int64_t *val) {
+static int readLong(const unsigned char *data, int offset, uint64_t *val) {
*val = (
((int64_t)data[offset+0] << 56) |
((int64_t)data[offset+1] << 48) |
@@ -134,4 +134,4 @@ void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned cha
index += 16;
index = writeLong(data, index, msgHeader->sendtimeSeconds);
writeLong(data, index, msgHeader->sendTimeNanoseconds);
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
index 8a4c7ff..3e0e52f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
@@ -38,13 +38,13 @@
struct pubsub_zmq_msg_header {
- int32_t type; //msg type id (hash of fqn)
- int8_t major;
- int8_t minor;
- int32_t seqNr;
+ uint32_t type; //msg type id (hash of fqn)
+ uint8_t major;
+ uint8_t minor;
+ uint32_t seqNr;
unsigned char originUUID[16];
- int64_t sendtimeSeconds; //seconds since epoch
- int64_t sendTimeNanoseconds; //ns since epoch
+ uint64_t sendtimeSeconds; //seconds since epoch
+ uint64_t sendTimeNanoseconds; //ns since epoch
};
typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 3790bdb..b8a5cc7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -90,19 +90,19 @@ typedef struct psa_zmq_requested_connection_entry {
} psa_zmq_requested_connection_entry_t;
typedef struct psa_zmq_subscriber_metrics_entry_t {
- int msgTypeId;
+ unsigned int msgTypeId;
uuid_t origin;
- long nrOfMessagesReceived;
- long nrOfSerializationErrors;
+ unsigned long nrOfMessagesReceived;
+ unsigned long nrOfSerializationErrors;
struct timespec lastMessageReceived;
double averageTimeBetweenMessagesInSeconds;
double averageSerializationTimeInSeconds;
double averageDelayInSeconds;
double maxDelayInSeconds;
double minDelayInSeconds;
- int lastSeqNr;
- long nrOfMissingSeqNumbers;
+ unsigned int lastSeqNr;
+ unsigned long nrOfMissingSeqNumbers;
} psa_zmq_subscriber_metrics_entry_t;
typedef struct psa_zmq_subscriber_entry {
@@ -495,7 +495,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
}
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id %i", hdr->type);
+ L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", hdr->type);
}
if (msgSer != NULL && monitor) {
@@ -673,19 +673,23 @@ pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topi
psa_zmq_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3);
result->msgTypes[i].typeId = metrics->msgTypeId;
pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId);
- snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
- uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
- result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
- result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
- result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
- result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
- result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
- result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
- result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
- result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
- result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
-
- k += 1;
+ if (msgSer) {
+ snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+ uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
+ result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
+ result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
+ result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
+ result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
+ result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
+ result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+ result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
+ result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
+ result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
+
+ k += 1;
+ } else {
+ L_WARN("[PSA_ZMQ]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId);
+ }
}
i +=1 ;
}
@@ -741,4 +745,4 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
receiver->subscribers.allInitialized = allInitialized;
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index abc8687..be18131 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1124,7 +1124,7 @@ static celix_status_t pubsub_topologyManager_metrics(pubsub_topology_manager_t *
const char *bndName = NULL;
celix_bundleContext_useBundle(manager->context, sm->msgMetrics->bndId, &bndName, fetchBundleName);
fprintf(os, " |- Message '%s' from bundle '%s' (%li):\n", sm->msgMetrics[j].typeFqn, bndName, sm->msgMetrics->bndId);
- fprintf(os, " |- msg type = %i\n", sm->msgMetrics[j].typeId);
+ fprintf(os, " |- msg type = 0x%X\n", sm->msgMetrics[j].typeId);
fprintf(os, " |- send count = %li\n", sm->msgMetrics[j].nrOfMessagesSend);
fprintf(os, " |- fail count = %li\n", sm->msgMetrics[j].nrOfMessagesSendFailed);
fprintf(os, " |- serialization failed = %li\n", sm->msgMetrics[j].nrOfSerializationErrors);
@@ -1177,4 +1177,4 @@ celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLi
} else { //default
return pubsub_topologyManager_topology(manager, commandLine, os, errorStream);
}
-}
\ No newline at end of file
+}