You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/10/12 17:10:05 UTC
celix git commit: Report error to user if serialization fails
Repository: celix
Updated Branches:
refs/heads/develop ca2d0901c -> 7b4433ba9
Report error to user if serialization fails
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7b4433ba
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7b4433ba
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7b4433ba
Branch: refs/heads/develop
Commit: 7b4433ba9e8503076f32f9e8e42510441d77c2cb
Parents: ca2d090
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Oct 12 19:07:43 2018 +0200
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Oct 12 19:09:39 2018 +0200
----------------------------------------------------------------------
.../src/pubsub_udpmc_topic_sender.c | 48 +++++-----
.../src/pubsub_zmq_topic_sender.c | 97 +++++++++++---------
2 files changed, 78 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/7b4433ba/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 64d3d59..59c809c 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -254,35 +254,39 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
if (msgSer != NULL) {
int major=0, minor=0;
- pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
- strncpy(msg_hdr->topic,entry->parent->topic,MAX_TOPIC_LEN-1);
- msg_hdr->type = msgTypeId;
+ void* serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ if (msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
+ pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+ strncpy(msg_hdr->topic,entry->parent->topic,MAX_TOPIC_LEN-1);
+ msg_hdr->type = msgTypeId;
- if (msgSer->msgVersion != NULL){
- version_getMajor(msgSer->msgVersion, &major);
- version_getMinor(msgSer->msgVersion, &minor);
- msg_hdr->major = major;
- msg_hdr->minor = minor;
- }
- void* serializedOutput = NULL;
- size_t serializedOutputLen = 0;
- msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ msg_hdr->major = major;
+ msg_hdr->minor = minor;
+ }
- pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
- msg->header = msg_hdr;
- msg->payload = (char*)serializedOutput;
- msg->payloadSize = serializedOutputLen;
+ pubsub_msg_t *msg = calloc(1, sizeof(pubsub_msg_t));
+ msg->header = msg_hdr;
+ msg->payload = (char *) serializedOutput;
+ msg->payloadSize = serializedOutputLen;
- if(psa_udpmc_sendMsg(entry, msg,true, NULL) == false) {
- status = -1;
- }
- free(msg_hdr);
- free(msg);
- free(serializedOutput);
+ if (psa_udpmc_sendMsg(entry, msg, true, NULL) == false) {
+ status = -1;
+ }
+ free(msg);
+ free(msg_hdr);
+ free(serializedOutput);
+ } else {
+ printf("[PSA_UDPMC/TopicSender] Serialization of msg type id %d failed\n", msgTypeId);
+ status=-1;
+ }
} else {
printf("[PSA_UDPMC/TopicSender] No msg serializer available for msg type id %d\n", msgTypeId);
http://git-wip-us.apache.org/repos/asf/celix/blob/7b4433ba/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index dc735c2..ad36820 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -443,55 +443,62 @@ static int psa_zmq_topicPublicationSendMultipart(void *handle, unsigned int msgT
void *serializedOutput = NULL;
size_t serializedOutputLen = 0;
- msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
-
- pubsub_msg_t *msg = calloc(1,sizeof(struct pubsub_msg));
- msg->header = msg_hdr;
- msg->payload = (char*)serializedOutput;
- msg->payloadSize = (int)serializedOutputLen;
- bool snd = true;
-
- switch(flags) {
- case PUBSUB_PUBLISHER_FIRST_MSG:
- bound->multipart.inProgress = true;
- celix_arrayList_add(bound->multipart.parts, msg);
- break;
- case PUBSUB_PUBLISHER_PART_MSG:
- if(!bound->multipart.inProgress){
- L_INFO("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
- status = -4;
- }
- else{
+ status = msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+ if(status == CELIX_SUCCESS) {
+ pubsub_msg_t *msg = calloc(1, sizeof(struct pubsub_msg));
+ msg->header = msg_hdr;
+ msg->payload = (char *) serializedOutput;
+ msg->payloadSize = (int) serializedOutputLen;
+ bool snd = true;
+
+ switch (flags) {
+ case PUBSUB_PUBLISHER_FIRST_MSG:
+ bound->multipart.inProgress = true;
celix_arrayList_add(bound->multipart.parts, msg);
- }
- break;
- case PUBSUB_PUBLISHER_LAST_MSG:
- if(!bound->multipart.inProgress){
- L_INFO("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+ break;
+ case PUBSUB_PUBLISHER_PART_MSG:
+ if (!bound->multipart.inProgress) {
+ L_INFO("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
+ status = -4;
+ } else {
+ celix_arrayList_add(bound->multipart.parts, msg);
+ }
+ break;
+ case PUBSUB_PUBLISHER_LAST_MSG:
+ if (!bound->multipart.inProgress) {
+ L_INFO("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+ status = -4;
+ } else {
+ celix_arrayList_add(bound->multipart.parts, msg);
+ snd = psa_zmq_sendMsgParts(bound->parent, bound->multipart.parts);
+ bound->multipart.inProgress = false;
+ assert(celix_arrayList_size(bound->multipart.parts) == 0); //should be cleanup by sendMsg
+ }
+ break;
+ case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
+ snd = psa_zmq_sendMsg(bound->parent, msg, true);
+ break;
+ default:
+ L_INFO("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
status = -4;
- }
- else{
- celix_arrayList_add(bound->multipart.parts, msg);
- snd = psa_zmq_sendMsgParts(bound->parent, bound->multipart.parts);
- bound->multipart.inProgress = false;
- assert(celix_arrayList_size(bound->multipart.parts) == 0); //should be cleanup by sendMsg
- }
- break;
- case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
- snd = psa_zmq_sendMsg(bound->parent, msg, true);
- break;
- default:
- L_INFO("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
- status = -4;
- break;
- }
+ break;
+ }
- if (status==-4) {
- free(msg);
- }
+ if (status == -4) {
+ free(msg);
+ }
- if (!snd) {
- L_WARN("[PSA_ZMQ] Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+ if (!snd) {
+ L_WARN("[PSA_ZMQ] Failed to send %s message %u.\n",
+ flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart",
+ msgTypeId);
+ status = -1;
+ }
+ } else {
+ L_WARN("[PSA_ZMQ] Failed to serialize %s message %u.\n",
+ flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart",
+ msgTypeId);
+ status = -1;
}
} else {