You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Xiong Zou <zo...@gmail.com> on 2014/01/16 03:04:16 UTC
[Client] warning Session was not closed cleanly
Hi All,
I am experimenting qpid Pub-Sub pattern, referring to
examples/old_api/pub-sub sample codes. My qpid version is 0.20.
Sometimes I received warning message like below sample: when I stop my
binary:
2014-01-16 08:58:33 [Client] warning Session was not closed cleanly:
anonymous.fa805922-8f1b-4e67-9710-a3da31e9bfc8
Please also note that in my class destructor I cannot delete the
listener. If I un-comment the code //delete pListner, I get the
following stack trace:
Program terminated with signal 6, Aborted.
#0 0x00000030fe2302c5 in raise () from /lib64/libc.so.6
(gdb) bt
#0 0x00000030fe2302c5 in raise () from /lib64/libc.so.6
#1 0x00000030fe231d70 in abort () from /lib64/libc.so.6
#2 0x00002b6987115bd1 in ~Mutex (this=0x5c4b6f0, __in_chrg=<value optimized
out>) at ../include/qpid/sys/posix/Mutex.h:112
#3 qpid::client::Dispatcher::~Dispatcher (this=0x5c4b6f0, __in_chrg=<value
optimized out>) at ./qpid/client/Dispatcher.h:66
#4 0x00002b69871340f2 in
qpid::client::SubscriptionManagerImpl::~SubscriptionManagerImpl
(this=0x5c4b6b0,
__in_chrg=<value optimized out>) at
qpid/client/SubscriptionManagerImpl.cpp:51
#5 0x00002b6987131948 in dtor (this=0x5c4ac08, __in_chrg=<value optimized
out>) at ./qpid/RefCounted.h:42
#6 qpid::client::SubscriptionManager::~SubscriptionManager (this=0x5c4ac08,
__in_chrg=<value optimized out>)
at qpid/client/SubscriptionManager.cpp:35
#7 0x0000000000806895 in IPC::CListener::~CListener (this=0x5c4abf0,
__in_chrg=<value optimized out>)
at CQPIDPubSubAdaptor.cpp:78
I post my code below, please help to check where I have done something
wrong?
namespace IPC
{
class CListener : public qpid::client::MessageListener
{
private:
CMsgQueue& m_helpQueue;
qpid::client::Session& m_session;
qpid::client::SubscriptionManager
m_subscriptions;
std::vector< std::string >
m_vecQueueUIDs;
public:
CListener(qpid::client::Session& session, CMsgQueue&
queue) :
m_helpQueue(queue),
m_session(session),
m_subscriptions(session)
{
};
void prepareQueue(std::string queue, std::string
exchange, std::string routing_key)
{
/* Create a unique queue name for this
consumer by concatenating
* the queue name parameter with the Session
ID.
*/
queue += m_session.getId().getName();
DEBUG << "Declaring queue: " << queue <<
ENDL;
/* Declare an exclusive queue on the broker
*/
m_session.queueDeclare(qpid::client::arg::queue=queue,
qpid::client::arg::exclusive=true, qpid::client::arg::autoDelete=true);
m_session.exchangeBind(qpid::client::arg::exchange=exchange,
qpid::client::arg::queue=queue, qpid::client::arg::bindingKey=routing_key);
/*
* subscribe to the queue using the
subscription manager.
*/
DEBUG << "Subscribing to queue " << queue <<
ENDL;
m_subscriptions.subscribe(*this, queue);
m_vecQueueUIDs.push_back(queue);
}
void received(qpid::client::Message& message)
{
DEBUG << "Message: " << message.getData() <<
" from " << message.getDestination() << ENDL;
char* pData = NULL;
unsigned int iLen =
message.getData().length();
if(iLen > 0)
{
pData = new char[iLen+1];
strncpy(pData,
message.getData().c_str(), iLen);
pData[iLen] = 0;
m_helpQueue.enqueue(pData, iLen);
}
}
void start(const CIPCConfig& config);
void stop();
~CListener() { };
};
void CListener::start(const CIPCConfig& config)
{
for(unsigned int i = 0; i < config.getNumAddressRcvrs();
i++)
{
DEBUG << "create qpid session susbcribe routing key
" << config.getAddressRcvr(i) << ENDL;
prepareQueue(config.getAddressRcvr(i),
IPC::cs_strQPIDExchgAMQTopic, config.getAddressRcvr(i));
}
//always listen to Commands for control messages.
prepareQueue(IPC::cs_strQPIDKeyRouteAdminCmd,
IPC::cs_strQPIDExchgAMQTopic, IPC::cs_strQPIDKeyRouteAdminCmd);
INFO << "run Listner Thread" << ENDL;
m_subscriptions.start();
}
void CListener::stop()
{
//No need to cancel subscriptions with
m_subscriptions.stop().
//for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
//{
// if(!m_vecQueueUIDs[i].empty())
// m_subscriptions.cancel(m_vecQueueUIDs[i]);
//}
// stop the spawned thread
m_subscriptions.stop();
}
};
/**
 * Create an adaptor that is not yet usable: all owned pointers are NULL,
 * the adaptor is flagged invalid and the wait time is set to -1.
 */
IPC::CQPIDPubSubAdaptor::CQPIDPubSubAdaptor() :
    m_pHelpQueue(NULL),
    m_pSession(NULL),
    m_pConListner(NULL),
    m_bIsValid(false),
    m_iWaitTime(-1)
{
    // Intentionally empty: every member is set in the initialiser list.
}
/**
 * Tear the adaptor down: stop the listener, close the connection, then
 * release the session and the helper queue.
 *
 * Stopping the listener and closing the connection can throw; an exception
 * leaving a destructor would terminate the process, so each step is guarded
 * separately and failures are only logged.
 */
IPC::CQPIDPubSubAdaptor::~CQPIDPubSubAdaptor()
{
    INFO << "start to clear up QPIDPubSubAdaptor" << ENDL;

    if(m_pConListner)
    {
        CListener* pListner = static_cast<CListener*>(m_pConListner);
        try
        {
            pListner->stop();
        }
        catch(const std::exception& error)
        {
            ERROR << error.what() << ENDL;
        }
        // NOTE(review): the listener is deliberately not deleted. Deleting
        // it here was observed to abort in ~Mutex (via ~Dispatcher). Only
        // re-enable "delete pListner;" once CListener::stop() is known to
        // cancel all subscriptions before the object is destroyed.
        m_pConListner = NULL;
    }

    try
    {
        m_connection.close();
    }
    catch(const std::exception& error)
    {
        ERROR << error.what() << ENDL;
    }

    // delete on a NULL pointer is a no-op, so no guards are needed.
    delete m_pSession;
    m_pSession = NULL;

    delete m_pHelpQueue;
    m_pHelpQueue = NULL;
}
/**
 * Open the broker connection, create the session and helper queue, and
 * start the listener.
 *
 * @param config supplies host, port, wait time, sender address and the
 *               receiver addresses
 * @return true if the adaptor is (or already was) initialised, false if a
 *         std::exception was raised during setup
 */
bool IPC::CQPIDPubSubAdaptor::init(const CIPCConfig& config)
{
    // Already initialised: nothing to do.
    if(m_bIsValid)
    {
        return true;
    }

    try
    {
        m_connection.open(config.getHost().c_str(), atoi(config.getPort().c_str()));
        m_pSession = new qpid::client::Session(m_connection.newSession());

        // Might need to create the session with a name for later usage.
        m_sSessionName = config.getHost() + ":" + config.getPort();
        DEBUG << "create qpid session with " << m_sSessionName << ENDL;

        m_bIsValid  = true;
        m_iWaitTime = config.getWaitTime();

        // TODO: there will be multiple susbcribe routing key
        m_sAddressSndr = config.getAddressSender();
        DEBUG << "create qpid session publish routing key " << m_sAddressSndr << ENDL;

        m_pHelpQueue = new CFFSPSCQueueAdaptor();
        m_pHelpQueue->init(config);

        // Start the listener in another thread.
        // NOTE(review): if start() throws, this listener is never stored in
        // m_pConListner and is therefore leaked.
        IPC::CListener* pListener = new IPC::CListener(*m_pSession, *m_pHelpQueue);
        pListener->start(config);
        m_pConListner = static_cast<void*>(pListener);

        return true;
    }
    catch(const std::exception& error)
    {
        ERROR << error.what() << ENDL;
        m_bIsValid = false;
        return false;
    }
}
/**
 * Publish a buffer to the configured sender routing key on the topic
 * exchange.
 *
 * @param pData payload; exactly iLen bytes are sent (no NUL terminator is
 *              required and embedded NUL bytes are preserved)
 * @param iLen  number of payload bytes
 * @return iLen on success; 0 if the adaptor is not valid, the arguments are
 *         empty, or a std::exception was raised while sending
 */
unsigned int IPC::CQPIDPubSubAdaptor::enqueue(char* pData, unsigned int& iLen)
{
    if(!isValid() || !pData || iLen == 0)
        return 0;

    try
    {
        // Build the payload from (pointer, length) rather than treating
        // pData as a C string, which would ignore iLen: truncation at the
        // first NUL byte, or an over-read if the buffer is unterminated.
        const std::string payload(pData, iLen);

        DEBUG << "try to send message to sender address " << m_sAddressSndr
              << " with content " << payload << ENDL;

        // NOTE(review): a new session per message is not required; reusing
        // one long-lived session would be cheaper.
        qpid::client::Session l_session = m_connection.newSession();

        qpid::client::Message l_message;
        l_message.getDeliveryProperties().setRoutingKey(m_sAddressSndr);
        l_message.setData(payload);

        // amq.topic is the exchange for Pub/Sub pattern, can be hard-coded
        // for now.
        l_session.messageTransfer(qpid::client::arg::content=l_message,
                                  qpid::client::arg::destination=IPC::cs_strQPIDExchgAMQTopic);

        // Close the temporary session explicitly; leaving it open is what
        // produces the "Session was not closed cleanly" warning for
        // anonymous sessions at shutdown.
        l_session.close();

        return iLen;
    }
    catch(const std::exception& error)
    {
        ERROR << error.what() << ENDL;
    }
    return 0;
}
Thanks a lot in advance! Looking forward to your replies.
--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: [Client] warning Session was not closed cleanly
Posted by Gordon Sim <gs...@redhat.com>.
On 01/17/2014 08:36 AM, Xiong Zou wrote:
> Hi Gordon,
>
> An initial update on the test result:
> 1. Change m_subscriptions.stop(); to m_subscriptions.wait();
You need them both, keep the stop(), then add wait() after it. The
stop() tells the thread to stop, the wait() waits for it to actually exit.
> Thread stuck at m_subscriptions.wait(), no further process.
>
> 2. add m_session.close(); before m_connection.close();
>
> Invalid argument
> IT.OR.MD.CS.RMS.HA.Primary: ../include/qpid/sys/posix/Mutex.h:120:
> void qpid::sys::Mutex::unlock(): Assertion `0' failed.
> Abort (core dumped)
Hmm, that sounds like the session is not valid in some way....
> 3. After I enable to manually call m_subscriptions.cancel() as below, then
> delete pListner; works well now.
>
> for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
> {
> if(!m_vecQueueUIDs[i].empty())
> m_subscriptions.cancel(m_vecQueueUIDs[i]);
> }
Ok, great!
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: [Client] warning Session was not closed cleanly
Posted by Xiong Zou <zo...@gmail.com>.
Hi Gordon,
An initial update on the test result:
1. Change m_subscriptions.stop(); to m_subscriptions.wait();
Thread stuck at m_subscriptions.wait(), no further process.
2. add m_session.close(); before m_connection.close();
Invalid argument
IT.OR.MD.CS.RMS.HA.Primary: ../include/qpid/sys/posix/Mutex.h:120:
void qpid::sys::Mutex::unlock(): Assertion `0' failed.
Abort (core dumped)
3. After I enable to manually call m_subscriptions.cancel() as below, then
delete pListner; works well now.
for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
{
if(!m_vecQueueUIDs[i].empty())
m_subscriptions.cancel(m_vecQueueUIDs[i]);
}
Thanks & regards,
Xiong Zou
--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602894.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: [Client] warning Session was not closed cleanly
Posted by Gordon Sim <gs...@redhat.com>.
On 01/16/2014 02:07 PM, Xiong Zou wrote:
> Hi Gordon,
>
> Appreciate your help a lot!
>
> To use the new qpid:messaging APIs, I can just replace qpid::client to
> qpid:messaging in my source codes? I cannot find any pub-sub sample code for
> new qpid:messaging APIs.
No, your code will require changes I'm afraid. The APIs are different.
However the good news is that I believe you will find the
qpid::messaging API much simpler.
For pub-sub in particular, there is little special code required. You
can even use the drain and spout examples. See for example the section
on 'Topics' in
http://qpid.apache.org/releases/qpid-0.24/programming/book/section-addresses.html
> As for the existing qpid::client APIs, I will test your suggestions and
> update the result.
>
> And I found some difficulty if I use m_subscriptions.run(). In this case I
> will call m_subscriptions.cancel(m_vecQueueUIDs[i]); from a different
> thread,
You are doing that anyway, since all start() does is start a thread that
calls run().
> are these APIs of m_subscriptions thread safe? They seems not when I
> check their source codes.
Yes, they should be threadsafe.
> Or I have to send cancel subscription messages into receiver queues from
> another thread, which is a bit troublesome with my current design.
>
> Thanks & regards,
> Xiong
>
>
>
>
> --
> View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602853.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: [Client] warning Session was not closed cleanly
Posted by Xiong Zou <zo...@gmail.com>.
Hi Gordon,
Appreciate your help a lot!
To use the new qpid:messaging APIs, I can just replace qpid::client to
qpid:messaging in my source codes? I cannot find any pub-sub sample code for
new qpid:messaging APIs.
As for the existing qpid::client APIs, I will test your suggestions and
update the result.
And I found some difficulty if I use m_subscriptions.run(). In this case I
will call m_subscriptions.cancel(m_vecQueueUIDs[i]); from a different
thread, are these APIs of m_subscriptions thread safe? They seems not when I
check their source codes.
Or I have to send cancel subscription messages into receiver queues from
another thread, which is a bit troublesome with my current design.
Thanks & regards,
Xiong
--
View this message in context: http://qpid.2158936.n2.nabble.com/Client-warning-Session-was-not-closed-cleanly-tp7602834p7602853.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: [Client] warning Session was not closed cleanly
Posted by Gordon Sim <gs...@redhat.com>.
On 01/16/2014 02:04 AM, Xiong Zou wrote:
> Hi All,
>
> I am experimenting qpid Pub-Sub pattern, referring to
> examples/old_api/pub-sub sample codes. My qpid version is 0.20.
I would strongly advise you to switch to the qpid::messaging API[1] if
at all possible. The qpid::client API has been removed in the latest
versions of Qpid. Further comments on the code inline.
[1] http://qpid.apache.org/releases/qpid-0.24/programming/book/index.html
> Sometimes I received warning message like below sample: when I stop my
> binary:
> 2014-01-16 08:58:33 [Client] warning Session was not closed cleanly:
> anonymous.fa805922-8f1b-4e67-9710-a3da31e9bfc8
[...]
> void CListener::stop()
> {
> //No need to cancel subscriptions with
> m_subscriptions.stop().
> //for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
> //{
> // if(!m_vecQueueUIDs[i].empty())
> // m_subscriptions.cancel(m_vecQueueUIDs[i]);
> //}
> // stop the spawned thread
> m_subscriptions.stop();
Add
m_subscriptions.wait();
to wait for the thread to actually exit here. I would personally advise
you to start your own thread and call m_subscriptions.run() on it, as
the start()/stop() method doesn't handle exceptions occurring on
dispatch well.
> }
>
> };
[...]
> IPC::CQPIDPubSubAdaptor::~CQPIDPubSubAdaptor()
> {
> INFO << "start to clear up QPIDPubSubAdaptor" << ENDL;
> if(m_pConListner)
> {
> CListener* pListner = (CListener*)m_pConListner;
> pListner->stop();
> //delete pListner; //cannot delete listner, will cause
> ~Mutex() to signal
> m_pConListner = NULL;
> }
>
Do an explicit m_session.close() here as well. If you want to be extra
safe, do an explicit m_session.sync() before that to ensure all in-flight
operations are complete.
> m_connection.close();
>
> if(m_pSession)
> {
> delete m_pSession;
> m_pSession = NULL;
> }
>
> if(m_pHelpQueue)
> {
> delete m_pHelpQueue;
> m_pHelpQueue = NULL;
> }
> }
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org