You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Michael Arnold <my...@gmail.com> on 2017/12/08 14:44:13 UTC

How to create a QPID topic exchange that holds messages even when there is no active receiver

Disconnecting the QPID session that creates a durable topic exchange (and
thus closing the only open receiver) seems to delete the topic exchange and
its contents - any messages not yet retrieved from the exchange are lost
and any subsequent messages sent while no receiver is open, are likewise
lost.

Desired behaviour is that topic exchanges marked as durable should survive
session close(); topic exchanges marked as durable should hold messages for
the ttl even when there is no receiver currently open.

The below code:

Part A: Creates a durable topic exchange and receiver; posts some messages
to the exchange and then successfully retrieves them

Part B: Additional messages are then posted; the receiver (and session) is
closed and further messages posted

Part C: Tries, but fails to retrieve any of the messages sent in Part B

Since the exchange and messages are marked as durable, the expectation is
that all the messages posted in part B, can be retrieved in part C.

Using:
qpid-cpp-1.36.0
Fedora 27 (4.13.16)

test.cpp---------------------------------------------------------------------------------
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>

using namespace qpid::messaging;
using namespace std;

void SendMessage(int i) {
    Connection QPIDSenderConnection(string("localhost:5672"));
     try {
         QPIDSenderConnection.setOption("username", string(""));
         QPIDSenderConnection.setOption("password", string(""));
         QPIDSenderConnection.setOption("reconnect", true);
         QPIDSenderConnection.setOption("reconnect_limit", 10);
         QPIDSenderConnection.setOption("reconnect_interval", 5);
         QPIDSenderConnection.setOption("heartbeat", 30);
        QPIDSenderConnection.open();

        Session QPIDSenderSession = QPIDSenderConnection.createSession();

        qpid::types::Variant::Map MessageContentMap;
        MessageContentMap["Value"] = i;

        string SenderAddress = "Topic.Subtopic; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true}}";
        qpid::messaging::Sender QPIDSender =
QPIDSenderSession.createSender(SenderAddress);

        qpid::messaging::Message QPIDMessageToSend;
        QPIDMessageToSend.setSubject(string("Subject"));
        qpid::messaging::encode(MessageContentMap, QPIDMessageToSend);
        QPIDMessageToSend.setDurable(false);
        QPIDMessageToSend.setTtl((qpid::messaging::Duration) 30000);
        QPIDMessageToSend.setContentType("amqp/map");
        QPIDSender.send(QPIDMessageToSend);

        QPIDSenderSession.close();
        QPIDSenderConnection.close();

    } catch(const exception& error) {
        cout << "Failed with: " << error.what() << " - quitting." << endl;
    }

}

int main(int argc, char** argv) {

    //Part A: Creates a topic exchange and reciever; posts some messages to
the exchange and then successfully retrieves them
    Connection QPIDConnection(string("localhost:5672"));
     try {
        QPIDConnection.setOption("username", string(""));
        QPIDConnection.setOption("password", string(""));
        QPIDConnection.setOption("reconnect", true);
        QPIDConnection.setOption("reconnect_limit", 10);
        QPIDConnection.setOption("reconnect_interval", 5);
        QPIDConnection.setOption("heartbeat", 30);
         QPIDConnection.open();

         Session QPIDSession = QPIDConnection.createSession();

        string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true}}";
        qpid::messaging::Receiver QPIDReceiver;

        try {
            QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
        } catch(const exception& error) {
            cout << "Unable to connect receiver to the exchange: " <<
error.what();
        }

        cout << "Sending Messages..." << endl;
        for(int i = 0; i < 3; i++)
                SendMessage(i);

        cout << "...now retrieving messages" << endl;
        qpid::messaging::Message QPIDMessage;
        for(int i = 0; i < 3; i++){
                if(QPIDReceiver.fetch(QPIDMessage,
qpid::messaging::Duration::SECOND * 10)) {
                    QPIDSession.acknowledge(QPIDMessage);
                    qpid::types::Variant::Map MessageContent;
                            decode(QPIDMessage, MessageContent);
                    std::cout << "Got message with subject: '" <<
QPIDMessage.getSubject() << "' and content value: " <<
MessageContent["Value"] << endl;
                }
                else
                    cout << "Got nothing" << endl;

        };

        //Part B: Additional messages are then posted; the receiver (and
session) is closed and further messages posted
        cout << "Sending some more messages before we close the session..."
<< endl;
        for(int i = 3; i < 6; i++)
                SendMessage(i);

        //Optional - comment out from here to show that the disconnect and
reconnect is significant
        cout << "Now disconnecting the session that created the topic" <<
endl;
        QPIDSession.close();

        cout << "Sending even more messages after the session is closed..."
<< endl;
        for(int i = 6; i < 9; i++)
                SendMessage(i);

        cout << "...now reconnect the session.." << endl;
         QPIDSession = QPIDConnection.createSession();

        cout << "..and send even more messages after the session is
created, but before the receiver is recreated..." << endl;
        for(int i = 9; i < 12; i++)
                SendMessage(i);

        cout << "...now create the receiver.." << endl;
        try {
            QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
        } catch(const exception& error) {
            cout << "Unable to reconnect receiver session to the exchange:
" << error.what();
        }
        //Optional - Comment out till here

        //Part C: Tries, but failes to retrieve any of the messages send in
Part B
        cout << "...try to retrieve the 9 messages aleady sent" << endl;
        for(int i = 0; i < 3; i++){
                if(QPIDReceiver.fetch(QPIDMessage,
qpid::messaging::Duration::SECOND * 10)) {
                    QPIDSession.acknowledge(QPIDMessage);
                    qpid::types::Variant::Map MessageContent;
                            decode(QPIDMessage, MessageContent);
                    std::cout << "Got message with subject: '" <<
QPIDMessage.getSubject() << "' and content: " << MessageContent["Value"] <<
endl;
                }
                else
                    cout << "Got nothing" << endl;
        };

    } catch(const exception& error) {
        cerr << "Failed with: " << error.what() << "- quitting." << endl;
    }

    QPIDConnection.close();

    std::cout << "Finished" << std::endl;

}
----------------------------------------------------------------------
Makefile---------------------------------------------------------
# Builds the `test` executable from test.cpp and links it against the
# qpidtypes and qpidmessaging libraries.
CC = g++
CPPFLAGS += -I/usr/include
CFLAGS += -std=c++0x
CFLAGS += -c
CFLAGS += -g
LDFLAGS += -g

LDLIBS += -L/usr/lib64
LDLIBS += -lqpidtypes
LDLIBS += -lqpidmessaging

SOURCES = test.cpp
OBJECTS = $(SOURCES:.cpp=.o)
EXECUTABLE = test

# `all` and `install` name actions, not files, so they must always run.
.PHONY: all install

all: $(EXECUTABLE)

# Recipe lines must start with a TAB character; space-indented recipes make
# `make` fail with "missing separator".
$(EXECUTABLE): $(OBJECTS)
	$(CC) $(LDFLAGS) $(OBJECTS) $(LDLIBS) -o $@

.cpp.o:
	$(CC) $(CPPFLAGS) $(CFLAGS) $< -o $@

install:
	@echo "Build complete!"
----------------------------------------------------------------------

Re: How to create a QPID topic exchange that holds messages even when there is no active receiver

Posted by Michael Arnold <my...@gmail.com>.
Yes, that works for me!

Thank-you so much for your help!

On Mon, Dec 11, 2017 at 9:43 PM, Gordon Sim <gs...@redhat.com> wrote:

> On 09/12/17 02:20, Michael Arnold wrote:
>
>> Thanks for the help - but am still stuck:
>>
>> 1. Added in a link name as follows:
>> string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
>> node:{type: topic, durable: true}, link:{reliability: at-least-once,
>> durable: true, name: subscription-name}}";
>> Messages are still lost
>>
>>> qpid-config list queue
>>>
>> Does not show 'subscription-name'
>> Also tried setting a link name on the SenderAddress (both the same and
>> different to the receiver name), without success
>> Also tried the following, without success
>> string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
>> node:{type: topic, durable: true}, link:{reliability: at-least-once,
>> durable: true, name: 'subscription-name', x-declare:{auto-delete:false}}
>> }";
>>
>> 2. Tried to enable AMQP1.0, but always get 'Failed with: Invalid URL' at
>> run-time.  Tried the following:
>>      Connection QPIDConnection(string("localho
>> st:5672?protocol='amqp1.0'"));
>>      Connection
>> QPIDConnection(string("localhost:5672?protocol='amqp1.0'&
>> container_id='ABC123'"));
>>
>
> The options are passed in a a separate map (or string representation of
> map).
>
> the following results in 'Failed with: Invalid option: protocol not
>> recognised'
>>           QPIDConnection.setOption("protocol", string("amqp1.0"));
>>
>> Could you please elaborate further?
>>
>
> The attached test, when built against 1.36, works for me. If I start the
> test program, then kill it and run qpid-stat -q I see
> my-client_subscription-name listed. (If I don't use 1.0, I see
> subscription-name as the queue).
>
> Does this work for you?
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>

Re: How to create a QPID topic exchange that holds messages even when there is no active receiver

Posted by Gordon Sim <gs...@redhat.com>.
On 09/12/17 02:20, Michael Arnold wrote:
> Thanks for the help - but am still stuck:
> 
> 1. Added in a link name as follows:
> string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
> node:{type: topic, durable: true}, link:{reliability: at-least-once,
> durable: true, name: subscription-name}}";
> Messages are still lost
>> qpid-config list queue
> Does not show 'subscription-name'
> Also tried setting a link name on the SenderAddress (both the same and
> different to the receiver name), without success
> Also tried the following, without success
> string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
> node:{type: topic, durable: true}, link:{reliability: at-least-once,
> durable: true, name: 'subscription-name', x-declare:{auto-delete:false}}}";
> 
> 2. Tried to enable AMQP1.0, but always get 'Failed with: Invalid URL' at
> run-time.  Tried the following:
>      Connection QPIDConnection(string("localhost:5672?protocol='amqp1.0'"));
>      Connection
> QPIDConnection(string("localhost:5672?protocol='amqp1.0'&container_id='ABC123'"));

The options are passed in a separate map (or a string representation of a
map).

> the following results in 'Failed with: Invalid option: protocol not
> recognised'
>           QPIDConnection.setOption("protocol", string("amqp1.0"));
> 
> Could you please elaborate further?

The attached test, when built against 1.36, works for me. If I start the 
test program, then kill it and run qpid-stat -q I see
my-client_subscription-name listed. (If I don't use 1.0, I see 
subscription-name as the queue).

Does this work for you?

Re: How to create a QPID topic exchange that holds messages even when there is no active receiver

Posted by Michael Arnold <my...@gmail.com>.
Thanks for the help - but am still stuck:

1. Added in a link name as follows:
string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true, name: subscription-name}}";
Messages are still lost
> qpid-config list queue
Does not show 'subscription-name'
Also tried setting a link name on the SenderAddress (both the same and
different to the receiver name), without success
Also tried the following, without success
string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true, name: 'subscription-name', x-declare:{auto-delete:false}}}";

2. Tried to enable AMQP1.0, but always get 'Failed with: Invalid URL' at
run-time.  Tried the following:
    Connection QPIDConnection(string("localhost:5672?protocol='amqp1.0'"));
    Connection
QPIDConnection(string("localhost:5672?protocol='amqp1.0'&container_id='ABC123'"));
the following results in 'Failed with: Invalid option: protocol not
recognised'
         QPIDConnection.setOption("protocol", string("amqp1.0"));

Could you please elaborate further?


On Fri, Dec 8, 2017 at 11:31 PM, Gordon Sim <gs...@redhat.com> wrote:

> On 08/12/17 14:44, Michael Arnold wrote:
>
>> Disconnecting the QPID session that creates a durable topic exchange (and
>> thus closing the only open receiver) seems to delete the topic exchange
>> and
>> its contents - any messages not yet retrieved from the exchange are lost
>> and any subsequent messages sent while no receiver is open, are likewise
>> lost.
>>
>> Desired behaviour is that topic exchanges marked as durable should survive
>> session close(); topic exchanges marked as durable should hold messages
>> for
>> the ttl even then there is no receiver currently open.
>>
>
> Exchanges don't store messages, queues (or subscriptions) do. To reattach
> to a durable subscriber, you need to provide a link name in the address
> (and if using AMQP 1.0 also a container_id in the connection options) in
> order to identify the subscription.
>
>
> The below code:
>>
>> Part A: Creates a durable topic exchange and receiver; posts some messages
>> to the exchange and then successfully retrieves them
>>
>> Part B: Additional messages are then posted; the receiver (and session) is
>> closed and further messages posted
>>
>> Part C: Tries, but fails to retrieve any of the messages send in Part B
>>
>> Since the exchange and messages are marked as durable, the expectation is
>> that all the messages posted in part B, can be retrieved in part C.
>>
>> Using:
>> qpid-cpp-1.36.0
>> Fedora 27 (4.13.16)
>>
>> test.cpp----------------------------------------------------
>> -----------------------------
>> #include <qpid/messaging/Connection.h>
>> #include <qpid/messaging/Message.h>
>> #include <qpid/messaging/Receiver.h>
>> #include <qpid/messaging/Sender.h>
>> #include <qpid/messaging/Session.h>
>>
>> #include <iostream>
>>
>> using namespace qpid::messaging;
>> using namespace std;
>>
>> void SendMessage(int i) {
>>      Connection QPIDSenderConnection(string("localhost:5672"));
>>       try {
>>           QPIDSenderConnection.setOption("username", string(""));
>>           QPIDSenderConnection.setOption("password", string(""));
>>           QPIDSenderConnection.setOption("reconnect", true);
>>           QPIDSenderConnection.setOption("reconnect_limit", 10);
>>           QPIDSenderConnection.setOption("reconnect_interval", 5);
>>           QPIDSenderConnection.setOption("heartbeat", 30);
>>          QPIDSenderConnection.open();
>>
>>          Session QPIDSenderSession = QPIDSenderConnection.createSes
>> sion();
>>
>>          qpid::types::Variant::Map MessageContentMap;
>>          MessageContentMap["Value"] = i;
>>
>>          string SenderAddress = "Topic.Subtopic; {create: always,
>> node:{type: topic, durable: true}, link:{reliability: at-least-once,
>> durable: true}}";
>>          qpid::messaging::Sender QPIDSender =
>> QPIDSenderSession.createSender(SenderAddress);
>>
>>          qpid::messaging::Message QPIDMessageToSend;
>>          QPIDMessageToSend.setSubject(string("Subject"));
>>          qpid::messaging::encode(MessageContentMap, QPIDMessageToSend);
>>          QPIDMessageToSend.setDurable(false);
>>          QPIDMessageToSend.setTtl((qpid::messaging::Duration) 30000);
>>          QPIDMessageToSend.setContentType("amqp/map");
>>          QPIDSender.send(QPIDMessageToSend);
>>
>>          QPIDSenderSession.close();
>>          QPIDSenderConnection.close();
>>
>>      } catch(const exception& error) {
>>          cout << "Failed with: " << error.what() << " - quitting." <<
>> endl;
>>      }
>>
>> }
>>
>> int main(int argc, char** argv) {
>>
>>      //Part A: Creates a topic exchange and reciever; posts some messages
>> to
>> the exchange and then successfully retrieves them
>>      Connection QPIDConnection(string("localhost:5672"));
>>       try {
>>          QPIDConnection.setOption("username", string(""));
>>          QPIDConnection.setOption("password", string(""));
>>          QPIDConnection.setOption("reconnect", true);
>>          QPIDConnection.setOption("reconnect_limit", 10);
>>          QPIDConnection.setOption("reconnect_interval", 5);
>>          QPIDConnection.setOption("heartbeat", 30);
>>           QPIDConnection.open();
>>
>>           Session QPIDSession = QPIDConnection.createSession();
>>
>>          string ReceiverAddress = "Topic.Subtopic/Subject; {create:
>> always,
>> node:{type: topic, durable: true}, link:{reliability: at-least-once,
>> durable: true}}";
>>          qpid::messaging::Receiver QPIDReceiver;
>>
>>          try {
>>              QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
>>          } catch(const exception& error) {
>>              cout << "Unable to connect receiver to the exchange: " <<
>> error.what();
>>          }
>>
>>          cout << "Sending Messages..." << endl;
>>          for(int i = 0; i < 3; i++)
>>                  SendMessage(i);
>>
>>          cout << "...now retrieving messages" << endl;
>>          qpid::messaging::Message QPIDMessage;
>>          for(int i = 0; i < 3; i++){
>>                  if(QPIDReceiver.fetch(QPIDMessage,
>> qpid::messaging::Duration::SECOND * 10)) {
>>                      QPIDSession.acknowledge(QPIDMessage);
>>                      qpid::types::Variant::Map MessageContent;
>>                              decode(QPIDMessage, MessageContent);
>>                      std::cout << "Got message with subject: '" <<
>> QPIDMessage.getSubject() << "' and content value: " <<
>> MessageContent["Value"] << endl;
>>                  }
>>                  else
>>                      cout << "Got nothing" << endl;
>>
>>          };
>>
>>          //Part B: Additional messages are then posted; the receiver (and
>> session) is closed and further messages posted
>>          cout << "Sending some more messages before we close the
>> session..."
>> << endl;
>>          for(int i = 3; i < 6; i++)
>>                  SendMessage(i);
>>
>>          //Optional - comment out from here to show that the disconnect
>> and
>> reconnect is significant
>>          cout << "Now disconnecting the session that created the topic" <<
>> endl;
>>          QPIDSession.close();
>>
>>          cout << "Sending even more messages after the session is
>> closed..."
>> << endl;
>>          for(int i = 6; i < 9; i++)
>>                  SendMessage(i);
>>
>>          cout << "...now reconnect the session.." << endl;
>>           QPIDSession = QPIDConnection.createSession();
>>
>>          cout << "..and send even more messages after the session is
>> created, but before the receiver is recreated..." << endl;
>>          for(int i = 9; i < 12; i++)
>>                  SendMessage(i);
>>
>>          cout << "...now create the receiver.." << endl;
>>          try {
>>              QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
>>          } catch(const exception& error) {
>>              cout << "Unable to reconnect receiver session to the
>> exchange:
>> " << error.what();
>>          }
>>          //Optional - Comment out till here
>>
>>          //Part C: Tries, but failes to retrieve any of the messages send
>> in
>> Part B
>>          cout << "...try to retrieve the 9 messages aleady sent" << endl;
>>          for(int i = 0; i < 3; i++){
>>                  if(QPIDReceiver.fetch(QPIDMessage,
>> qpid::messaging::Duration::SECOND * 10)) {
>>                      QPIDSession.acknowledge(QPIDMessage);
>>                      qpid::types::Variant::Map MessageContent;
>>                              decode(QPIDMessage, MessageContent);
>>                      std::cout << "Got message with subject: '" <<
>> QPIDMessage.getSubject() << "' and content: " << MessageContent["Value"]
>> <<
>> endl;
>>                  }
>>                  else
>>                      cout << "Got nothing" << endl;
>>          };
>>
>>      } catch(const exception& error) {
>>          cerr << "Failed with: " << error.what() << "- quitting." << endl;
>>      }
>>
>>      QPIDConnection.close();
>>
>>      std::cout << "Finished" << std::endl;
>>
>> }
>> ----------------------------------------------------------------------
>> Makefile---------------------------------------------------------
>> CC = g++
>> CPPFLAGS += -I/usr/include
>> CFLAGS += -std=c++0x
>> CFLAGS += -c
>> CFLAGS += -g
>> LDFLAGS += -g
>>
>> LDLIBS += -L/usr/lib64
>> LDLIBS += -lqpidtypes
>> LDLIBS += -lqpidmessaging
>>
>> SOURCES = test.cpp
>> OBJECTS = $(SOURCES:.cpp=.o)
>> EXECUTABLE = test
>>
>> all: $(EXECUTABLE)
>>
>> $(EXECUTABLE): $(OBJECTS)
>>      $(CC) $(LDFLAGS) $(OBJECTS) $(LDLIBS) -o $@
>>
>> .cpp.o:
>>      $(CC) $(CPPFLAGS) $(CFLAGS) $< -o $@
>>
>> install:
>>      @echo "Build complete!"
>> ----------------------------------------------------------------------
>>
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

Re: How to create a QPID topic exchange that holds messages even when there is no active receiver

Posted by Gordon Sim <gs...@redhat.com>.
On 08/12/17 14:44, Michael Arnold wrote:
> Disconnecting the QPID session that creates a durable topic exchange (and
> thus closing the only open receiver) seems to delete the topic exchange and
> its contents - any messages not yet retrieved from the exchange are lost
> and any subsequent messages sent while no receiver is open, are likewise
> lost.
> 
> Desired behaviour is that topic exchanges marked as durable should survive
> session close(); topic exchanges marked as durable should hold messages for
> the ttl even then there is no receiver currently open.

Exchanges don't store messages, queues (or subscriptions) do. To 
reattach to a durable subscriber, you need to provide a link name in the 
address (and if using AMQP 1.0 also a container_id in the connection 
options) in order to identify the subscription.

> The below code:
> 
> Part A: Creates a durable topic exchange and receiver; posts some messages
> to the exchange and then successfully retrieves them
> 
> Part B: Additional messages are then posted; the receiver (and session) is
> closed and further messages posted
> 
> Part C: Tries, but fails to retrieve any of the messages send in Part B
> 
> Since the exchange and messages are marked as durable, the expectation is
> that all the messages posted in part B, can be retrieved in part C.
> 
> Using:
> qpid-cpp-1.36.0
> Fedora 27 (4.13.16)
> 
> test.cpp---------------------------------------------------------------------------------
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Session.h>
> 
> #include <iostream>
> 
> using namespace qpid::messaging;
> using namespace std;
> 
> void SendMessage(int i) {
>      Connection QPIDSenderConnection(string("localhost:5672"));
>       try {
>           QPIDSenderConnection.setOption("username", string(""));
>           QPIDSenderConnection.setOption("password", string(""));
>           QPIDSenderConnection.setOption("reconnect", true);
>           QPIDSenderConnection.setOption("reconnect_limit", 10);
>           QPIDSenderConnection.setOption("reconnect_interval", 5);
>           QPIDSenderConnection.setOption("heartbeat", 30);
>          QPIDSenderConnection.open();
> 
>          Session QPIDSenderSession = QPIDSenderConnection.createSession();
> 
>          qpid::types::Variant::Map MessageContentMap;
>          MessageContentMap["Value"] = i;
> 
>          string SenderAddress = "Topic.Subtopic; {create: always,
> node:{type: topic, durable: true}, link:{reliability: at-least-once,
> durable: true}}";
>          qpid::messaging::Sender QPIDSender =
> QPIDSenderSession.createSender(SenderAddress);
> 
>          qpid::messaging::Message QPIDMessageToSend;
>          QPIDMessageToSend.setSubject(string("Subject"));
>          qpid::messaging::encode(MessageContentMap, QPIDMessageToSend);
>          QPIDMessageToSend.setDurable(false);
>          QPIDMessageToSend.setTtl((qpid::messaging::Duration) 30000);
>          QPIDMessageToSend.setContentType("amqp/map");
>          QPIDSender.send(QPIDMessageToSend);
> 
>          QPIDSenderSession.close();
>          QPIDSenderConnection.close();
> 
>      } catch(const exception& error) {
>          cout << "Failed with: " << error.what() << " - quitting." << endl;
>      }
> 
> }
> 
> int main(int argc, char** argv) {
> 
>      //Part A: Creates a topic exchange and reciever; posts some messages to
> the exchange and then successfully retrieves them
>      Connection QPIDConnection(string("localhost:5672"));
>       try {
>          QPIDConnection.setOption("username", string(""));
>          QPIDConnection.setOption("password", string(""));
>          QPIDConnection.setOption("reconnect", true);
>          QPIDConnection.setOption("reconnect_limit", 10);
>          QPIDConnection.setOption("reconnect_interval", 5);
>          QPIDConnection.setOption("heartbeat", 30);
>           QPIDConnection.open();
> 
>           Session QPIDSession = QPIDConnection.createSession();
> 
>          string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
> node:{type: topic, durable: true}, link:{reliability: at-least-once,
> durable: true}}";
>          qpid::messaging::Receiver QPIDReceiver;
> 
>          try {
>              QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
>          } catch(const exception& error) {
>              cout << "Unable to connect receiver to the exchange: " <<
> error.what();
>          }
> 
>          cout << "Sending Messages..." << endl;
>          for(int i = 0; i < 3; i++)
>                  SendMessage(i);
> 
>          cout << "...now retrieving messages" << endl;
>          qpid::messaging::Message QPIDMessage;
>          for(int i = 0; i < 3; i++){
>                  if(QPIDReceiver.fetch(QPIDMessage,
> qpid::messaging::Duration::SECOND * 10)) {
>                      QPIDSession.acknowledge(QPIDMessage);
>                      qpid::types::Variant::Map MessageContent;
>                              decode(QPIDMessage, MessageContent);
>                      std::cout << "Got message with subject: '" <<
> QPIDMessage.getSubject() << "' and content value: " <<
> MessageContent["Value"] << endl;
>                  }
>                  else
>                      cout << "Got nothing" << endl;
> 
>          };
> 
>          //Part B: Additional messages are then posted; the receiver (and
> session) is closed and further messages posted
>          cout << "Sending some more messages before we close the session..."
> << endl;
>          for(int i = 3; i < 6; i++)
>                  SendMessage(i);
> 
>          //Optional - comment out from here to show that the disconnect and
> reconnect is significant
>          cout << "Now disconnecting the session that created the topic" <<
> endl;
>          QPIDSession.close();
> 
>          cout << "Sending even more messages after the session is closed..."
> << endl;
>          for(int i = 6; i < 9; i++)
>                  SendMessage(i);
> 
>          cout << "...now reconnect the session.." << endl;
>           QPIDSession = QPIDConnection.createSession();
> 
>          cout << "..and send even more messages after the session is
> created, but before the receiver is recreated..." << endl;
>          for(int i = 9; i < 12; i++)
>                  SendMessage(i);
> 
>          cout << "...now create the receiver.." << endl;
>          try {
>              QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
>          } catch(const exception& error) {
>              cout << "Unable to reconnect receiver session to the exchange:
> " << error.what();
>          }
>          //Optional - Comment out till here
> 
>          //Part C: Tries, but failes to retrieve any of the messages send in
> Part B
>          cout << "...try to retrieve the 9 messages aleady sent" << endl;
>          for(int i = 0; i < 3; i++){
>                  if(QPIDReceiver.fetch(QPIDMessage,
> qpid::messaging::Duration::SECOND * 10)) {
>                      QPIDSession.acknowledge(QPIDMessage);
>                      qpid::types::Variant::Map MessageContent;
>                              decode(QPIDMessage, MessageContent);
>                      std::cout << "Got message with subject: '" <<
> QPIDMessage.getSubject() << "' and content: " << MessageContent["Value"] <<
> endl;
>                  }
>                  else
>                      cout << "Got nothing" << endl;
>          };
> 
>      } catch(const exception& error) {
>          cerr << "Failed with: " << error.what() << "- quitting." << endl;
>      }
> 
>      QPIDConnection.close();
> 
>      std::cout << "Finished" << std::endl;
> 
> }
> ----------------------------------------------------------------------
> Makefile---------------------------------------------------------
> CC = g++
> CPPFLAGS += -I/usr/include
> CFLAGS += -std=c++0x
> CFLAGS += -c
> CFLAGS += -g
> LDFLAGS += -g
> 
> LDLIBS += -L/usr/lib64
> LDLIBS += -lqpidtypes
> LDLIBS += -lqpidmessaging
> 
> SOURCES = test.cpp
> OBJECTS = $(SOURCES:.cpp=.o)
> EXECUTABLE = test
> 
> all: $(EXECUTABLE)
> 
> $(EXECUTABLE): $(OBJECTS)
>      $(CC) $(LDFLAGS) $(OBJECTS) $(LDLIBS) -o $@
> 
> .cpp.o:
>      $(CC) $(CPPFLAGS) $(CFLAGS) $< -o $@
> 
> install:
>      @echo "Build complete!"
> ----------------------------------------------------------------------
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org