You are viewing a plain text version of this content. The canonical link for it is here.
Posted to proton@qpid.apache.org by Garlapati Sreeram Kumar <sr...@live.com> on 2016/02/17 06:05:01 UTC

Proton-j: Receiver API Contracts

Hello Folks,

Please help me understand few concepts on the usage of proton-j/reactor API:

While using the receiver – I am overriding BaseHandler.onDelivery event-callback to receive a batch of Messages. I am proactively flushing all available deliveries that comes as a batch – if available. Here’s pseudo-code of what I am doing – pls. correct if the intent is right. We are running into blocked Receiver when we are running 1 receiver per connection (using reactor framework) – does the below code ring any bells ?

       	while (delivery != null && delivery.isReadable() && !delivery.isPartial())
        	{    
        		int size = delivery.pending();
            		byte[] buffer = new byte[size];
            		int read = receiveLink.recv(buffer, 0, buffer.length);
            
            		Message msg = Proton.message();
            		msg.decode(buffer, 0, read);
            
            		messages.add(msg);
            		deliveries.add(delivery);
            
            		if (!receiveLink.advance())
           		 {
            			break;
           		 }
           		 else
           		 {
            			delivery = receiveLink.current();
           		 }
        	}
	application.onReceiveCallBack(messages);
	for(Delivery unsettledDelivery: deliveries)
        	{
        		unsettledDelivery.settle();
        	}
	
- Qstn: on a Receiver, Under what circumstances will a delivery be not readable ?        	

Thanks a lot for the Collaboration effort!
Sree


Re: Proton-j: Receiver API Contracts

Posted by Robbie Gemmell <ro...@gmail.com>.
On 17 February 2016 at 05:05, Garlapati Sreeram Kumar <sr...@live.com> wrote:
> Hello Folks,
>
> Please help me understand few concepts on the usage of proton-j/reactor API:
>
> While using the receiver – I am overriding BaseHandler.onDelivery event-callback to receive a batch of Messages. I am proactively flushing all available deliveries that comes as a batch – if available. Here’s pseudo-code of what I am doing – pls. correct if the intent is right. We are running into blocked Receiver when we are running 1 receiver per connection (using reactor framework) – does the below code ring any bells ?
>
>         while (delivery != null && delivery.isReadable() && !delivery.isPartial())
>                 {
>                         int size = delivery.pending();
>                         byte[] buffer = new byte[size];
>                         int read = receiveLink.recv(buffer, 0, buffer.length);
>
>                         Message msg = Proton.message();
>                         msg.decode(buffer, 0, read);
>
>                         messages.add(msg);
>                         deliveries.add(delivery);
>
>                         if (!receiveLink.advance())
>                          {
>                                 break;
>                          }
>                          else
>                          {
>                                 delivery = receiveLink.current();
>                          }
>                 }
>         application.onReceiveCallBack(messages);
>         for(Delivery unsettledDelivery: deliveries)
>                 {
>                         unsettledDelivery.settle();
>                 }
>
> - Qstn: on a Receiver, Under what circumstances will a delivery be not readable ?
>
> Thanks a lot for the Collaboration effort!
> Sree
>


The above doesnt look unreasonable for the most part, assuming a line
before what is shown is calling receiveLink.current(). It does look
like you dont apply any accepted/rejected/etc disposition to the
deliveries before settling, which may or may not cause issue for the
peer.

I'm not clear on what you mean exactly by a 'blocked reciever'
however. Is it not recieving messages you think it should? Not sending
disposition for messages you think it should? Not flowing new credit?

>From looking at the code, the delivery "isReadable()" returns true
when it the Dellivery was for a receiver link, and is the 'current'
delivery for that link, meaning that you can call receiver.rcv(..) to
grab its related content. A Delivery might not be readable if e.g the
receiver has already been advanced to the next delivery and it is no
longer the 'current' delivery, or perhaps the delivery event
corresponded to a disposition change sent by the peer after the
delviery content had been previously read and the link advanced.

Robbie