Posted to dev@qpid.apache.org by _cristian_ <cr...@gmail.com> on 2010/03/03 23:12:47 UTC

SubscriptionManager.stop() never returns

I have a program that reads messages from "local_queue". It works fine: it
gets messages from local_queue and prints them to the screen. The program
starts a SubscriptionManager that should be stopped when the program receives
SIGINT or SIGTERM, but the call to SubscriptionManager::stop() never returns.
Is something wrong with the program, or is this a bug?

If I use:
 subscriptions->start();
 subscriptions->wait();
instead of subscriptions->run(), the program returns 0 (normal exit),
but I still get the same output in gdb.
 
Here is the code for the program that reads messages from queue:

#include <iostream>
#include <string>
#include <signal.h>
#include <stdexcept>
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/SubscriptionManager.h>

using namespace qpid::client;
using namespace qpid::framing;

using std::stringstream;
using std::string;

class GatherListener : public MessageListener {
  private:
    AsyncSession asyncSession;
  public:
    GatherListener(Session& session): asyncSession(session) {}

    virtual void received(Message& request) {
        stringstream text;
        text << request.getData();
        std::cout<<text.str()<<std::endl;
    }
};

// subscriptions variable is global because I want to use it in shutdown_handler() directly
qpid::client::SubscriptionManager *subscriptions=NULL;

void shutdown_handler(int signal) {
    std::cout<<"Before stop()"<<std::endl;
    subscriptions->stop();
    std::cout<<"After stop()"<<std::endl;
}

int main(int argc, char** argv) {
    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);

    qpid::client::Connection connection;

    try {
        connection.open("192.168.0.5",5672, "user", "password");
    } catch( const std::exception &e ) {
        std::cout<<e.what()<<std::endl;
    }

    if( connection.isOpen() ) {
        qpid::client::Session session;

        session = connection.newSession();
        session.queueDeclare(qpid::client::arg::queue = "local_queue");
        session.exchangeBind(qpid::client::arg::exchange = "amq.direct",
                             qpid::client::arg::queue = "local_queue",
                             qpid::client::arg::bindingKey = "local_queue");

        subscriptions = new qpid::client::SubscriptionManager(session);
        subscriptions->setAutoStop();

        GatherListener gather_listener(session);
        subscriptions->subscribe(gather_listener, "local_queue");

        std::cout<<"Before run()"<<std::endl;
        subscriptions->run();
        std::cout<<"After run()"<<std::endl;
        connection.close();
        session.close();
    }

    if( subscriptions != NULL ) {
        delete subscriptions;
    }

    return (EXIT_SUCCESS);
}

The gdb output of the program:

Starting program: /root/NetBeansProjects/test/dist/Debug/GNU-Linux-x86/test
[Thread debugging using libthread_db enabled]
[New Thread 0x2b6751957080 (LWP 5853)]
[New Thread 0x429e5940 (LWP 5856)]
Before run()



Program received signal SIGINT, Interrupt.
0x0000003fa200ad09 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
(gdb)
(gdb)
(gdb) threads all apply bt
Undefined command: "threads".  Try "help".
(gdb) thread all apply bt
No symbol "all" in current context.
(gdb) thread apply all bt

Thread 2 (Thread 0x429e5940 (LWP 5856)):
#0  0x0000003fa14d4018 in epoll_wait () from /lib64/libc.so.6
#1  0x00002b6750e05874 in qpid::sys::Poller::wait (this=0x1e8e7360, timeout=<value optimized out>) at qpid/sys/epoll/EpollPoller.cpp:432
#2  0x00002b6750e06517 in qpid::sys::Poller::run (this=0x1e8e7360) at qpid/sys/epoll/EpollPoller.cpp:398
#3  0x00002b6750a1207c in qpid::client::TCPConnector::run (this=0x1e8e6f70) at qpid/client/Connector.cpp:393
#4  0x00002b6750dfcb9a in runRunnable (p=0x9) at qpid/sys/posix/Thread.cpp:35
#5  0x0000003fa2006617 in start_thread () from /lib64/libpthread.so.0
#6  0x0000003fa14d3c2d in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x2b6751957080 (LWP 5853)):
#0  0x0000003fa200ad09 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00002b6750a23a60 in qpid::client::Dispatcher::run (this=0x1e8e8d90) at ./qpid/sys/posix/Condition.h:63
#2  0x0000000000403b6f in main (argc=1, argv=0x7fff237fa108) at main.cpp:63
(gdb)


Re: SubscriptionManager.stop() never returns

Posted by Alan Conway <ac...@redhat.com>.
On 03/03/2010 05:12 PM, _cristian_ wrote:
>
> I have a program that reads messages from "local_queue". It works fine: it
> gets messages from local_queue and prints them to the screen. The program
> starts a SubscriptionManager that should be stopped when the program receives
> SIGINT or SIGTERM, but the call to SubscriptionManager::stop() never returns.
> Is something wrong with the program, or is this a bug?

SubscriptionManager::stop() is a blocking function. It signals the
SubscriptionManager to stop all subscriptions, but it also waits for any
subscription threads that are working on a message to finish. A function that
can block like this is not safe to call inside a signal handler.

I've attached a modified version of your program that just sets a boolean flag
in the signal handler and uses a separate thread that watches the flag and
calls stop() when it is set. That seems to work fine.
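
A minimal sketch of the flag-plus-watcher-thread approach described above. It is
not the attached program. To stay self-contained it assumes a hypothetical
stand-in class, FakeSubscriptionManager, which only mimics the blocking
run()/stop() shape of qpid::client::SubscriptionManager. It also assumes C++11
std::thread and std::atomic. The names watch_for_shutdown and
run_until_signalled are made up for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical stand-in for qpid::client::SubscriptionManager (an assumption,
// NOT the Qpid class): run() blocks until stop() is called.
class FakeSubscriptionManager {
  public:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        stopped_cv_.wait(lock, [this] { return stopped_; });
    }

    // Takes a lock, so it must not be called from a signal handler.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        stopped_cv_.notify_all();
    }

  private:
    std::mutex mutex_;
    std::condition_variable stopped_cv_;
    bool stopped_ = false;
};

// The only state the signal handler touches. Lock-free atomics are safe to
// use from a signal handler; the static_assert checks that this one is.
static_assert(ATOMIC_BOOL_LOCK_FREE == 2, "atomic<bool> must be lock-free");
std::atomic<bool> shutdown_requested(false);

extern "C" void shutdown_handler(int) {
    shutdown_requested.store(true);  // set the flag and return, nothing else
}

// Watcher thread body: polls the flag and calls stop() from ordinary thread
// context, where blocking is allowed.
void watch_for_shutdown(FakeSubscriptionManager& subscriptions) {
    while (!shutdown_requested.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    subscriptions.stop();
}

// Installs the handlers, then blocks in run() until SIGINT or SIGTERM arrives.
// Returns true if the shutdown was triggered through the flag.
bool run_until_signalled(FakeSubscriptionManager& subscriptions) {
    auto previous = std::signal(SIGINT, shutdown_handler);
    assert(previous != SIG_ERR);
    (void)previous;
    std::signal(SIGTERM, shutdown_handler);

    std::thread watcher(watch_for_shutdown, std::ref(subscriptions));
    subscriptions.run();  // blocks, like subscriptions->run() in the original
    watcher.join();
    return shutdown_requested.load();
}
```

In the original program the same idea would mean replacing the body of
shutdown_handler() with the flag store and starting the watcher thread just
before subscriptions->run(). The 50 ms polling interval is an arbitrary choice.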