Posted to users@qpid.apache.org by joseluis <jl...@gmail.com> on 2011/09/26 20:44:59 UTC
memory consumption on broker not closing session
I wrote a program that continuously sends 1000 messages per second to the
"testing" address.
The program below reads these messages and, after 10000 messages, creates a
new session and receiver with the same configuration as before.
The receiver and session go out of scope before the new ones are created, but
if I don't call session.close(); the memory on the broker increases quickly.
If I call session.close(); before creating the new session instance, this
doesn't happen.
The topic was created with...
qpid-config add exchange topic testing
It looks as if receiver.close(); or the receiver going out of scope is not
enough to remove the "subscription" on the broker side, so the broker keeps
all the messages for the deleted receiver and session objects.
If so, it will be necessary to use just one receiver per session, so that
session.close(); can be called when a receiver is no longer required by the
program.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <exception>
#include <iostream>
#include <string>

using namespace qpid::messaging;

int main(int /*argc*/, char** /*argv*/) {
    std::string broker = "localhost:5672";
    std::string address = "testing";

    Connection connection(broker, "");
    try {
        connection.open();

        {
            Session session = connection.createSession();
            // Adjacent string literals are joined by the compiler, so the
            // address can be split across lines without breaking the string.
            std::string receiver_config = address + "/#;"
                " {assert:always, node:{type:topic, durable:False},"
                " link:{reliability:unreliable, durable:False} }";
            Receiver receiver = session.createReceiver(receiver_config);
            receiver.setCapacity(10);

            // Read the first 10000 messages (1 second timeout per fetch).
            for (int counter = 0; counter < 10000; ++counter) {
                Message message = receiver.fetch(Duration::SECOND * 1);
            }

            // Leaving the next line commented out causes a fast increase of
            // memory on the broker.
            //session.close();
        }

        std::cout << "other subscription receiver gets out of scope"
                  << std::endl;

        {
            // Second session and receiver with the same configuration.
            Session session = connection.createSession();
            std::string receiver_config = address + "/#;"
                " {assert:always, node:{type:topic, durable:False},"
                " link:{reliability:unreliable, durable:False} }";
            Receiver receiver = session.createReceiver(receiver_config);

            for (int counter = 0; counter < 60000; ++counter) {
                Message message = receiver.fetch(Duration::SECOND * 1);
            }
        }

        connection.close();
        return 0;
    } catch (const std::exception& error) {
        std::cerr << error.what() << std::endl;
        connection.close();
        return 1;
    }
}
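
As reported above, letting the Session handle go out of scope did not stop the
memory growth, while an explicit session.close(); did. One way to make sure
that call is never forgotten is a small scope guard. The following is only a
minimal sketch: ScopedClose is a hypothetical helper, not part of the Qpid
API, and it assumes only that the wrapped object has a close() member.

```cpp
// Hypothetical helper (NOT part of the Qpid API): calls close() on the
// wrapped object when the guard leaves scope, including during stack
// unwinding after an exception.
template <typename Closable>
class ScopedClose {
public:
    explicit ScopedClose(Closable& target) : target_(target) {}

    ~ScopedClose() {
        // A destructor must not let an exception escape, so any error
        // raised by close() is swallowed here.
        try {
            target_.close();
        } catch (...) {
        }
    }

    // The guard refers to one specific object, so copying is disabled.
    ScopedClose(const ScopedClose&) = delete;
    ScopedClose& operator=(const ScopedClose&) = delete;

private:
    Closable& target_;
};
```

In the program above, this would be used by declaring
ScopedClose<Session> guard(session); right after createSession(), so that the
session is closed at the end of the inner block.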
--
View this message in context: http://apache-qpid-users.2158936.n2.nabble.com/memory-consumption-on-broker-not-closing-session-tp6833011p6833011.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org
Re: memory consumption on broker not closing session
Posted by joseluis <jl...@gmail.com>.
Thanks, it works when I configure the flags of the linked queue.
--
View this message in context: http://apache-qpid-users.2158936.n2.nabble.com/memory-consumption-on-broker-not-closing-session-tp6833011p6835525.html
Re: memory consumption on broker not closing session
Posted by Gordon Sim <gs...@redhat.com>.
On 09/26/2011 07:44 PM, joseluis wrote:
> I wrote a program that continuously sends 1000 messages per second to the
> "testing" address.
>
> The program below reads these messages and, after 10000 messages, creates a
> new session and receiver with the same configuration as before.
>
> The receiver and session go out of scope before the new ones are created, but
> if I don't call session.close(); the memory on the broker increases quickly.
>
> If I call session.close(); before creating the new session instance, this
> doesn't happen.
>
> The topic was created with...
>
> qpid-config add exchange topic testing
>
> It looks as if receiver.close(); or the receiver going out of scope is not
> enough to remove the "subscription" on the broker side, so the broker keeps
> all the messages for the deleted receiver and session objects.
Good point. Though the Receiver::close() cancels the subscription, it
does not actually delete the subscription queue or unbind it correctly.
It is marked as auto-delete and exclusive (by default) and thus will be
deleted by the broker only when the session ends.
> If so, it will be necessary to use just one receiver per session, so that
> session.close(); can be called when a receiver is no longer required by the
> program.
Yes, that is a bug (I've raised a JIRA:
https://issues.apache.org/jira/browse/QPID-3508 and will get a fix in
shortly). As a workaround you could explicitly control the flags on the
queue. E.g.
address + "/#; {assert: always, node: {type: topic}, link: {x-declare: {exclusive: False, auto-delete: True}}}"
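
To keep the workaround address in one place, it could be built by a small
function. This is only a sketch: makeSubscriptionAddress is a hypothetical
helper name, not part of the Qpid API, and the options are the ones suggested
above (assert: always, a topic node, and a subscription queue declared as
non-exclusive and auto-delete).

```cpp
#include <string>

// Hypothetical helper (NOT part of the Qpid API): builds the receiver
// address for the workaround, for a topic exchange named `topic`.
std::string makeSubscriptionAddress(const std::string& topic) {
    // Adjacent string literals are joined by the compiler into one string.
    return topic + "/#; {assert: always, node: {type: topic}, "
                   "link: {x-declare: {exclusive: False, auto-delete: True}}}";
}
```

The result would then be passed to the receiver, e.g.
session.createReceiver(makeSubscriptionAddress(address));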