Posted to users@qpid.apache.org by Andrew L <kh...@hotmail.com> on 2010/08/12 20:05:11 UTC

Cancelling subscription prevents subsequent subscriptions?

I am trying to use a single SubscriptionManager to create and cancel multiple subscriptions. After cancelling a subscription, any subscription created afterwards by the same SubscriptionManager never receives messages. I modified the example program to test this and got the same result.
 
Shouldn't this consumer receive messages on message_queue2?
 
If I comment out the sub.cancel() and the subsequent subscriptions.run(), the application receives messages on both subscriptions successfully.
 
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <cstdlib>
#include <iostream>
using namespace qpid::client;
using namespace qpid::framing;

class Listener : public MessageListener{
  private:
    SubscriptionManager& subscriptions;
  public:
    Listener(SubscriptionManager& subscriptions);
    virtual void received(Message& message);
};
Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
{}
void Listener::received(Message& message) {
  std::cout << "Message: " << message.getData() << std::endl;
}
int main(int argc, char** argv) {
    const char* host = argc>1 ? argv[1] : "127.0.0.1";
    int port = argc>2 ? atoi(argv[2]) : 5672;
    Connection connection;
    try {
      connection.open(host, port);
      Session session =  connection.newSession();
  //--------- Main body of program --------------------------------------------
      SubscriptionManager subscriptions(session);
      // Create a listener and subscribe it to the queue named "message_queue"
      Listener listener(subscriptions), listener2(subscriptions);
      Subscription sub = subscriptions.subscribe(listener, "message_queue");
      // Start dispatching on a background thread; messages are delivered
      // until the subscription is cancelled below
      subscriptions.start();
  //---------------------------------------------------------------------------
      char test;
      std::cout << "input" << std::endl;
      std::cin >> test;
      sub.cancel();
      std::cout << " again " << std::endl;
      std::cin >> test;
      sub = subscriptions.subscribe(listener2, "message_queue2");
      subscriptions.run();
      std::cout << "after start" << std::endl;
      std::cout << " 222" << std::endl;
      std::cin >> test;
      connection.close();
      return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}

Re: Cancelling subscription prevents subsequent subscriptions?

Posted by Gordon Sim <gs...@redhat.com>.
On 08/12/2010 07:05 PM, Andrew L wrote:
>
> I am trying to use a single SubscriptionManager to create and cancel multiple subscriptions. After cancelling a subscription, any subscription created afterwards by the same SubscriptionManager never receives messages. I modified the example program to test this and got the same result.

The problem is that once stopped, a SubscriptionManager cannot be 
restarted and in fact the normal message dispatch on the session is 
itself not restartable.

With 'auto-stop' on, when you cancel your last subscription, dispatch 
stops.

You could therefore turn auto-stop off (which would require you to make 
an explicit stop() call when ready to shut down). That way cancelling 
then resubscribing would still work.
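
As a rough sketch of the auto-stop approach, the helper below turns auto-stop off, cancels the current subscription and subscribes a new listener on the same manager. The name resubscribe is hypothetical. It is written as a template so that it compiles without the Qpid headers, and it assumes the manager offers setAutoStop(bool) and subscribe(listener, queue) and the subscription offers cancel(), which is how I recall the qpid::client classes; please check the headers of your release.

```cpp
#include <string>

// Hypothetical helper: Manager and Subscription stand in for
// qpid::client::SubscriptionManager and qpid::client::Subscription.
template <class Manager, class Subscription, class Listener>
Subscription resubscribe(Manager& subs, Subscription& current,
                         Listener& next, const std::string& queue) {
    // With auto-stop off, cancelling the last subscription no longer
    // stops dispatch, so the manager stays usable afterwards.
    subs.setAutoStop(false);
    current.cancel();
    return subs.subscribe(next, queue);
}
```

In the program from the original post this would be called as sub = resubscribe(subscriptions, sub, listener2, "message_queue2"), followed by an explicit subscriptions.stop() before the connection is closed.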

Another option would be to use a LocalQueue for the subscriptions (or 
even just the first one), and actively pull messages off as needed.
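
To illustrate the pull style, here is a small sketch that takes whatever is currently waiting on a local queue and hands each message to a callback. The name drain is hypothetical. The template parameters stand in for qpid::client::LocalQueue and qpid::client::Message, and the sketch assumes a get(message) member that returns false when nothing is waiting, which is how I remember LocalQueue behaving with its default timeout; treat that as an assumption to verify.

```cpp
// Hypothetical helper: Queue stands in for qpid::client::LocalQueue and
// Message for qpid::client::Message.
template <class Message, class Queue, class Handler>
int drain(Queue& queue, Handler handle) {
    int count = 0;
    Message message;
    // get() is assumed to return false as soon as nothing is waiting.
    while (queue.get(message)) {
        handle(message);
        ++count;
    }
    return count;  // number of messages handed to the callback
}
```

Because the application decides when to pull, no dispatch thread has to be running for such a subscription, so cancelling it does not involve the auto-stop behaviour at all.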

The best option will likely also depend on the details of your application.

Also, I would advise against using SubscriptionManager::start(). All it 
does is spin off a new thread and call run(), but it doesn't handle 
exceptions during dispatch. I would recommend using run() and if needed 
starting your own dispatch thread for that.

If you are just starting out, it would perhaps be worth looking at the 
new API that has been developed: 
http://qpid.apache.org/books/0.7/Programming-In-Apache-Qpid/html/

This is, I think, much simpler to use. It is not yet available in released 
form (the 0.6 release is only an early preview), but is now stable on trunk.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org