Posted to proton@qpid.apache.org by xavier <xa...@eaton.com> on 2014/12/05 18:29:00 UTC

Re: CorrelationId

Hi all,

I found the solution (for ActiveMQ, with one AMQP connector and one
OpenWire connector), so I am sharing it.
I used Dominic Evans's idea: to receive only the messages that match a
filter (for example on the correlation ID), I did this:

const std::string address = "amqp://127.0.0.1:5672/queue://myqueue";

pn_messenger_t* msgConsumer = pn_messenger(NULL);
pn_messenger_set_timeout(msgConsumer, 1000);
pn_messenger_set_blocking(msgConsumer, true);
pn_messenger_set_incoming_window(msgConsumer, 1);

pn_messenger_subscribe(msgConsumer, address.c_str());
pn_link_t* link = pn_messenger_get_link(msgConsumer, address.c_str(), false);

pn_terminus_t* terminus = pn_link_source(link);
pn_data_t* data = pn_terminus_filter(terminus);
/* Create the filter map; its key is the symbol "jms-selector" */
std::string selector = "jms-selector";
pn_data_put_map(data);
pn_data_enter(data);
pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str()));
// Described value of the JMS_SELECTOR line 1262
std::string filter = "JMSCorrelationID='12346789'";
pn_data_put_described(data);
pn_data_enter(data);
pn_data_put_string(data, pn_bytes(6, "string"));
pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str()));
pn_data_exit(data); // leave the described value
pn_data_exit(data); // leave the map

pn_messenger_start(msgConsumer);
pn_messenger_recv(msgConsumer, -1);
if (pn_messenger_incoming(msgConsumer))
{
      // A message has arrived
      pn_message_t* message = pn_message();
      pn_messenger_get(msgConsumer, message);
      .....
      .....
      pn_message_free(message);
}

.......
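
The selector string above is written by hand for one fixed ID. As a minimal sketch (not part of the original post), a helper such as the hypothetical make_correlation_selector below could build the same expression for any ID. It assumes the broker follows the JMS selector syntax, in which a single quote inside a string literal is written as two single quotes.

```cpp
#include <string>

// Hypothetical helper (not from the original post): builds a JMS selector
// expression that matches exactly one correlation ID.
std::string make_correlation_selector(const std::string& correlationId)
{
    std::string selector = "JMSCorrelationID='";
    for (char c : correlationId) {
        if (c == '\'') {
            selector += '\'';   // JMS selector syntax: ' inside a literal becomes ''
        }
        selector += c;
    }
    selector += '\'';
    return selector;
}
```

The result could then be passed to pn_data_put_string in place of the hard-coded filter string.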

It works, so thanks to Dominic Evans for pointing me in the right direction.

Enjoy

--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617208.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by xavier <xa...@eaton.com>.
Here is the corrected code:
// Do this once
const std::string address = "amqp://127.0.0.1:5672/queue://myqueue";
pn_messenger_t* msgConsumer = pn_messenger(NULL);
pn_messenger_set_timeout(msgConsumer, 1000);
pn_messenger_set_blocking(msgConsumer, true);
pn_messenger_set_incoming_window(msgConsumer, 1);

// Do this for each receive
pn_link_t* link;
pn_messenger_subscribe(msgConsumer, address.c_str());
link = pn_messenger_get_link(msgConsumer, address.c_str(), false);
pn_link_open(link);

pn_terminus_t* terminus = pn_link_source(link);
pn_data_t* data = pn_terminus_filter(terminus);
/* Create the filter map; its key is the symbol "jms-selector" */
std::string selector = "jms-selector";
pn_data_put_map(data);
pn_data_enter(data);
pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str()));
// Described value of the JMS_SELECTOR line 1262
std::string filter = "JMSCorrelationID='12346789'";
pn_data_put_described(data);
pn_data_enter(data);
pn_data_put_string(data, pn_bytes(6, "string"));
pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str()));
pn_data_exit(data); // leave the described value
pn_data_exit(data); // leave the map

pn_messenger_recv(msgConsumer, -1);
if (pn_messenger_incoming(msgConsumer))
{
      // A message has arrived
      pn_message_t* message = pn_message();
      pn_messenger_get(msgConsumer, message);
      .....
      .....
      pn_message_free(message);
}
pn_link_close(link);



--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617333.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by xavier <xa...@eaton.com>.
No, because I am on Windows. But my problem is this:

msgConsumer is created ONCE,
and I call the function below many times, and each time the memory grows. Why?

const std::string address = "amqp://127.0.0.1:5672/queue://myqueue";
pn_link_t* link;
pn_messenger_subscribe(msgConsumer, address.c_str());
link = pn_messenger_get_link(msgConsumer, address.c_str(), false);
pn_link_open(link);

pn_terminus_t* terminus = pn_link_source(link);
pn_data_t* data = pn_terminus_filter(terminus);
/* Create the filter map; its key is the symbol "jms-selector" */
std::string selector = "jms-selector";
pn_data_put_map(data);
pn_data_enter(data);
pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str()));
// Described value of the JMS_SELECTOR line 1262
std::string filter = "JMSCorrelationID='12346789'";
pn_data_put_described(data);
pn_data_enter(data);
pn_data_put_string(data, pn_bytes(6, "string"));
pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str()));
pn_data_exit(data); // leave the described value
pn_data_exit(data); // leave the map

pn_messenger_recv(msgConsumer, -1);
if (pn_messenger_incoming(msgConsumer))
{
      // A message has arrived
      pn_message_t* message = pn_message();
      pn_messenger_get(msgConsumer, message);
      .....
      .....
      pn_message_free(message);
}
pn_link_close(link);


From: Dominic Evans [via Qpid] [mailto:ml-node+s2158936n7617738h95@n2.nabble.com]
Sent: Thursday, 18 December 2014 17:44
To: Millieret, Xavier
Subject: RE: CorrelationId

xavier wrote
Perhaps I must free some object, but in debug mode and checking the memory, memory grow up, each time I do pn_messenger_subscribe, pn_messenger_recv, etc.
did you try running your binary via valgrind memcheck?

--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617739.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by Dominic Evans <do...@uk.ibm.com>.
xavier wrote
> Perhaps I must free some object, but in debug mode and checking the
> memory, memory grow up, each time I do pn_messenger_subscribe,
> pn_messenger_recv, etc.

did you try running your binary via valgrind memcheck?




--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617738.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by xavier <xa...@eaton.com>.
Hi,

Does anybody have an idea about this? Will the new version of the library
fix it, or is the problem in my code?

Regards



--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617531.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by xavier <xa...@eaton.com>.
Right!!!
I changed the order of my close and open calls, and it works. Thanks a lot. I will run some tests this afternoon and post the final code on the thread.

Thanks again for your help. Perhaps PN_TRACE_FRM=1 should be documented, because it is very helpful.

Once again, thanks a lot.

From: Dominic Evans [via Qpid] [mailto:ml-node+s2158936n7617327h54@n2.nabble.com]
Sent: Wednesday, 10 December 2014 12:07
To: Millieret, Xavier
Subject: Re: CorrelationId

Hi Xavier, so 006FE360 is your messenger that is responsible for subscribing and receiving the messages based upon the selector.
xavier wrote
[006FE360]:  -> SASL
[006FE360]:0 -> @sasl-init(65) [mechanism=:PLAIN, initial-response=b"\x00emc2\x00emc2"]
[006FE360]:  <- SASL
[006FE360]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
[006FE360]:0 <- @sasl-outcome(68) [code=0]
[006FE360]:  -> AMQP
[006FE360]:0 -> @open(16) [container-id="6aae955e-cd32-488a-8119-67ec0b715924", hostname="localhost"]
[006FE360]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0]
[006FE360]:0 -> @attach(18) [name="queue://emc2-ea2a65", handle=0, role=true, snd-settle-mode=1, rcv-settle-mode=0, source=@source(40) [address="queue
://emc2-ea2a65", durable=0, timeout=0, dynamic=false, filter={:"jms-selector"=@"string" "JMSCorrelationID='28a8d7e8-bb82-e41e-1236-5c9f77a057c7'"}], t
arget=@target(41) [address="queue://emc2-ea2a65", durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
[006FE360]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals
e]
[006FE360]:  <- AMQP
[006FE360]:0 <- @open(16) [container-id="", hostname="", max-frame-size=4294967295, channel-max=32767, properties={:"x-opt-anonymous-relay"="$relay"}]

[006FE360]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1, incoming-window=0, outgoing-window=0, handle-max=1024]
[006FE360]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=1, outgoing-window=0]
[006FE360]:0 <- @attach(18) [name="queue://emc2-ea2a65", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="queu
e://emc2-ea2a65", durable=0, expiry-policy=:"session-end", timeout=0, dynamic=false, filter={:"jms-selector"=@"string" "JMSCorrelationID='28a8d7e8-bb8
2-e41e-1236-5c9f77a057c7'"}], target=@target(41) [address="queue://emc2-ea2a65"], incomplete-unsettled=false, initial-delivery-count=0]
[006FE360]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"", message-format=0, settled=true] (407) "\x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1
7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a
8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{"header":{"manufacturer":"com.eaton.pqsoft","uuid":"c1b3e315-a5e4-47b
4-8128-51b0bf6284b7","timeStamp":1418208069,"providerId":"emc4j","flowType":"reply","destination":"TestSimpleBus.add","contentType":"java.lang.Integer
"},"body":18}"
We can see it connect, make the @attach, receive an ACK to that @attach, and then receive its first message via the @transfer. However, after that we no longer see that messenger doing anything? Based upon the earlier discussions, I'd have expected to see an @close for the pn_link_{close,detach} and then a fresh @attach for a link with the new correlation id selector.

--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617329.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

RE: CorrelationId

Posted by xavier <xa...@eaton.com>.
Dominic,

I posted my code for the community.
I am making progress and everything works, but I noticed that with this code the memory grows each time I call this function:

const std::string address = "amqp://127.0.0.1:5672/queue://myqueue";
pn_link_t* link;
pn_messenger_subscribe(msgConsumer, address.c_str());
link = pn_messenger_get_link(msgConsumer, address.c_str(), false);
pn_link_open(link);

pn_terminus_t* terminus = pn_link_source(link);
pn_data_t* data = pn_terminus_filter(terminus);
/* Create the filter map; its key is the symbol "jms-selector" */
std::string selector = "jms-selector";
pn_data_put_map(data);
pn_data_enter(data);
pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str()));
// Described value of the JMS_SELECTOR line 1262
std::string filter = "JMSCorrelationID='12346789'";
pn_data_put_described(data);
pn_data_enter(data);
pn_data_put_string(data, pn_bytes(6, "string"));
pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str()));
pn_data_exit(data); // leave the described value
pn_data_exit(data); // leave the map

pn_messenger_recv(msgConsumer, -1);
if (pn_messenger_incoming(msgConsumer))
{
      // A message has arrived
      pn_message_t* message = pn_message();
      pn_messenger_get(msgConsumer, message);
      .....
      .....
      pn_message_free(message);
}
pn_link_close(link);


The reason I implemented this is simple: I implemented the request/reply pattern, and the reply is matched by the correlation ID (set in the request). So in my application msgConsumer is built once, and I reuse it each time I wait for a reply.
Perhaps I must free some object, but in debug mode, when I check the memory, it grows each time I call pn_messenger_subscribe, pn_messenger_recv, etc.


Cheers

--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617349.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

Re: CorrelationId

Posted by Dominic Evans <do...@uk.ibm.com>.
Hi Xavier, so 006FE360 is your messenger that is responsible for subscribing
and receiving the messages based upon the selector.


xavier wrote
> [006FE360]:  -> SASL
> [006FE360]:0 -> @sasl-init(65) [mechanism=:PLAIN,
> initial-response=b"\x00emc2\x00emc2"]
> [006FE360]:  <- SASL
> [006FE360]:0 <- @sasl-mechanisms(64)
> [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
> [006FE360]:0 <- @sasl-outcome(68) [code=0]
> [006FE360]:  -> AMQP
> [006FE360]:0 -> @open(16)
> [container-id="6aae955e-cd32-488a-8119-67ec0b715924",
> hostname="localhost"]
> [006FE360]:0 -> @begin(17) [next-outgoing-id=0,
> incoming-window=2147483647, outgoing-window=0]
> [006FE360]:0 -> @attach(18) [name="queue://emc2-ea2a65", handle=0,
> role=true, snd-settle-mode=1, rcv-settle-mode=0, source=@source(40)
> [address="queue
> ://emc2-ea2a65", durable=0, timeout=0, dynamic=false,
> filter={:"jms-selector"=@"string"
> "JMSCorrelationID='28a8d7e8-bb82-e41e-1236-5c9f77a057c7'"}], t
> arget=@target(41) [address="queue://emc2-ea2a65", durable=0, timeout=0,
> dynamic=false], initial-delivery-count=0]
> [006FE360]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
> outgoing-window=0, handle=0, delivery-count=0, link-credit=1024,
> drain=fals
> e]
> [006FE360]:  <- AMQP
> [006FE360]:0 <- @open(16) [container-id="", hostname="",
> max-frame-size=4294967295, channel-max=32767,
> properties={:"x-opt-anonymous-relay"="$relay"}]
> 
> [006FE360]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
> incoming-window=0, outgoing-window=0, handle-max=1024]
> [006FE360]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647,
> next-outgoing-id=1, outgoing-window=0]
> [006FE360]:0 <- @attach(18) [name="queue://emc2-ea2a65", handle=0,
> role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> [address="queu
> e://emc2-ea2a65", durable=0, expiry-policy=:"session-end", timeout=0,
> dynamic=false, filter={:"jms-selector"=@"string"
> "JMSCorrelationID='28a8d7e8-bb8
> 2-e41e-1236-5c9f77a057c7'"}], target=@target(41)
> [address="queue://emc2-ea2a65"], incomplete-unsettled=false,
> initial-delivery-count=0]
> [006FE360]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"",
> message-format=0, settled=true] (407)
> "\x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1
> 7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a
> 8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{"header":{"manufacturer":"com.eaton.pqsoft","uuid":"c1b3e315-a5e4-47b
> 4-8128-51b0bf6284b7","timeStamp":1418208069,"providerId":"emc4j","flowType":"reply","destination":"TestSimpleBus.add","contentType":"java.lang.Integer
> "},"body":18}"

We can see it connect, make the @attach, receive an ACK to that @attach, and
then receive its first message via the @transfer. However, after that we no
longer see that messenger doing anything? Based upon the earlier
discussions, I'd have expected to see an @close for the
pn_link_{close,detach} and then a fresh @attach for a link with the new
correlation id selector.
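
The check described above (is there a @detach or @close followed by a fresh @attach for this messenger?) can be done by eye, or roughly automated. The sketch below is only an illustration: it assumes that each frame line in the PN_TRACE_FRM output starts with "[id]:channel -> @name(" or "[id]:channel <- @name(", as in the trace quoted above, and the function name performatives is hypothetical.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: lists the performatives ("-> attach", "<- transfer", ...)
// logged for one connection id, in order of appearance. Wrapped continuation
// lines and lines without a performative ("-> SASL", "-> EOS") are skipped.
std::vector<std::string> performatives(const std::vector<std::string>& lines,
                                       const std::string& id)
{
    std::vector<std::string> result;
    const std::string prefix = "[" + id + "]:";
    for (const std::string& line : lines) {
        if (line.compare(0, prefix.size(), prefix) != 0) continue;
        const std::size_t at = line.find('@', prefix.size());
        if (at == std::string::npos) continue;
        const std::size_t paren = line.find('(', at);
        if (paren == std::string::npos) continue;
        // The direction marker sits between the prefix and the '@'.
        const std::string head = line.substr(prefix.size(), at - prefix.size());
        std::string direction;
        if (head.find("->") != std::string::npos) direction = "->";
        else if (head.find("<-") != std::string::npos) direction = "<-";
        else continue;
        result.push_back(direction + " " + line.substr(at + 1, paren - at - 1));
    }
    return result;
}
```

For the 006FE360 trace above, the list would end with "<- transfer" and contain no "-> detach", which is the missing step pointed out in the reply.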



--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617327.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

Re: CorrelationId

Posted by xavier <xa...@eaton.com>.
Hi Dominic,

I set PN_TRACE_FRM=1, and here is my trace.


[006FC110]:  -> SASL
[006FC110]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x00emc2\x00emc2"]
[006FC110]:  <- SASL
[006FC110]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
[006FC110]:0 <- @sasl-outcome(68) [code=0]
[006FC110]:  -> AMQP
[006FC110]:0 -> @open(16)
[container-id="a74c872a-e7df-46a8-9ebd-48c933eabf6f", hostname="localhost"]
[006FC110]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=0]
[006FC110]:0 -> @attach(18) [name="queue://emc2-connexion", handle=0,
role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="qu
eue://emc2-connexion", durable=0, timeout=0, dynamic=false],
target=@target(41) [address="queue://emc2-connexion", durable=0, timeout=0,
dynamic=false
], initial-delivery-count=0]
[006FC110]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals
e]
[006FC110]:  <- AMQP
[006FC110]:0 <- @open(16) [container-id="", hostname="",
max-frame-size=4294967295, channel-max=32767,
properties={:"x-opt-anonymous-relay"="$relay"}]

[006FC110]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=0, outgoing-window=0, handle-max=1024]
[006FC110]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647,
next-outgoing-id=1, outgoing-window=0]
[006FC110]:0 <- @attach(18) [name="queue://emc2-connexion", handle=0,
role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="q
ueue://emc2-connexion"], target=@target(41)
[address="queue://emc2-connexion"], incomplete-unsettled=false,
initial-delivery-count=0]
[006FC110]:0 -> @detach(22) [handle=0, closed=true]
[006FC110]:0 -> @close(24) []
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:0 <- @detach(22) [handle=0, closed=true]
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:0 <- @close(24) []
[006FC110]:  <- EOS
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:  -> EOS
[006FC110]:  -> EOS

[02B82D30]:  -> SASL
[02B82D30]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x00emc2\x00emc2"]
[02B82D30]:  <- SASL
[02B82D30]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
[02B82D30]:0 <- @sasl-outcome(68) [code=0]
[02B82D30]:  -> AMQP
[02B82D30]:0 -> @open(16)
[container-id="da1205e2-c0d8-4a71-ac6e-903aa6797788", hostname="localhost"]
[02B82D30]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=1]
[02B82D30]:0 -> @attach(18) [name="sender-xxx", handle=0, role=false,
snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="queue://TestS
impleBus", durable=0, timeout=0, dynamic=false], target=@target(41)
[address="queue://TestSimpleBus", durable=0, timeout=0, dynamic=false],
initial-de
livery-count=0]
[02B82D30]:  <- AMQP
[02B82D30]:0 <- @open(16) [container-id="", hostname="",
max-frame-size=4294967295, channel-max=32767,
properties={:"x-opt-anonymous-relay"="$relay"}]

[02B82D30]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=0, outgoing-window=0, handle-max=1024]
[02B82D30]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647,
next-outgoing-id=1, outgoing-window=0]
[02B82D30]:0 <- @attach(18) [name="sender-xxx", handle=0, role=true,
snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="queue://TestSi
mpleBus"], target=@target(41) [address="queue://TestSimpleBus"]]
[02B82D30]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647,
next-outgoing-id=1, outgoing-window=0, handle=0, delivery-count=0, link-cre
dit=100]
[02B82D30]:0 -> @transfer(20) [handle=0, delivery-id=0,
delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0,
settled=true, more=false]
(405)
"\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x86\x00\x00\x00\x0d@@\xa1+amqp://localhost:5672/queue://TestSimpl
eBus@\xa1\x0bemc2-ea2a65\xa1$28a8d7e8-bb82-e41e-1236-5c9f77a057c7\xa3\x06string@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x
00@R\x00@\x00Sw\xa1\xef{"header":{"manufacturer":"com.eaton.pqsoft","uuid":"c6e83b5c-cfdb-0f9d-a8c7-c49eb24980e8","timeStamp":1418208069.848,"provider
Id":"emc2-ea2a65","flowType":"request","destination":"TestSimpleBus.add","bodyType":null},"body":{"a":10,"b":8}}"
[006FE360]:  -> SASL
[006FE360]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x00emc2\x00emc2"]
[006FE360]:  <- SASL
[006FE360]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]]
[006FE360]:0 <- @sasl-outcome(68) [code=0]
[006FE360]:  -> AMQP
[006FE360]:0 -> @open(16)
[container-id="6aae955e-cd32-488a-8119-67ec0b715924", hostname="localhost"]
[006FE360]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=0]
[006FE360]:0 -> @attach(18) [name="queue://emc2-ea2a65", handle=0,
role=true, snd-settle-mode=1, rcv-settle-mode=0, source=@source(40)
[address="queue
://emc2-ea2a65", durable=0, timeout=0, dynamic=false,
filter={:"jms-selector"=@"string"
"JMSCorrelationID='28a8d7e8-bb82-e41e-1236-5c9f77a057c7'"}], t
arget=@target(41) [address="queue://emc2-ea2a65", durable=0, timeout=0,
dynamic=false], initial-delivery-count=0]
[006FE360]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals
e]
[006FE360]:  <- AMQP
[006FE360]:0 <- @open(16) [container-id="", hostname="",
max-frame-size=4294967295, channel-max=32767,
properties={:"x-opt-anonymous-relay"="$relay"}]

[006FE360]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=0, outgoing-window=0, handle-max=1024]
[006FE360]:0 <- @flow(19) [next-incoming-id=0, incoming-window=2147483647,
next-outgoing-id=1, outgoing-window=0]
[006FE360]:0 <- @attach(18) [name="queue://emc2-ea2a65", handle=0,
role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
[address="queu
e://emc2-ea2a65", durable=0, expiry-policy=:"session-end", timeout=0,
dynamic=false, filter={:"jms-selector"=@"string"
"JMSCorrelationID='28a8d7e8-bb8
2-e41e-1236-5c9f77a057c7'"}], target=@target(41)
[address="queue://emc2-ea2a65"], incomplete-unsettled=false,
initial-delivery-count=0]
[006FE360]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"",
message-format=0, settled=true] (407) "\x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1
7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a
8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{"header":{"manufacturer":"com.eaton.pqsoft","uuid":"c1b3e315-a5e4-47b
4-8128-51b0bf6284b7","timeStamp":1418208069,"providerId":"emc4j","flowType":"reply","destination":"TestSimpleBus.add","contentType":"java.lang.Integer
"},"body":18}"

[02B82D30]:0 -> @transfer(20) [handle=0, delivery-id=1,
delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0,
settled=true, more=false]
(405)
"\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x86\x00\x00\x00\x0d@@\xa1+amqp://localhost:5672/queue://TestSimpl
eBus@\xa1\x0bemc2-ea2a65\xa1$3d28790d-1a5d-49d5-36d9-e0bd143a97fe\xa3\x06string@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x
00@R\x00@\x00Sw\xa1\xef{"header":{"manufacturer":"com.eaton.pqsoft","uuid":"c3046674-4e25-775f-d043-ecf657cf33ca","timeStamp":1418208081.919,"provider
Id":"emc2-ea2a65","flowType":"request","destination":"TestSimpleBus.add","bodyType":null},"body":{"a":10,"b":8}}"
2014/12/10-11:42:48.581 | AmqpMessageBusTransport | !!!!  ERROR : Timeout
reached on the get synchronous reply, queue: emc2-ea2a65






--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617326.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

Re: CorrelationId

Posted by xavier <xa...@eaton.com>.
Hi Dominic

Unfortunately, it does not work!
I tried your idea (I moved pn_link_detach() and pn_link_open() around
everywhere in my code, but .....).
Here is my code:

// Initialized once, then used many times
const std::string address = "amqp://127.0.0.1:5672/queue://myqueue";
pn_messenger_t* msgConsumer = pn_messenger(NULL);
pn_messenger_set_timeout(msgConsumer, 1000);
pn_messenger_set_blocking(msgConsumer, true);
pn_messenger_set_incoming_window(msgConsumer, 1);

// Later, in a method
pn_messenger_subscribe(msgConsumer, address.c_str());
pn_link_t* link = pn_messenger_get_link(msgConsumer, address.c_str(), false);

pn_link_detach(link);   // newly added call

pn_terminus_t* terminus = pn_link_source(link);
pn_data_t* data = pn_terminus_filter(terminus);
/* Create the filter map; its key is the symbol "jms-selector" */
std::string selector = "jms-selector";
pn_data_put_map(data);
pn_data_enter(data);
pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str()));
// Described value
std::string filter = "JMSCorrelationID='12346789'";
pn_data_put_described(data);
pn_data_enter(data);
pn_data_put_string(data, pn_bytes(6, "string"));
pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str()));
pn_data_exit(data); // leave the described value
pn_data_exit(data); // leave the map

pn_link_open(link);     // newly added call

pn_messenger_recv(msgConsumer, -1);
if (pn_messenger_incoming(msgConsumer))
{
    // A message has arrived
    pn_message_t* message = pn_message();
    pn_messenger_get(msgConsumer, message);
    .....
    .....
    pn_message_free(message);
}


Unfortunately, I get a timeout even though the message is on the queue. As
before, if I call pn_messenger_start before pn_messenger_recv and
pn_messenger_stop after it, it works!


Your help is very important. I believe I am not very far off, but.....

So what do you think, Dominic?

Thanks a lot








--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617311.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.

Re: CorrelationId

Posted by Dominic Evans <do...@uk.ibm.com>.
Hi Xavier,


xavier wrote
> I posted my solution, it works, but I did some performance test, and it's
> not very good, I found why. In my solution, I start, and stop for any
> receiver the messenger, because if I don't do this, with my code (posted)
> I receive one time, the message with a filter (attached to the link) but
> for the second message, (I change the filter before) I don't receive the
> message.
> 
> I have not the competency on the engine, so I am lost how do this without
> start and stop messenger on each reception

So the behaviour you are seeing is because for the AMQP 1.0 protocol your
selector filter is set on the Link at attachment time, and I don't believe
there is any action that allows you to modify a filter once the link has
been established.

Theoretically, rather than doing a full pn_messenger_stop +
pn_messenger_start, you should be able to just do a pn_link_detach(link); /*
modify the filter */ ; pn_link_open(link); and that might give you
*slightly* better performance. But I think if you are going to be doing
this on a per-message basis then you would be better off receiving all
messages on the Link and just accepting those that you are interested in via
correlation id.
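
The last suggestion, receiving everything on one link and picking out replies by correlation id in the client, could look roughly like the sketch below. It is deliberately independent of Proton: the Reply struct and the PendingReplies class are hypothetical stand-ins, and in real code the correlation id would have to be read from each received pn_message_t (for example with pn_message_get_correlation_id).

```cpp
#include <map>
#include <set>
#include <string>

// Hypothetical stand-in for a received message.
struct Reply {
    std::string correlationId;
    std::string body;
};

// Hypothetical bookkeeping for outstanding requests on a single, unfiltered link.
class PendingReplies {
public:
    // Call before sending a request that carries this correlation id.
    void expect(const std::string& correlationId) { expected_.insert(correlationId); }

    // Call for every message received on the link. Returns true if the message
    // answers an outstanding request; false means nobody is waiting for it.
    bool offer(const Reply& reply)
    {
        std::set<std::string>::iterator it = expected_.find(reply.correlationId);
        if (it == expected_.end()) return false;
        expected_.erase(it);
        arrived_[reply.correlationId] = reply.body;
        return true;
    }

    // Fetches and removes the stored reply for one correlation id, if it has arrived.
    bool take(const std::string& correlationId, std::string& body)
    {
        std::map<std::string, std::string>::iterator it = arrived_.find(correlationId);
        if (it == arrived_.end()) return false;
        body = it->second;
        arrived_.erase(it);
        return true;
    }

private:
    std::set<std::string> expected_;
    std::map<std::string, std::string> arrived_;
};
```

With this approach the link is attached once without a per-request selector, so no detach and re-attach is needed between requests. The trade-off is that the client also receives messages it is not waiting for and has to decide what to do with them.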



--
View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617293.html
Sent from the Apache Qpid Proton mailing list archive at Nabble.com.