You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Virgilio Alexandre Fornazin <vi...@gmail.com> on 2011/09/14 18:44:01 UTC

Receiver capacity and performance issues

Hi all

 

I'm having some strange results with receiver capacity. I'm assuming when
the capacity is higher, more messages will become prefetched from server,
but I get some strange results.

I have a program that send 500000 messages to a queue / topic (in alternate
mode) that send messages very fast, but receiver performance is directly
related to capacity: all receiver

capacity between 256 and 512 messages bring fast results, otherwise
degradation in receiver performance is detected.

 

When I define a receiver capacity to 65536, the receiving flow is very slow,
like this output:

 

[2011-Sep-14 16:34:13.996503] received message 89000

[2011-Sep-14 16:34:14.194874] received message 90000

[2011-Sep-14 16:34:14.400461] received message 91000

[2011-Sep-14 16:34:14.616668] received message 92000

[2011-Sep-14 16:34:14.843150] received message 93000

[2011-Sep-14 16:34:15.058240] received message 94000

 

The flow goes to 4000 messages / second at max.

 

But, when I define the receiver capacity to 512, the difference is huge:

 

[2011-Sep-14 16:36:57.981603] received message 246000

[2011-Sep-14 16:36:58.008238] received message 247000

[2011-Sep-14 16:36:58.035268] received message 248000

[2011-Sep-14 16:36:58.060602] received message 249000

[2011-Sep-14 16:36:58.086925] received message 250000

[2011-Sep-14 16:36:58.112095] received message 251000

[2011-Sep-14 16:36:58.139018] received message 252000

[2011-Sep-14 16:36:58.164436] received message 253000

[2011-Sep-14 16:36:58.190823] received message 254000

[2011-Sep-14 16:36:58.217884] received message 255000

[2011-Sep-14 16:36:58.244004] received message 256000

[2011-Sep-14 16:36:58.271129] received message 257000

[2011-Sep-14 16:36:58.300046] received message 258000

[2011-Sep-14 16:36:58.329023] received message 259000

[2011-Sep-14 16:36:58.359524] received message 260000

[2011-Sep-14 16:36:58.389732] received message 261000

[2011-Sep-14 16:36:58.418393] received message 262000

[2011-Sep-14 16:36:58.448619] received message 263000

[2011-Sep-14 16:36:58.476850] received message 264000

[2011-Sep-14 16:36:58.507046] received message 265000

[2011-Sep-14 16:36:58.537993] received message 266000

[2011-Sep-14 16:36:58.567516] received message 267000

[2011-Sep-14 16:36:58.598180] received message 268000

[2011-Sep-14 16:36:58.628843] received message 269000

[2011-Sep-14 16:36:58.659675] received message 270000

[2011-Sep-14 16:36:58.691512] received message 271000

[2011-Sep-14 16:36:58.721224] received message 272000

[2011-Sep-14 16:36:58.751696] received message 273000

[2011-Sep-14 16:36:58.782686] received message 274000

[2011-Sep-14 16:36:58.813318] received message 275000

[2011-Sep-14 16:36:58.844414] received message 276000

[2011-Sep-14 16:36:58.874242] received message 277000

[2011-Sep-14 16:36:58.905431] received message 278000

[2011-Sep-14 16:36:58.935342] received message 279000

[2011-Sep-14 16:36:58.965506] received message 280000

[2011-Sep-14 16:36:58.995814] received message 281000

[2011-Sep-14 16:36:59.026351] received message 282000

 

Now, I can receive at 35000 messages / second rate, 7 times faster!

 

I'm using RHEL 6.1 (all patched applied) with MRG 2.0 (qpid 0.10).

 

The program is running locally on server console (sample code goes below),
all of them built with command line

 

g++ -m64 -lqpidclient -lqpidcommon -lqpidmessaging -lqpidtypes
-lboost_date_time-mt -lboost_thread-mt -o qpidtest -Wall -O3 qpidtest.cpp

 

---------------------------------------------- qpidtest.cpp
------------------------------------------------

 

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Address.h>
#include <string>
#include <iostream>
#include <boost/date_time.hpp>
#include <boost/thread.hpp>

using namespace std;
using namespace qpid::messaging;

#ifdef _MSC_VER
#ifdef NDEBUG
#pragma comment(lib, "qpidmessaging.lib")
#pragma comment(lib, "qpidclient.lib")
#pragma comment(lib, "qpidcommon.lib")
#pragma comment(lib, "qpidtypes.lib")
#else
#pragma comment(lib, "qpidmessagingd.lib")
#pragma comment(lib, "qpidclientd.lib")
#pragma comment(lib, "qpidcommond.lib")
#pragma comment(lib, "qpidtypesd.lib")
#endif
#endif

static int number_of_messages_received = 0;

void background_receive_thread(bool * connected, Session *
receiving_session)
{
         boost::this_thread::disable_interruption disable_interruptions;

         try
         {
                 while (* connected)
                 {
                          Receiver receiver;

                          if (receiving_session->nextReceiver(receiver,
Duration::SECOND))
                          {
                                   Message message;
                                            
                                   if (receiver.fetch(message,
Duration::SECOND))
                                   {
 
receiving_session->acknowledge(message);

                                            ++number_of_messages_received;

                                            if ((number_of_messages_received
% 1000) == 0)
                                            {
                                                     cout << "[" <<
boost::get_system_time() << "] received message " <<
number_of_messages_received << endl;
                                            }
                                   }
                          }
                 }
         }
         catch (exception & e)
         {
                 cout << e.what() << endl;
         }
         catch (...)
         {
                 cout << "Unexpected exception..." << endl;
         }

         cout << "Messages received: " << number_of_messages_received <<
endl;
}

int main(int argc, char ** argv)
{
         const int max_messages = 300000;
         const int default_sender_capacity = 65536;
         const int default_receiver_capacity = 512;

         const char * qpid_address = "10.31.0.16:5672";
         const char * qpid_options = "{ heartbeat: 15, tcp-nodelay: true,
reconnect: true, reconnect-timeout: 999999999, reconnect-limit: 999999999,
reconnect-interval: 1 }";

         Connection connection(qpid_address, qpid_options);
                 
         connection.open();

         Session declare_session = connection.createSession();

         const char * queue_declare = "my_receiving_queue ; { mode: browse,
create: always, delete: always, node: { type: queue, durable: false,
x-declare: { auto-delete: true } } }";
         
         Receiver queue_declare_receiver =
declare_session.createReceiver(queue_declare);

         declare_session.sync();

         const char * queue_binding_direct_declare = "my_receiving_queue ; {
mode: browse, create: always, delete: always, node: { type: queue, durable:
false, x-bindings: [{ exchange: amq.direct, queue: my_receiving_queue, key:
my_receiving_queue }], x-declare: { auto-delete: true } } }";
         
         Receiver queue_binding_direct_declare_receiver =
declare_session.createReceiver(queue_binding_direct_declare);

         declare_session.sync();

         const char * queue_binding_topic_declare = "my_receiving_queue ; {
mode: browse, create: always, delete: always, node: { type: queue, durable:
false, x-bindings: [{ exchange: amq.topic, queue: my_receiving_queue, key:
my_topic.* }], x-declare: { auto-delete: true } } }";
         
         Receiver queue_binding_topic_declare_receiver =
declare_session.createReceiver(queue_binding_topic_declare);

         declare_session.sync();

         Session receive_session = connection.createSession();

         const char * queue_receive = "my_receiving_queue ; { mode: consume,
node: { type: queue } }";

         Receiver queue_receive_receiver =
receive_session.createReceiver(queue_receive);

         queue_receive_receiver.setCapacity(default_receiver_capacity);

         receive_session.sync();
         
         bool connected = true;

         boost::thread thread(background_receive_thread, &connected,
&receive_session);

         Connection send_connection(qpid_address, qpid_options);
                 
         send_connection.open();

         Session send_session = send_connection.createSession();

         const char * direct_destination = "amq.direct/my_receiving_queue";

         Sender direct_sender =
send_session.createSender(direct_destination);

         direct_sender.setCapacity(default_sender_capacity);

         send_session.sync();

         const char * topic_destination = "amq.topic/my_topic.blahblahblah";

         Sender topic_sender = send_session.createSender(topic_destination);

         topic_sender.setCapacity(default_sender_capacity);

         send_session.sync();

         char blahbuffer [1024];

         ::memset(blahbuffer, 'A', sizeof(blahbuffer));

         // Send messages to queue using different senders

         for (int i = 0; i < max_messages; ++i)
         {
                 Message message(blahbuffer, sizeof(blahbuffer));

                 message.setDurable(true);

                 (i % 2 ? &topic_sender : &direct_sender)->send(message);

                 if ((i % 5000) == 0)
                 {
                          cout << "[" << boost::get_system_time() << "] sent
message " << i << endl;
                 }
         }

         send_session.sync();

         cout << "Messages sent..." << endl;

         while (number_of_messages_received < max_messages)
         {
                 boost::thread::sleep(boost::get_system_time() +
boost::posix_time::milliseconds(1000));
         }

         connected = false;

         thread.join();

         topic_sender.close();
         direct_sender.close();
         send_session.close();
         send_connection.close();

         queue_receive_receiver.close();
         receive_session.close();
         declare_session.close();

         queue_binding_topic_declare_receiver.close();
         queue_binding_direct_declare_receiver.close();
         queue_declare_receiver.close();

         connection.close();

         cout << "Program terminated" << endl;
}