You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/01/28 20:03:14 UTC

[celix] branch develop updated: CELIX-454: Enables JSON serializer logging in the json pubsub serializer and update the PSA ZMQ implementation so that it works with the INAETICS java pubsubadmin

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 69795da  CELIX-454: Enables JSON serializer logging in the json pubsub serializer and update the PSA ZMQ implementation so that it works with the INAETICS java pubsubadmin
69795da is described below

commit 69795dac5ce5646062167f194d0ded9dbc89b798
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jan 28 21:02:05 2019 +0100

    CELIX-454: Enables JSON serializer logging in the json pubsub serializer and update the PSA ZMQ implementation so that it works with the INAETICS java pubsubadmin
---
 .../pubsub/msg_descriptors/msg_poi1.descriptor     |  2 +-
 .../pubsub/msg_descriptors/msg_poi2.descriptor     |  2 +-
 .../subscriber/private/src/pubsub_subscriber.c     |  3 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_common.c       | 32 ++++++++++++++++++++++
 .../pubsub_admin_zmq/src/pubsub_zmq_common.h       |  4 ++-
 .../src/pubsub_zmq_topic_receiver.c                |  4 ++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |  9 +++---
 .../src/pubsub_serializer_impl.c                   | 16 +++++++++++
 libs/dfi/src/json_serializer.c                     |  4 ++-
 9 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
index e547b62..367bf34 100644
--- a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
@@ -5,6 +5,6 @@ version=1.0.0
 :annotations
 classname=org.example.PointOfInterest
 :types
-location={DD lat long}
+location={DD lat lon}
 :message
 {llocation;tttt location name description extra data}
diff --git a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
index 0c369b5..5d9504a 100644
--- a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
@@ -5,6 +5,6 @@ version=1.0.0
 :annotations
 classname=org.example.PointOfInterest
 :types
-location={DD lat long}
+location={DD lat lon}
 :message
 {llocation;tttt location name description extra data}
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 7cfbedb..34a1c59 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -56,8 +56,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
 int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
 
 	location_t place = (location_t)msg;
-	int nrchars = 25;
-	printf("Recv (%s): [%f, %f] (%s, %s) data_len = %ld data =%*.*s\n",msgType, place->position.lat, place->position.lon,place->name,place->description, strlen(place->data) + 1, nrchars, nrchars, place->data);
+	printf("Recv (%s): [%f, %f] (%s, %s, %s, %s)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, place->data);
 
 	return 0;
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index fe01433..f65565c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -29,6 +29,11 @@ bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *
     bool check=false;
     int major=0,minor=0;
 
+    if (hdr->major == 0 && hdr->minor == 0) {
+        //no check
+        return true;
+    }
+
     if (msgVersion!=NULL) {
         version_getMajor(msgVersion,&major);
         version_getMinor(msgVersion,&minor);
@@ -52,4 +57,31 @@ void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *
         filter[2] = topic[0];
         filter[3] = topic[1];
     }
+}
+
+
+celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header) {
+    int status = CELIX_ILLEGAL_ARGUMENT;
+    if (dataLen == 6) {
+        header->type = ((data[0] << 24) | (data[1] << 16) | (data[2] << 8) | (data[3] << 0));
+        header->major = (unsigned char) data[4];
+        header->minor = (unsigned char) data[5];
+        status = CELIX_SUCCESS;
+    }
+    return status;
+}
+
+celix_status_t psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen) {
+    int status = CELIX_ILLEGAL_ARGUMENT;
+    if (dataLen == 6) {
+        unsigned int tmp = msgHeader->type & 0xFFFFFFFF;
+        data[0] = (unsigned char)((tmp >> 24) & 0xFF);
+        data[1] = (unsigned char)((tmp >> 16) & 0xFF);
+        data[2] = (unsigned char)((tmp >> 8) & 0xFF);
+        data[3] = (unsigned char)((tmp >> 0) & 0xFF);
+        data[4] = (char)msgHeader->major;
+        data[5] = (char)msgHeader->minor;
+        status = CELIX_SUCCESS;
+    }
+    return status;
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
index ef4a896..ce71ce5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
@@ -39,7 +39,7 @@
 
 struct pubsub_zmq_msg_header {
     //header
-    unsigned int type;
+    int type;
     unsigned char major;
     unsigned char minor;
 };
@@ -52,5 +52,7 @@ void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *
 
 bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr);
 
+celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header);
+celix_status_t psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen);
 
 #endif //CELIX_PUBSUB_ZMQ_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 6886109..aef6c3c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -478,7 +478,9 @@ static void* psa_zmq_recvThread(void * data) {
                 zframe_t *header = zmsg_pop(zmsg); //pubsub_zmq_msg_header_t
                 zframe_t *payload = zmsg_pop(zmsg); //serialized payload
                 if (header != NULL && payload != NULL) {
-                    processMsg(receiver, (pubsub_zmq_msg_header_t*)zframe_data(header), zframe_data(payload), zframe_size(payload));
+                    pubsub_zmq_msg_header_t msgHeader;
+                    psa_zmq_decodeHeader(zframe_data(header), zframe_size(header), &msgHeader);
+                    processMsg(receiver, &msgHeader, zframe_data(payload), zframe_size(payload));
                 }
                 zframe_destroy(&filter);
                 zframe_destroy(&header);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index d87412e..7249a45 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -373,15 +373,16 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
 
         int major = 0, minor = 0;
 
-        pubsub_zmq_msg_header_t msg_hdr;// = calloc(1, sizeof(*msg_hdr));
-        msg_hdr.type = msgTypeId;
-
+        pubsub_zmq_msg_header_t msg_hdr;
+        msg_hdr.type = (int)msgTypeId;
         if (msgSer->msgVersion != NULL) {
             version_getMajor(msgSer->msgVersion, &major);
             version_getMinor(msgSer->msgVersion, &minor);
             msg_hdr.major = (unsigned char) major;
             msg_hdr.minor = (unsigned char) minor;
         }
+        unsigned char hdr[6];
+        psa_zmq_encodeHeader(&msg_hdr, hdr, 6);
 
         void *serializedOutput = NULL;
         size_t serializedOutputLen = 0;
@@ -391,7 +392,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
             //TODO revert to use zmq_msg_init_data (or something like that) for zero copy for the payload
             //TODO remove socket mutex .. not needed (initialized during creation)
             zmsg_addstr(msg, sender->scopeAndTopicFilter);
-            zmsg_addmem(msg, &msg_hdr, sizeof(msg_hdr));
+            zmsg_addmem(msg, &hdr, 6);
             zmsg_addmem(msg, serializedOutput, serializedOutputLen);
             errno = 0;
             int rc = zmsg_send(&msg, sender->zmq.socket);
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 2569ee7..9c84e9b 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -61,6 +61,18 @@ static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
 static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
 static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
 
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+	va_list ap;
+	pubsub_json_serializer_t *serializer = handle;
+	char *logStr = NULL;
+	va_start(ap, msg);
+	vasprintf(&logStr, msg, ap);
+	va_end(ap);
+	logHelper_log(serializer->loghelper, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+	free(logStr);
+}
+
 celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t** serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
@@ -75,6 +87,10 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_se
 
 		if (logHelper_create(context, &(*serializer)->loghelper) == CELIX_SUCCESS) {
 			logHelper_start((*serializer)->loghelper);
+			jsonSerializer_logSetup(dfi_log, (*serializer), 1);
+			dynFunction_logSetup(dfi_log, (*serializer), 1);
+			dynType_logSetup(dfi_log, (*serializer), 1);
+			dynCommon_logSetup(dfi_log, (*serializer), 1);
 		}
 
 	}
diff --git a/libs/dfi/src/json_serializer.c b/libs/dfi/src/json_serializer.c
index 4b78aeb..106c745 100644
--- a/libs/dfi/src/json_serializer.c
+++ b/libs/dfi/src/json_serializer.c
@@ -215,7 +215,9 @@ static int jsonSerializer_parseAny(dyn_type *type, void *loc, json_t *val) {
             *ul = (uint64_t) json_integer_value(val);
             break;
         case 't' :
-            if (json_is_string(val)) {
+            if (json_is_null(val)) {
+                //nop
+            } else if (json_is_string(val)) {
                 dynType_text_allocAndInit(type, loc, json_string_value(val));
             } else {
                 status = ERROR;