Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2020/12/17 14:29:37 UTC

[GitHub] [celix] pnoltes commented on a change in pull request #279: Feature/tcp admin msg segmentation

pnoltes commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r545132329



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);

Review comment:
       The interceptor can be used to monitor pubsub (pre/post).
   For example, to observe the receive processing time by recording the time difference between a preReceive and a postReceive interceptor call.
   
   The interceptors do not need the actual subscriber service. 
   Although the wish to intercept the usage of a pubsub receive was also discussed, this is more adapter than interceptor functionality. 
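
   A rough sketch of the timing use case mentioned above. It assumes simplified callback signatures, not the real Celix interceptor API: `receive_timer_t`, `timer_preReceive` and `timer_postReceive` are hypothetical names for illustration only, and `clock_gettime` is used rather than `difftime` to get sub-second resolution. Note that neither callback needs the subscriber service itself.

```c
#define _POSIX_C_SOURCE 199309L /* for clock_gettime */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Hypothetical interceptor state; not a Celix type. */
typedef struct receive_timer {
    struct timespec start;  /* timestamp taken in the pre-receive call */
    double lastSeconds;     /* elapsed time measured at the last post-receive call */
    uint64_t nrOfMessages;  /* number of post-receive calls seen */
} receive_timer_t;

/* Assumed (simplified) pre-receive callback: record the start time.
 * Returning true means "continue delivering to the subscribers". */
static bool timer_preReceive(void *handle, const char *msgType, uint32_t msgId) {
    receive_timer_t *timer = handle;
    assert(timer != NULL);
    (void)msgType;
    (void)msgId;
    clock_gettime(CLOCK_MONOTONIC, &timer->start);
    return true;
}

/* Assumed (simplified) post-receive callback: compute the time elapsed
 * since the matching pre-receive call and report it. */
static void timer_postReceive(void *handle, const char *msgType, uint32_t msgId) {
    receive_timer_t *timer = handle;
    assert(timer != NULL);
    struct timespec end;
    clock_gettime(CLOCK_MONOTONIC, &end);
    timer->lastSeconds = (double)(end.tv_sec - timer->start.tv_sec) +
                         (double)(end.tv_nsec - timer->start.tv_nsec) / 1e9;
    timer->nrOfMessages++;
    printf("receive of %s (id %u) took %.9f s\n", msgType, (unsigned)msgId,
           timer->lastSeconds);
}
```

   With the call placement in the diff above (preReceive once before the loop, postReceive after each `svc->receive`), every measurement would be the time since the single preReceive call, so it is cumulative across subscribers rather than per subscriber.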




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org