You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Xiong Zou <zo...@gmail.com> on 2014/01/16 03:04:16 UTC

[Client] warning Session was not closed cleanly

Hi All,

I am experimenting qpid Pub-Sub pattern, referring to
examples/old_api/pub-sub sample codes. My qpid version is 0.20.

Sometimes I received warning message like below sample: when I stop my
binary:
2014-01-16 08:58:33 [Client] warning Session was not closed cleanly:
anonymous.fa805922-8f1b-4e67-9710-a3da31e9bfc8

Please also be informed that in my class destructor, I cannot delete
Listiner. If I un-comment the code //delete pListner, I will receive
following stack trace:

Program terminated with signal 6, Aborted.
#0  0x00000030fe2302c5 in raise () from /lib64/libc.so.6
(gdb) bt
#0  0x00000030fe2302c5 in raise () from /lib64/libc.so.6
#1  0x00000030fe231d70 in abort () from /lib64/libc.so.6
#2  0x00002b6987115bd1 in ~Mutex (this=0x5c4b6f0, __in_chrg=<value optimized
out>) at ../include/qpid/sys/posix/Mutex.h:112
#3  qpid::client::Dispatcher::~Dispatcher (this=0x5c4b6f0, __in_chrg=<value
optimized out>) at ./qpid/client/Dispatcher.h:66
#4  0x00002b69871340f2 in
qpid::client::SubscriptionManagerImpl::~SubscriptionManagerImpl
(this=0x5c4b6b0,
    __in_chrg=<value optimized out>) at
qpid/client/SubscriptionManagerImpl.cpp:51
#5  0x00002b6987131948 in dtor (this=0x5c4ac08, __in_chrg=<value optimized
out>) at ./qpid/RefCounted.h:42
#6  qpid::client::SubscriptionManager::~SubscriptionManager (this=0x5c4ac08,
__in_chrg=<value optimized out>)
    at qpid/client/SubscriptionManager.cpp:35
#7  0x0000000000806895 in IPC::CListener::~CListener (this=0x5c4abf0,
__in_chrg=<value optimized out>)
    at CQPIDPubSubAdaptor.cpp:78

I post my code below, please help to check where I have done something
wrong?

namespace IPC
{
        class CListener : public qpid::client::MessageListener
        {
                private:
                        CMsgQueue&                           m_helpQueue;
                        qpid::client::Session&                  m_session;
                        qpid::client::SubscriptionManager      
m_subscriptions;
                        std::vector< std::string >             
m_vecQueueUIDs;
                public:
                        CListener(qpid::client::Session& session, CMsgQueue&
queue) :
                                        m_helpQueue(queue),
                                        m_session(session),
                                        m_subscriptions(session)
                        {
                        };
                        void prepareQueue(std::string queue, std::string
exchange, std::string routing_key)
                        {
                                /* Create a unique queue name for this
consumer by concatenating
                                 * the queue name parameter with the Session
ID.
                                */

                                queue += m_session.getId().getName();
                                DEBUG << "Declaring queue: " << queue << 
ENDL;

                                /* Declare an exclusive queue on the broker
                                 */

                               
m_session.queueDeclare(qpid::client::arg::queue=queue,
qpid::client::arg::exclusive=true, qpid::client::arg::autoDelete=true);

                               
m_session.exchangeBind(qpid::client::arg::exchange=exchange,
qpid::client::arg::queue=queue, qpid::client::arg::bindingKey=routing_key);

                                /*
                                 * subscribe to the queue using the
subscription manager.
                                 */

                                DEBUG << "Subscribing to queue " << queue <<
ENDL;
                                m_subscriptions.subscribe(*this, queue);

                                m_vecQueueUIDs.push_back(queue);
                        }
                        void received(qpid::client::Message& message)
                        {
                                DEBUG << "Message: " << message.getData() <<
" from " << message.getDestination() << ENDL;
                                char* pData = NULL;
                                unsigned int iLen =
message.getData().length();
                                if(iLen > 0)
                                {
                                        pData         = new char[iLen+1];
                                        strncpy(pData,
message.getData().c_str(), iLen);
                                        pData[iLen] = 0;
                                        m_helpQueue.enqueue(pData, iLen);
                                }
                        }
                        void start(const CIPCConfig& config);
                        void stop();
                        ~CListener() { };
        };
        void CListener::start(const CIPCConfig& config)
        {

                for(unsigned int i = 0; i < config.getNumAddressRcvrs();
i++)
                {
                        DEBUG << "create qpid session susbcribe routing key
" << config.getAddressRcvr(i) << ENDL;
                        prepareQueue(config.getAddressRcvr(i),
IPC::cs_strQPIDExchgAMQTopic, config.getAddressRcvr(i));
                }
                //always listen to Commands for control messages.
                prepareQueue(IPC::cs_strQPIDKeyRouteAdminCmd,
IPC::cs_strQPIDExchgAMQTopic, IPC::cs_strQPIDKeyRouteAdminCmd);

                INFO << "run Listner Thread" << ENDL;
                m_subscriptions.start();
        }
        void CListener::stop()
        {
                //No need to cancel subscriptions with
m_subscriptions.stop().
                //for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
                //{
                //      if(!m_vecQueueUIDs[i].empty())
                //              m_subscriptions.cancel(m_vecQueueUIDs[i]);
                //}
                // stop the spawned thread
                m_subscriptions.stop();
        }

};
IPC::CQPIDPubSubAdaptor::CQPIDPubSubAdaptor()
                :   m_pHelpQueue(NULL),
                    m_pSession(NULL),
                    m_pConListner(NULL),
                    m_bIsValid(false),
                        m_iWaitTime(-1)
{
}

IPC::CQPIDPubSubAdaptor::~CQPIDPubSubAdaptor()
{
        INFO << "start to clear up QPIDPubSubAdaptor" << ENDL;
        if(m_pConListner)
        {
                CListener* pListner = (CListener*)m_pConListner;
                pListner->stop();
                //delete pListner;     //cannot delete listner, will cause
~Mutex() to signal
                m_pConListner = NULL;
        }

        m_connection.close();

        if(m_pSession)
        {
                delete m_pSession;
                m_pSession = NULL;
        }

        if(m_pHelpQueue)
        {
                delete m_pHelpQueue;
                m_pHelpQueue = NULL;
        }
}

bool IPC::CQPIDPubSubAdaptor::init(const CIPCConfig& config)
{
        if(m_bIsValid)
                return true;

        try
        {
                m_connection.open(config.getHost().c_str(),
atoi(config.getPort().c_str()));
                m_pSession = new
qpid::client::Session(m_connection.newSession());

                m_sSessionName = config.getHost() + ":" + config.getPort();
                //might need to create the session with a name for later
usage.
                DEBUG << "create qpid session with " << m_sSessionName <<
ENDL;

                m_bIsValid = true;

                m_iWaitTime = config.getWaitTime();

                //TODO: there will be multiple susbcribe routing key
                m_sAddressSndr = config.getAddressSender();
                DEBUG << "create qpid session publish routing key " <<
m_sAddressSndr << ENDL;

                m_pHelpQueue = new CFFSPSCQueueAdaptor();
                m_pHelpQueue->init(config);

                //start the listner in another thread.
                IPC::CListener* pListener = new IPC::CListener(*m_pSession,
*m_pHelpQueue);
                pListener->start(config);

                m_pConListner = (void*)pListener;

                return true;
        }
        catch(const std::exception& error)
        {
                ERROR << error.what() << ENDL;
        }

        m_bIsValid = false;

        return false;
}
unsigned int IPC::CQPIDPubSubAdaptor::enqueue(char* pData, unsigned int&
iLen)
{
        if(!isValid()
                        || !pData
                        || iLen <= 0)
                return 0;

        DEBUG << "try to send message to sender address " << m_sAddressSndr
<< " with content " << pData << ENDL;

        try
        {
                //Do we need to create a new session each time a message is
sent?
                qpid::client::Session l_session = m_connection.newSession();

                qpid::client::Message l_message;
               
l_message.getDeliveryProperties().setRoutingKey(m_sAddressSndr);
                l_message.setData(pData);

                //amq.topic is the exchange for Pub/Sub pattern, can be
hard-coded for now
               
l_session.messageTransfer(qpid::client::arg::content=l_message,
qpid::client::arg::destination=IPC::cs_strQPIDExchgAMQTopic);

                return iLen;
        }
        catch(const std::exception& error)
        {
        ERROR << error.what() << ENDL;
    }

        return 0;
}

Thanks a lot in advance! Looking forward to your replies.





--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: [Client] warning Session was not closed cleanly

Posted by Gordon Sim <gs...@redhat.com>.
On 01/17/2014 08:36 AM, Xiong Zou wrote:
> Hi Gordon,
>
> An initial update on the test result:
> 1. Change m_subscriptions.stop(); to m_subscriptions.wait();

You need them both, keep the stop(), then add wait() after it. The 
stop() tells the thread to stop, the wait() waits for it to actually exit.

>        Thread stuck at m_subscriptions.wait(), no further process.
>
> 2. add m_session.close(); before m_connection.close();
>
>        Invalid argument
>        IT.OR.MD.CS.RMS.HA.Primary: ../include/qpid/sys/posix/Mutex.h:120:
> void qpid::sys::Mutex::unlock(): Assertion `0' failed.
>        Abort (core dumped)

Hmm, that sounds like the session is not valid in some way....

> 3. After I enable to manually call m_subscriptions.cancel() as below, then
> delete pListner; works well now.
>
>                  for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
>                  {
>                          if(!m_vecQueueUIDs[i].empty())
>                                  m_subscriptions.cancel(m_vecQueueUIDs[i]);
>                  }

Ok, great!


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: [Client] warning Session was not closed cleanly

Posted by Xiong Zou <zo...@gmail.com>.
Hi Gordon,

An initial update on the test result:
1. Change m_subscriptions.stop(); to m_subscriptions.wait();
    
      Thread stuck at m_subscriptions.wait(), no further process.

2. add m_session.close(); before m_connection.close();

      Invalid argument
      IT.OR.MD.CS.RMS.HA.Primary: ../include/qpid/sys/posix/Mutex.h:120:
void qpid::sys::Mutex::unlock(): Assertion `0' failed.
      Abort (core dumped)

3. After I enable to manually call m_subscriptions.cancel() as below, then
delete pListner; works well now.

                for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
                {
                        if(!m_vecQueueUIDs[i].empty())
                                m_subscriptions.cancel(m_vecQueueUIDs[i]);
                }

Thanks & regards,
Xiong Zou





--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602894.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: [Client] warning Session was not closed cleanly

Posted by Gordon Sim <gs...@redhat.com>.
On 01/16/2014 02:07 PM, Xiong Zou wrote:
> Hi Gordon,
>
> Appreciate your help a lot!
>
> To use the new qpid:messaging APIs, I can just replace qpid::client to
> qpid:messaging in my source codes? I cannot find any pub-sub sample code for
> new qpid:messaging APIs.

No, your code will require changes I'm afraid. The APIs are different. 
However the good news is that I believe you will find the 
qpid::messaging API much simpler.

For pub-sub in particular, there is little special code required. You 
can even use the drain and spout examples. See for example the section 
on 'Topics' in 
http://qpid.apache.org/releases/qpid-0.24/programming/book/section-addresses.html

> As for the existing qpid::client APIs, I will test your suggestions and
> update the result.
>
> And I found some difficulty if I use m_subscriptions.run(). In this case I
> will call m_subscriptions.cancel(m_vecQueueUIDs[i]); from a different
> thread,

You are doing that anyway, since all start() does is tart a thread that 
calls run().

> are these APIs of m_subscriptions thread safe? They seems not when I
> check their source codes.

Yes, they should be threadsafe.

> Or I have to send cancel subscription messages into receiver queues from
> another thread, which is a bit troublesome with my current design.
>
> Thanks & regards,
> Xiong
>
>
>
>
> --
> View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602853.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: [Client] warning Session was not closed cleanly

Posted by Xiong Zou <zo...@gmail.com>.
Hi Gordon,

Appreciate your help a lot!

To use the new qpid:messaging APIs, I can just replace qpid::client to
qpid:messaging in my source codes? I cannot find any pub-sub sample code for
new qpid:messaging APIs.

As for the existing qpid::client APIs, I will test your suggestions and
update the result.

And I found some difficulty if I use m_subscriptions.run(). In this case I
will call m_subscriptions.cancel(m_vecQueueUIDs[i]); from a different
thread, are these APIs of m_subscriptions thread safe? They seems not when I
check their source codes.

Or I have to send cancel subscription messages into receiver queues from
another thread, which is a bit troublesome with my current design.

Thanks & regards,
Xiong




--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602853.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: [Client] warning Session was not closed cleanly

Posted by Gordon Sim <gs...@redhat.com>.
On 01/16/2014 02:04 AM, Xiong Zou wrote:
> Hi All,
>
> I am experimenting qpid Pub-Sub pattern, referring to
> examples/old_api/pub-sub sample codes. My qpid version is 0.20.

I would strongly advise you to switch to the qpid::messaging API[1] if 
at all possible. The qpid::client API has been removed in the latest 
versions of Qpid. Further comments on the code inline.

[1] http://qpid.apache.org/releases/qpid-0.24/programming/book/index.html

> Sometimes I received warning message like below sample: when I stop my
> binary:
> 2014-01-16 08:58:33 [Client] warning Session was not closed cleanly:
> anonymous.fa805922-8f1b-4e67-9710-a3da31e9bfc8
[...]
>          void CListener::stop()
>          {
>                  //No need to cancel subscriptions with
> m_subscriptions.stop().
>                  //for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
>                  //{
>                  //      if(!m_vecQueueUIDs[i].empty())
>                  //              m_subscriptions.cancel(m_vecQueueUIDs[i]);
>                  //}
>                  // stop the spawned thread
>                  m_subscriptions.stop();

Add

    m_subscriptions.wait();

to wait for the thread to actually exit here. I would personally advise 
you to start your own thread and call m_subscriptions.run() on it, as 
the start()/stop() method doesn't handle exceptions occurring on 
dispatch well.

>          }
>
> };

[...]

> IPC::CQPIDPubSubAdaptor::~CQPIDPubSubAdaptor()
> {
>          INFO << "start to clear up QPIDPubSubAdaptor" << ENDL;
>          if(m_pConListner)
>          {
>                  CListener* pListner = (CListener*)m_pConListner;
>                  pListner->stop();
>                  //delete pListner;     //cannot delete listner, will cause
> ~Mutex() to signal
>                  m_pConListner = NULL;
>          }
>

Do an explicit m_session.close() here as well. If you want to be extr 
safe to an explicit m_session.sync() before that to ensure all inflight 
operations are complete.

>          m_connection.close();
>
>          if(m_pSession)
>          {
>                  delete m_pSession;
>                  m_pSession = NULL;
>          }
>
>          if(m_pHelpQueue)
>          {
>                  delete m_pHelpQueue;
>                  m_pHelpQueue = NULL;
>          }
> }


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org