Posted to users@qpid.apache.org by Michael Ivanov <iv...@logit-ag.de> on 2015/02/02 10:46:31 UTC

proton reply handling again

Hello!

I am implementing a message-handling event loop using the proton library.
I create a pn_messenger, subscribe to several incoming queues and wait
for input. I also need to send outgoing messages while handling incoming
ones, and for some of the outgoing messages I want to get an immediate
reply (which has to be received outside of the main event loop). To get
a reply I use a temporary queue (created using the "#" token). As far as I
understand, I should not subscribe to this queue in my primary messenger,
since I cannot temporarily suspend or cancel the other subscriptions, which
have to be handled in the primary event loop. So at the start of the process
I create a separate pn_messenger for immediate replies, subscribe it
to the temporary queue and use it wherever I need a reply. Can you
confirm that this second messenger will not conflict with the primary
one, in particular that when I read the replies, the input pending for
the queues to which the primary messenger is subscribed will not be
affected in any way?

Another issue: as far as I can see, neither pn_messenger_recv nor
pn_messenger_get has a timeout option. Am I missing something, or do I need
to use selectables together with the poll or select syscall to get a
timeout for message input?
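
For reference, the poll-based wait asked about above can be sketched in plain POSIX C. This is only an illustration of waiting on a file descriptor with a timeout: wait_readable is a hypothetical helper name, no Proton calls are used, and how to obtain a descriptor from a messenger's selectables is not shown. Later in this thread pn_messenger_set_timeout is called on the messenger before the send call, which may turn out to be the simpler route.

```c
#include <errno.h>
#include <poll.h>

/* Hypothetical helper (not a Proton function): wait until fd becomes
 * readable or timeout_ms milliseconds elapse.
 * Returns 1 if readable, 0 on timeout, -1 on error. */
int wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd;
    int rc;

    pfd.fd = fd;
    pfd.events = POLLIN;   /* only interested in incoming data */
    pfd.revents = 0;

    /* Retry if a signal interrupts the call. For simplicity the full
     * timeout is reused on each retry, so the total wait may be longer. */
    do {
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0)
        return -1;         /* poll itself failed */
    if (rc == 0)
        return 0;          /* nothing arrived within the timeout */
    return (pfd.revents & POLLIN) ? 1 : -1;
}
```

A caller would check the return value and only attempt to read messages when it is 1, treating 0 as "no reply within the timeout".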

Best regards,
-- 
 \   / |			           |
 (OvO) |  Mikhail Iwanow                   |
 (^^^) |      Voice:   +7 (911) 223-1300   |
  \^/  |      E-mail:  ivans@logit-ag.de   |
  ^ ^  |                                   |

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: proton reply handling again

Posted by Michael Ivanov <iv...@isle.spb.ru>.
Many thanks, the queue name without host address worked!

18.02.2015 14:18, Gordon Sim wrote:
> On 02/18/2015 11:03 AM, Michael Ivanov wrote:
>> Ok I have run my test program with PN_TRACE_FRM set to 1. The program sends a request
>> to qpidd to create a queue TESTQUEUE. As can be seen the queue is actually created but
>> the temporary reply queue has disappeared right after the send operation.
>>
>> The output is as follows:
>>
>> 18-Feb-2015 13:53:45.973 @D ######################## EVENTLOOP RUN ########################
>>>> A TESTQUEUE
>> [0x177f450]:  -> SASL
>> [0x177f450]:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b""]
>> [0x177f450]:  <- SASL
>> [0x177f450]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
>> [0x177f450]:0 <- @sasl-outcome(68) [code=0]
>> [0x177f450]:  <- AMQP
>> [0x177f450]:  -> AMQP
>> [0x177f450]:0 -> @open(16) [container-id="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b", hostname="127.0.0.1"]
>> [0x177f450]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
>> [0x177f450]:0 -> @attach(18) [name="receiver-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
>> [durable=0, timeout=0, dynamic=true], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
>> [0x177f450]:0 <- @open(16) [container-id="9d6de7ab-a0ac-43fb-aee4-e43ec4701ef5"]
>> [0x177f450]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
>> [0x177f450]:0 <- @attach(18) [name="receiver-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0,
>> source=@source(40) [address="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx", durable=0, timeout=0, dynamic=false],
>> target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
>>
>> 18-Feb-2015 13:54:07.826 @D SUBSCRIPTION ADDRESS: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
>> Queues
>>    queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>>    ==================================================================================================================================
>>    2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1
>>    773cc3a0-4c56-456c-9fb2-4b1b7fafa319:0.0                Y        Y        0     0      0       0      0        0         1     2
>>
>> 18-Feb-2015 13:54:07.885 @D SUBSCRIBED TO: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
>> Queues
>>    queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>>    ==================================================================================================================================
>>    06d21658-efe5-4eca-8044-2bacf54ec5b5:0.0                Y        Y        0     0      0       0      0        0         1     2
>>    2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1
>>
>> 18-Feb-2015 13:54:07.940 @D SEND message to [amqp://127.0.0.1:5672/qmf.default.direct], reply to
>> [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]:
>> 18-Feb-2015 13:54:07.940 @D   PROPERTIES: {"method"="request", "qmf.opcode"="_method_request", "x-amqp-0-10.app-id"="qmf2"}
>> 18-Feb-2015 13:54:07.940 @D   DATA:       {"_object_id"={"_object_name"="org.apache.qpid.broker:broker:amqp-broker"},
>> "_method_name"="create", "_arguments"={"type"="queue", "name"="TESTQUEUE", "properties"={"durable"=true}, "strict"=true}}
>> [0x177f450]:1 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=1]
>> [0x177f450]:1 -> @attach(18) [name="sender-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
>> [address="qmf.default.direct", durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct",
>> durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
>> [0x177f450]:1 <- @begin(17) [remote-channel=1, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
>> [0x177f450]:1 <- @attach(18) [name="sender-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
>> [durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct", durable=0, timeout=0, dynamic=false],
>> initial-delivery-count=0]
>> [0x177f450]:1 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
>> delivery-count=0, link-credit=100, drain=false]
>> [0x177f450]:1 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0,
>> settled=true, more=false] (472)
>> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x9e\x00\x00\x00\x0d@@\xa1(amqp://127.0.0.1:5672/qmf.default.direct\xa1\x06broker\xa1Gamqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxxq\x00\x00\x00\x01@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00St\xd1\x00\x00\x00L\x00\x00\x00\x06\xa1\x06method\xa1\x07request\xa1\x0aqmf.opcode\xa1\x0f_method_request\xa1\x12x-amqp-0-10.app-id\xa1\x04qmf2\x00Sw\xd1\x00\x00\x00\xc3\x00\x00\x00\x06\xa1\x0a_object_id\xd1\x00\x00\x00=\x00\x00\x00\x02\xa1\x0c_object_name\xa1)org.apache.qpid.broker:broker:amqp-broker\xa1\x0c_method_name\xa1\x06create\xa1\x0a_arguments\xd1\x00\x00\x00J\x00\x00\x00\x08\xa1\x04type\xa1\x05queue\xa1\x04name\xa1\x09TESTQUEUE\xa1\x0aproperties\xd1\x00\x00\x00\x0e\x00\x00\x00\x02\xa1\x07durableA\xa1\x06strictA"
>>
> 
> The problem is that the reply-to is set to the full URL. It needs to be only the 'path' portion of that. (You could use
> proton/url.h to separate out the path.)
> 
> The broker is trying to reply to the address given, but failing to do so and closing the connection as a result, which means the
> associated temp queue goes away as well.
> 
>>
>> 18-Feb-2015 13:54:07.940 @D REQUEST SENT
>> Queues
>>    queue                                     dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>>    =========================================================================================================================
>>    302effc8-e870-46db-a37c-13dfd4b8d108:0.0       Y        Y        0     0      0       0      0        0         1     2
>>    TESTQUEUE                                 Y                      0     0      0       0      0        0         0     1
>>
>> [0x177f450]:0 -> @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
>> delivery-count=0, link-credit=1, drain=false]
>> [0x177f450]:0 <- @close(24) [error=@error(29) [condition=:"amqp:internal-error", description="not-found: Exchange not found:
>> amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)"]]
>> [0x177f450]:  <- EOS
>> CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: amqp:
>> (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
>> [0x177f450]:0 -> @close(24) []
>> [0x177f450]:  -> EOS
>> [0x177f450]:  -> EOS
>> [0x177f450]:  -> EOS
>> 18-Feb-2015 13:54:07.995 @E Error reading messages: no valid sources
>>
>> Best regards,
>>
>> 18.02.2015 12:37, Gordon Sim wrote:
>>> On 02/18/2015 08:11 AM, Michael Ivanov wrote:
>>>> Sorry I still cannot get the reply working :-(
>>>>
>>>> I do the following:
>>>>
>>>>      _sender messenger is created.
>>>>      I create a subscription to "amqp://127.0.0.1/#" for this messenger and
>>>>      keep the reply address queue:
>>>>
>>>>         s = pn_messenger_subscribe(_sender, "amqp://127.0.0.1/#");
>>>>         _reply_addr = pn_subscription_address(s);
>>>>
>>>>      At this moment I check the qpidd state with qpid-stat -q and verify
>>>>      that temporary queue is created.
>>>>
>>>>      At some later time I prepare the message and set its reply address
>>>>      as follows:
>>>>
>>>>         pn_message_set_reply_to(message, _reply_addr);
>>>>
>>>>      I verify again with qpid-stat -q, reply queue is still there.
>>>>
>>>>      Now I assign the target address to message and send it:
>>>>
>>>>         pn_messenger_set_timeout(_sender, timeout);
>>>>         pn_messenger_put(_sender, msg);
>>>>         pn_messenger_send(_sender, -1);
>>>>
>>>>      Immediately before pn_messenger_set_timeout I print the value of
>>>>      pn_message_get_reply_to() and it is correct.
>>>>
>>>>      After messenger send I check again with qpid-stat -q and I see that
>>>>      the reply queue is gone! Btw, the message is received and executed
>>>>      by broker correctly (the actual message is queue create request
>>>>      directed to qmf.default.direct, the queue is created properly).
>>>>
>>>>      When I run pn_messenger_recv(_sender, 1) I get an error:
>>>>
>>>>         CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: \
>>>>            amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
>>>>         18-Feb-2015 10:49:45.677 @E Error reading messages: no valid sources
>>>
>>> That is an error coming back from the broker and is caused by having amqp://127.0.0.1/ prefixed to the address, which the broker
>>> doesn't recognise.
>>>
>>> Can you set the env var PN_TRACE_FRM=1 before running to capture the protocol trace? That would help see where exactly the
>>> address is being used in this form over the wire. It may be that the reply-to address needs to have the prefix trimmed off
>>> first.
>>>
>>>>
>>>>      I guess this is because the reply queue has somehow disappeared.
>>>>
>>>> What's wrong here?
>>>
>>>
>>>
>>
>>
> 
> 
> 


-- 
 \   / |			           |
 (OvO) |  Михаил Иванов                    |
 (^^^) |      Tel.:    +7(911) 223-1300    |
  \^/  |      E-mail:  ivans@isle.spb.ru   |
  ^ ^  |                                   |



Re: proton reply handling again

Posted by Gordon Sim <gs...@redhat.com>.
On 02/18/2015 11:03 AM, Michael Ivanov wrote:
> Ok I have run my test program with PN_TRACE_FRM set to 1. The program sends a request
> to qpidd to create a queue TESTQUEUE. As can be seen the queue is actually created but
> the temporary reply queue has disappeared right after the send operation.
>
> The output is as follows:
>
> 18-Feb-2015 13:53:45.973 @D ######################## EVENTLOOP RUN ########################
>>> A TESTQUEUE
> [0x177f450]:  -> SASL
> [0x177f450]:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b""]
> [0x177f450]:  <- SASL
> [0x177f450]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
> [0x177f450]:0 <- @sasl-outcome(68) [code=0]
> [0x177f450]:  <- AMQP
> [0x177f450]:  -> AMQP
> [0x177f450]:0 -> @open(16) [container-id="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b", hostname="127.0.0.1"]
> [0x177f450]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
> [0x177f450]:0 -> @attach(18) [name="receiver-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> [durable=0, timeout=0, dynamic=true], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
> [0x177f450]:0 <- @open(16) [container-id="9d6de7ab-a0ac-43fb-aee4-e43ec4701ef5"]
> [0x177f450]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
> [0x177f450]:0 <- @attach(18) [name="receiver-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0,
> source=@source(40) [address="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx", durable=0, timeout=0, dynamic=false],
> target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
>
> 18-Feb-2015 13:54:07.826 @D SUBSCRIPTION ADDRESS: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
> Queues
>    queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>    ==================================================================================================================================
>    2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1
>    773cc3a0-4c56-456c-9fb2-4b1b7fafa319:0.0                Y        Y        0     0      0       0      0        0         1     2
>
> 18-Feb-2015 13:54:07.885 @D SUBSCRIBED TO: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
> Queues
>    queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>    ==================================================================================================================================
>    06d21658-efe5-4eca-8044-2bacf54ec5b5:0.0                Y        Y        0     0      0       0      0        0         1     2
>    2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1
>
> 18-Feb-2015 13:54:07.940 @D SEND message to [amqp://127.0.0.1:5672/qmf.default.direct], reply to
> [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]:
> 18-Feb-2015 13:54:07.940 @D   PROPERTIES: {"method"="request", "qmf.opcode"="_method_request", "x-amqp-0-10.app-id"="qmf2"}
> 18-Feb-2015 13:54:07.940 @D   DATA:       {"_object_id"={"_object_name"="org.apache.qpid.broker:broker:amqp-broker"},
> "_method_name"="create", "_arguments"={"type"="queue", "name"="TESTQUEUE", "properties"={"durable"=true}, "strict"=true}}
> [0x177f450]:1 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=1]
> [0x177f450]:1 -> @attach(18) [name="sender-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> [address="qmf.default.direct", durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct",
> durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
> [0x177f450]:1 <- @begin(17) [remote-channel=1, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
> [0x177f450]:1 <- @attach(18) [name="sender-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> [durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct", durable=0, timeout=0, dynamic=false],
> initial-delivery-count=0]
> [0x177f450]:1 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
> delivery-count=0, link-credit=100, drain=false]
> [0x177f450]:1 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0,
> settled=true, more=false] (472)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x9e\x00\x00\x00\x0d@@\xa1(amqp://127.0.0.1:5672/qmf.default.direct\xa1\x06broker\xa1Gamqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxxq\x00\x00\x00\x01@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00St\xd1\x00\x00\x00L\x00\x00\x00\x06\xa1\x06method\xa1\x07request\xa1\x0aqmf.opcode\xa1\x0f_method_request\xa1\x12x-amqp-0-10.app-id\xa1\x04qmf2\x00Sw\xd1\x00\x00\x00\xc3\x00\x00\x00\x06\xa1\x0a_object_id\xd1\x00\x00\x00=\x00\x00\x00\x02\xa1\x0c_object_name\xa1)org.apache.qpid.broker:broker:amqp-broker\xa1\x0c_method_name\xa1\x06create\xa1\x0a_arguments\xd1\x00\x00\x00J\x00\x00\x00\x08\xa1\x04type\xa1\x05queue\xa1\x04name\xa1\x09TESTQUEUE\xa1\x0aproperties\xd1\x00\x00\x00\x0e\x00\x00\x00\x02\xa1\x07durableA\xa1\x06strictA"

The problem is that the reply-to is set to the full URL. It needs to
be only the 'path' portion of that. (You could use proton/url.h to
separate out the path.)

The broker is trying to reply to the address given, but failing to do so
and closing the connection as a result, which means the associated temp
queue goes away as well.
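
A minimal sketch of trimming such an address down to its path, in plain C with no Proton dependency. reply_path is a hypothetical helper name, not a Proton function; proton/url.h, mentioned above, would be the library route and its API is not shown here. The sketch assumes addresses of the form scheme://host[:port]/path, as seen in the trace.

```c
#include <string.h>

/* Hypothetical helper (not part of Proton): return a pointer to the path
 * portion of an address such as "amqp://host:port/queue-name".
 * If there is no "scheme://" prefix, the input is returned unchanged.
 * The result points into the input string; nothing is allocated. */
const char *reply_path(const char *address)
{
    const char *p = strstr(address, "://");
    const char *slash;

    if (p == NULL)
        return address;        /* already a bare queue name */

    p += 3;                    /* skip "://" to reach the host part */
    slash = strchr(p, '/');
    if (slash == NULL)
        return "";             /* URL carries no path component */

    return slash + 1;          /* skip the '/' between host and path */
}
```

Applied to the subscription address from the trace, amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx, this yields 2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx, which could then be passed to pn_message_set_reply_to in place of the full URL. Because the returned pointer aliases the input, the input string has to stay valid while the result is in use.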

>
> 18-Feb-2015 13:54:07.940 @D REQUEST SENT
> Queues
>    queue                                     dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
>    =========================================================================================================================
>    302effc8-e870-46db-a37c-13dfd4b8d108:0.0       Y        Y        0     0      0       0      0        0         1     2
>    TESTQUEUE                                 Y                      0     0      0       0      0        0         0     1
>
> [0x177f450]:0 -> @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
> delivery-count=0, link-credit=1, drain=false]
> [0x177f450]:0 <- @close(24) [error=@error(29) [condition=:"amqp:internal-error", description="not-found: Exchange not found:
> amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)"]]
> [0x177f450]:  <- EOS
> CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: amqp:
> (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
> [0x177f450]:0 -> @close(24) []
> [0x177f450]:  -> EOS
> [0x177f450]:  -> EOS
> [0x177f450]:  -> EOS
> 18-Feb-2015 13:54:07.995 @E Error reading messages: no valid sources
>
> Best regards,
>
> 18.02.2015 12:37, Gordon Sim wrote:
>> On 02/18/2015 08:11 AM, Michael Ivanov wrote:
>>> Sorry I still cannot get the reply working :-(
>>>
>>> I do the following:
>>>
>>>      _sender messenger is created.
>>>      I create a subscription to "amqp://127.0.0.1/#" for this messenger and
>>>      keep the reply address queue:
>>>
>>>         s = pn_messenger_subscribe(_sender, "amqp://127.0.0.1/#");
>>>         _reply_addr = pn_subscription_address(s);
>>>
>>>      At this moment I check the qpidd state with qpid-stat -q and verify
>>>      that temporary queue is created.
>>>
>>>      At some later time I prepare the message and set its reply address
>>>      as follows:
>>>
>>>         pn_message_set_reply_to(message, _reply_addr);
>>>
>>>      I verify again with qpid-stat -q, reply queue is still there.
>>>
>>>      Now I assign the target address to message and send it:
>>>
>>>         pn_messenger_set_timeout(_sender, timeout);
>>>         pn_messenger_put(_sender, msg);
>>>         pn_messenger_send(_sender, -1);
>>>
>>>      Immediately before pn_messenger_set_timeout I print the value of
>>>      pn_message_get_reply_to() and it is correct.
>>>
>>>      After messenger send I check again with qpid-stat -q and I see that
>>>      the reply queue is gone! Btw, the message is received and executed
>>>      by broker correctly (the actual message is queue create request
>>>      directed to qmf.default.direct, the queue is created properly).
>>>
>>>      When I run pn_messenger_recv(_sender, 1) I get an error:
>>>
>>>         CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: \
>>>            amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
>>>         18-Feb-2015 10:49:45.677 @E Error reading messages: no valid sources
>>
>> That is an error coming back from the broker and is caused by having amqp://127.0.0.1/ prefixed to the address, which the broker
>> doesn't recognise.
>>
>> Can you set the env var PN_TRACE_FRM=1 before running to capture the protocol trace? That would help see where exactly the
>> address is being used in this form over the wire. It may be that the reply-to address needs to have the prefix trimmed off first.
>>
>>>
>>>      I guess this is because the reply queue has somehow disappeared.
>>>
>>> What's wrong here?
>>
>>
>>
>
>




Re: proton reply handling again

Posted by Michael Ivanov <iv...@logit-ag.de>.
Ok I have run my test program with PN_TRACE_FRM set to 1. The program sends a request
to qpidd to create a queue TESTQUEUE. As can be seen the queue is actually created but
the temporary reply queue has disappeared right after the send operation.

The output is as follows:

18-Feb-2015 13:53:45.973 @D ######################## EVENTLOOP RUN ########################
>> A TESTQUEUE
[0x177f450]:  -> SASL
[0x177f450]:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b""]
[0x177f450]:  <- SASL
[0x177f450]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
[0x177f450]:0 <- @sasl-outcome(68) [code=0]
[0x177f450]:  <- AMQP
[0x177f450]:  -> AMQP
[0x177f450]:0 -> @open(16) [container-id="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b", hostname="127.0.0.1"]
[0x177f450]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
[0x177f450]:0 -> @attach(18) [name="receiver-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[durable=0, timeout=0, dynamic=true], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
[0x177f450]:0 <- @open(16) [container-id="9d6de7ab-a0ac-43fb-aee4-e43ec4701ef5"]
[0x177f450]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
[0x177f450]:0 <- @attach(18) [name="receiver-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0,
source=@source(40) [address="2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx", durable=0, timeout=0, dynamic=false],
target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

18-Feb-2015 13:54:07.826 @D SUBSCRIPTION ADDRESS: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
Queues
  queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
  ==================================================================================================================================
  2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1
  773cc3a0-4c56-456c-9fb2-4b1b7fafa319:0.0                Y        Y        0     0      0       0      0        0         1     2

18-Feb-2015 13:54:07.885 @D SUBSCRIBED TO: [amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]
Queues
  queue                                              dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
  ==================================================================================================================================
  06d21658-efe5-4eca-8044-2bacf54ec5b5:0.0                Y        Y        0     0      0       0      0        0         1     2
  2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx       Y                 0     0      0       0      0        0         1     1

18-Feb-2015 13:54:07.940 @D SEND message to [amqp://127.0.0.1:5672/qmf.default.direct], reply to
[amqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxx]:
18-Feb-2015 13:54:07.940 @D   PROPERTIES: {"method"="request", "qmf.opcode"="_method_request", "x-amqp-0-10.app-id"="qmf2"}
18-Feb-2015 13:54:07.940 @D   DATA:       {"_object_id"={"_object_name"="org.apache.qpid.broker:broker:amqp-broker"},
"_method_name"="create", "_arguments"={"type"="queue", "name"="TESTQUEUE", "properties"={"durable"=true}, "strict"=true}}
[0x177f450]:1 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=1]
[0x177f450]:1 -> @attach(18) [name="sender-xxx", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="qmf.default.direct", durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct",
durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
[0x177f450]:1 <- @begin(17) [remote-channel=1, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
[0x177f450]:1 <- @attach(18) [name="sender-xxx", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[durable=0, timeout=0, dynamic=false], target=@target(41) [address="qmf.default.direct", durable=0, timeout=0, dynamic=false],
initial-delivery-count=0]
[0x177f450]:1 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
delivery-count=0, link-credit=100, drain=false]
[0x177f450]:1 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0,
settled=true, more=false] (472)
"\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x9e\x00\x00\x00\x0d@@\xa1(amqp://127.0.0.1:5672/qmf.default.direct\xa1\x06broker\xa1Gamqp://127.0.0.1:5672/2a4b9ebb-a82c-4f01-9ede-a0259d1f134b_receiver-xxxq\x00\x00\x00\x01@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00St\xd1\x00\x00\x00L\x00\x00\x00\x06\xa1\x06method\xa1\x07request\xa1\x0aqmf.opcode\xa1\x0f_method_request\xa1\x12x-amqp-0-10.app-id\xa1\x04qmf2\x00Sw\xd1\x00\x00\x00\xc3\x00\x00\x00\x06\xa1\x0a_object_id\xd1\x00\x00\x00=\x00\x00\x00\x02\xa1\x0c_object_name\xa1)org.apache.qpid.broker:broker:amqp-broker\xa1\x0c_method_name\xa1\x06create\xa1\x0a_arguments\xd1\x00\x00\x00J\x00\x00\x00\x08\xa1\x04type\xa1\x05queue\xa1\x04name\xa1\x09TESTQUEUE\xa1\x0aproperties\xd1\x00\x00\x00\x0e\x00\x00\x00\x02\xa1\x07durableA\xa1\x06strictA"

18-Feb-2015 13:54:07.940 @D REQUEST SENT
Queues
  queue                                     dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
  =========================================================================================================================
  302effc8-e870-46db-a37c-13dfd4b8d108:0.0       Y        Y        0     0      0       0      0        0         1     2
  TESTQUEUE                                 Y                      0     0      0       0      0        0         0     1

[0x177f450]:0 -> @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0,
delivery-count=0, link-credit=1, drain=false]
[0x177f450]:0 <- @close(24) [error=@error(29) [condition=:"amqp:internal-error", description="not-found: Exchange not found:
amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)"]]
[0x177f450]:  <- EOS
CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: amqp:
(/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
[0x177f450]:0 -> @close(24) []
[0x177f450]:  -> EOS
[0x177f450]:  -> EOS
[0x177f450]:  -> EOS
18-Feb-2015 13:54:07.995 @E Error reading messages: no valid sources

Best regards,

18.02.2015 12:37, Gordon Sim wrote:
> On 02/18/2015 08:11 AM, Michael Ivanov wrote:
>> Sorry I still cannot get the reply working :-(
>>
>> I do the following:
>>
>>     _sender messenger is created.
>>     I create a subscription to "amqp://127.0.0.1/#" for this messenger and
>>     keep the reply address queue:
>>
>>        s = pn_messenger_subscribe(_sender, "amqp://127.0.0.1/#");
>>        _reply_addr = pn_subscription_address(s);
>>
>>     At this moment I check the qpidd state with qpid-stat -q and verify
>>     that temporary queue is created.
>>
>>     At some later time I prepare the message and set its reply address
>>     as follows:
>>
>>        pn_message_set_reply_to(message, _reply_addr);
>>
>>     I verify again with qpid-stat -q, reply queue is still there.
>>
>>     Now I assign the target address to message and send it:
>>
>>        pn_messenger_set_timeout(_sender, timeout);
>>        pn_messenger_put(_sender, msg);
>>        pn_messenger_send(_sender, -1);
>>
>>     Immediately before pn_messenger_set_timeout I print the value of
>>     pn_message_get_reply_to() and it is correct.
>>
>>     After messenger send I check again with qpid-stat -q and I see that
>>     the reply queue is gone! Btw, the message is received and executed
>>     by broker correctly (the actual message is queue create request
>>     directed to qmf.default.direct, the queue is created properly).
>>
>>     When I run pn_messenger_recv(_sender, 1) I get an error:
>>
>>        CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: \
>>           amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
>>        18-Feb-2015 10:49:45.677 @E Error reading messages: no valid sources
> 
> That is an error coming back from the broker and is caused by having amqp://127.0.0.1/ prefixed to the address, which the broker
> doesn't recognise.
> 
> Can you set the env var PN_TRACE_FRM=1 before running to capture the protocol trace? That would help see where exactly the
> address is being used in this form over the wire. It may be that the reply-to address needs to have the prefix trimmed off first.
> 
>>
>>     I guess this is because the reply queue has somehow disappeared.
>>
>> What's wrong here?
> 
> 
> 


-- 
 \   / |			           |
 (OvO) |  Mikhail Iwanow                   |
 (^^^) |      Voice:   +7 (911) 223-1300   |
  \^/  |      E-mail:  ivans@logit-ag.de   |
  ^ ^  |                                   |



Re: proton reply handling again

Posted by Gordon Sim <gs...@redhat.com>.
On 02/18/2015 08:11 AM, Michael Ivanov wrote:
> Sorry I still cannot get the reply working :-(
>
> I do the following:
>
>     _sender messenger is created.
>     I create a subscription to "amqp://127.0.0.1/#" for this messenger and
>     keep the reply address queue:
>
>        s = pn_messenger_subscribe(_sender, "amqp://127.0.0.1/#");
>        _reply_addr = pn_subscription_address(s);
>
>     At this moment I check the qpidd state with qpid-stat -q and verify
>     that temporary queue is created.
>
>     At some later time I prepare the message and set a reply address to it
>     as follows:
>
>        pn_message_set_reply_to(message, _reply_addr);
>
>     I verify again with qpid-stat -q, reply queue is still there.
>
>     Now I assign the target address to message and send it:
>
>        pn_messenger_set_timeout(_sender, timeout);
>        pn_messenger_put(_sender, msg);
>        pn_messenger_send(_sender, -1);
>
>     Immediately before pn_messenger_set_timeout I print the value of
>     pn_message_get_reply_to() and it is correct.
>
>     After messenger send I check again with qpid-stat -q and I see that
>     the reply queue is gone! Btw, the message is received and executed
>     by broker correctly (the actual message is queue create request
>     directed to qmf.default.direct, the queue is created properly).
>
>     When I run pn_messenger_recv(_sender, 1) I get an error:
>
>        CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: \
>           amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
>        18-Feb-2015 10:49:45.677 @E Error reading messages: no valid sources

That is an error coming back from the broker and is caused by having 
amqp://127.0.0.1/ prefixed to the address, which the broker doesn't 
recognise.

Can you set the env var PN_TRACE_FRM=1 before running to capture the 
protocol trace? That would help see where exactly the address is being 
used in this form over the wire. It may be that the reply-to address 
needs to have the prefix trimmed off first.
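
A minimal sketch of what trimming the prefix could look like, assuming the
address has the form "amqp://host[:port]/name". The helper name
strip_amqp_prefix is hypothetical and not part of the Proton API:

```c
#include <string.h>

/* Hypothetical helper (not part of Proton): returns a pointer to the node
 * name inside an address such as "amqp://127.0.0.1/name". The returned
 * pointer refers into the original string; nothing is copied. If the
 * address has no "scheme://host/" prefix, or nothing follows the host,
 * the address is returned unchanged. */
static const char *strip_amqp_prefix(const char *address)
{
    const char *scheme_end;
    const char *path;

    if (address == NULL)
        return NULL;

    /* Find the end of the scheme, e.g. the "://" in "amqp://". */
    scheme_end = strstr(address, "://");
    if (scheme_end == NULL)
        return address;

    /* The first '/' after the host[:port] part starts the node name. */
    path = strchr(scheme_end + 3, '/');
    if (path == NULL || path[1] == '\0')
        return address;

    return path + 1;
}
```

With such a helper, the reply-to could be set along the lines of
pn_message_set_reply_to(message, strip_amqp_prefix(_reply_addr)), so that
only the bare queue name goes over the wire.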

>
>     I guess this is because the reply queue has somehow disappeared.
>
> What's wrong here?




Re: proton reply handling again

Posted by Michael Ivanov <iv...@isle.spb.ru>.
Sorry I still cannot get the reply working :-(

I do the following:

   _sender messenger is created.
   I create a subscription to "amqp://127.0.0.1/#" for this messenger and
   keep the reply address queue:

      s = pn_messenger_subscribe(_sender, "amqp://127.0.0.1/#");
      _reply_addr = pn_subscription_address(s);

   At this moment I check the qpidd state with qpid-stat -q and verify
   that temporary queue is created.

   At some later time I prepare the message and set a reply address to it
   as follows:

      pn_message_set_reply_to(message, _reply_addr);

   I verify again with qpid-stat -q, reply queue is still there.

   Now I assign the target address to message and send it:

      pn_messenger_set_timeout(_sender, timeout);
      pn_messenger_put(_sender, msg);
      pn_messenger_send(_sender, -1);

   Immediately before pn_messenger_set_timeout I print the value of
   pn_message_get_reply_to() and it is correct.

   After messenger send I check again with qpid-stat -q and I see that
   the reply queue is gone! Btw, the message is received and executed
   by broker correctly (the actual message is queue create request
   directed to qmf.default.direct, the queue is created properly).

   When I run pn_messenger_recv(_sender, 1) I get an error:

      CONNECTION ERROR (amqp:internal-error) not-found: Exchange not found: \
         amqp: (/Archive/misc/mq/qpid-cpp-0.30/src/qpid/broker/ExchangeRegistry.cpp:144)
      18-Feb-2015 10:49:45.677 @E Error reading messages: no valid sources

   I guess this is because the reply queue has somehow disappeared.

What's wrong here?

Best regards,

02.02.2015 15:02, Rafael Schloming wrote:
> On Mon, Feb 2, 2015 at 4:46 AM, Michael Ivanov <iv...@logit-ag.de> wrote:
> 
>> Hallo!
>>
>> I am implementing a message-handling event loop using the proton library.
>> I create a pn_messenger, subscribe to several incoming queues and wait
>> for input. I also need to send outgoing messages when handling incoming
>> ones and for some of the outgoing messages I want to get an immediate
>> reply (which has to be received outside of the main event loop). To get
>> a reply I use a temporary queue (created using "#" token). As far as I
>> understand I should not subscribe to this queue in my primary messenger,
>> since I cannot temporarily suspend or cancel other subscriptions, which
>> have to be handled in the primary event loop. So at the start of a process,
>> I create a separate pn_messenger for immediate replies, subscribe it
>> to the temporary queue and use it wherever I need the reply. Can you
>> confirm that this second messenger will not conflict with the primary
>> one, in particular that when I read the replies the input pending for
>> the queues to which the primary messenger is subscribed will not be
>> affected in any way?
>>
> 
> They shouldn't interfere with each other. The only caveat here is that they
> can only perform I/O when you pass control to them, so if you have
> heartbeats enabled, you need to be sure to pass control back to each one
> frequently enough. It's a bit awkward, but you can work around this by
> using a timeout that is smaller than your heartbeat timeout, e.g.:
> 
> // main loop:
> pn_messenger_set_timeout(mainMessenger,
>                          timeout_that_is_less_than_heartbeat_interval);
> while (...) {
>   pn_messenger_recv(mainMessenger, -1);      // -1: no limit on message count
>   pn_messenger_work(secondaryMessenger, 0);  // timeout 0: do pending I/O, don't block
>   ...
> }
> 
> 
>> Another issue: as far as I can see, neither pn_messenger_recv nor
>> pn_messenger_get has a timeout option. Am I missing something, or do I
>> need to use selectables together with the poll or select syscall to get
>> a timeout for message input?
>>
> 
> There is a pn_messenger_get/set_timeout that you can use to control the
> default timeout for any blocking operations. Also if you use
> pn_messenger_work(), you can pass in the timeout explicitly.
> 
> --Rafael
> 


-- 
 \   / |			           |
 (OvO) |  Михаил Иванов                    |
 (^^^) |      Tel.:    +7(911) 223-1300    |
  \^/  |      E-mail:  ivans@isle.spb.ru   |
  ^ ^  |                                   |



Re: proton reply handling again

Posted by Rafael Schloming <rh...@alum.mit.edu>.
On Mon, Feb 2, 2015 at 4:46 AM, Michael Ivanov <iv...@logit-ag.de> wrote:

> Hallo!
>
> I am implementing a message-handling event loop using the proton library.
> I create a pn_messenger, subscribe to several incoming queues and wait
> for input. I also need to send outgoing messages when handling incoming
> ones and for some of the outgoing messages I want to get an immediate
> reply (which has to be received outside of the main event loop). To get
> a reply I use a temporary queue (created using "#" token). As far as I
> understand I should not subscribe to this queue in my primary messenger,
> since I cannot temporarily suspend or cancel other subscriptions, which
> have to be handled in the primary event loop. So at the start of a process,
> I create a separate pn_messenger for immediate replies, subscribe it
> to the temporary queue and use it wherever I need the reply. Can you
> confirm that this second messenger will not conflict with the primary
> one, in particular that when I read the replies the input pending for
> the queues to which the primary messenger is subscribed will not be
> affected in any way?
>

They shouldn't interfere with each other. The only caveat here is that they
can only perform I/O when you pass control to them, so if you have
heartbeats enabled, you need to be sure to pass control back to each one
frequently enough. It's a bit awkward, but you can work around this by
using a timeout that is smaller than your heartbeat timeout, e.g.:

// main loop:
pn_messenger_set_timeout(mainMessenger,
                         timeout_that_is_less_than_heartbeat_interval);
while (...) {
  pn_messenger_recv(mainMessenger, -1);      // -1: no limit on message count
  pn_messenger_work(secondaryMessenger, 0);  // timeout 0: do pending I/O, don't block
  ...
}


> Another issue: as far as I can see, neither pn_messenger_recv nor
> pn_messenger_get has a timeout option. Am I missing something, or do I
> need to use selectables together with the poll or select syscall to get
> a timeout for message input?
>

There is a pn_messenger_get/set_timeout that you can use to control the
default timeout for any blocking operations. Also if you use
pn_messenger_work(), you can pass in the timeout explicitly.

--Rafael