You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2009/11/13 06:37:54 UTC
svn commit: r835746 - in /qpid/trunk/qpid/cpp/src/tests: declare_queues.cpp
failover_soak.cpp replaying_sender.cpp resuming_receiver.cpp
run_failover_soak
Author: mgoulish
Date: Fri Nov 13 05:37:54 2009
New Revision: 835746
URL: http://svn.apache.org/viewvc?rev=835746&view=rev
Log:
Make failover_soak and its children configurable in
the number of brokers in the cluster and the number
of queues talking through the cluster during the test.
resuming_receiver.cpp and replaying_sender.cpp now
take command line args to control the queue name.
If more than 1 queue is desired, failover_soak.cpp
will start up N queues, each with its own sender and
receiver.
Queue names are now made unique with the failover_soak
PID as part of their name.
Modified:
qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
qpid/trunk/qpid/cpp/src/tests/run_failover_soak
Modified: qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp Fri Nov 13 05:37:54 2009
@@ -25,46 +25,73 @@
#include <cstdlib>
#include <iostream>
+#include <sstream>
using namespace qpid::client;
using namespace std;
-int main(int argc, char ** argv)
+int
+main(int argc, char ** argv)
{
ConnectionSettings settings;
- if ( argc != 4 )
+ if ( argc != 6 )
{
- cerr << "Usage: declare_queues host port durability\n";
+ cerr << "Usage: declare_queues host port durability queue_name_prefix n_queues\n";
return 1;
}
settings.host = argv[1];
settings.port = atoi(argv[2]);
int durability = atoi(argv[3]);
+ char const * queue_name_prefix = argv[4];
+ int n_queues = atoi(argv[5]);
FailoverManager connection(settings);
- try {
- bool complete = false;
- while (!complete) {
- Session session = connection.connect().newSession();
- try {
- if ( durability )
- session.queueDeclare(arg::queue="message_queue", arg::durable=true);
- else
- session.queueDeclare(arg::queue="message_queue");
- complete = true;
- } catch (const qpid::TransportFailure&) {}
- }
- connection.close();
- return 0;
- } catch (const exception& error) {
- cerr << "declare_queues failed:" << error.what() << endl;
- cerr << " host: " << settings.host
- << " port: " << settings.port << endl;
- return 1;
+
+ int max_fail = 13;
+ for ( int i = 0; i < n_queues; ++ i ) {
+ stringstream queue_name;
+ queue_name << queue_name_prefix << '_' << i;
+
+ bool queue_created = false;
+ int failure_count;
+
+ // Any non-transport failure is Bad.
+ try
+ {
+ while ( ! queue_created ) {
+ Session session = connection.connect().newSession();
+ // TransportFailures aren't too bad -- they might happen because
+ // we are doing a cluster failover test. But if we get too many,
+ // we will still bug out.
+ failure_count = 0;
+ try {
+ if ( durability )
+ session.queueDeclare(arg::queue=queue_name.str(), arg::durable=true);
+ else
+ session.queueDeclare(arg::queue=queue_name.str());
+ queue_created = true;
+ cout << "declare_queues: Created queue " << queue_name.str() << endl;
+ }
+ catch ( const qpid::TransportFailure& ) {
+ if ( ++ failure_count >= max_fail ) {
+ cerr << "declare_queues failed: too many transport errors.\n";
+ cerr << " host: " << settings.host
+ << " port: " << settings.port << endl;
+ return 1;
+ }
+ sleep ( 1 );
+ }
+ }
+ }
+ catch ( const exception & error) {
+ cerr << "declare_queues failed:" << error.what() << endl;
+ cerr << " host: " << settings.host
+ << " port: " << settings.port << endl;
+ return 1;
+ }
}
-
}
Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Fri Nov 13 05:37:54 2009
@@ -433,7 +433,9 @@
char const * host,
char const * path,
int verbosity,
- int durable
+ int durable,
+ char const * queue_prefix,
+ int n_queues
)
{
string name("declareQueues");
@@ -456,6 +458,13 @@
argv.push_back ( "1" );
else
argv.push_back ( "0" );
+
+ argv.push_back ( queue_prefix );
+
+ char n_queues_str[20];
+ sprintf ( n_queues_str, "%d", n_queues );
+ argv.push_back ( n_queues_str );
+
argv.push_back ( 0 );
pid_t pid = fork();
@@ -475,10 +484,11 @@
pid_t
startReceivingClient ( brokerVector brokers,
- char const * host,
- char const * receiverPath,
- char const * reportFrequency,
- int verbosity
+ char const * host,
+ char const * receiverPath,
+ char const * reportFrequency,
+ int verbosity,
+ char const * queue_name
)
{
string name("receiver");
@@ -502,6 +512,7 @@
argv.push_back ( portStr );
argv.push_back ( reportFrequency );
argv.push_back ( verbosityStr );
+ argv.push_back ( queue_name );
argv.push_back ( 0 );
pid_t pid = fork();
@@ -522,12 +533,13 @@
pid_t
startSendingClient ( brokerVector brokers,
- char const * host,
- char const * senderPath,
- char const * nMessages,
- char const * reportFrequency,
+ char const * host,
+ char const * senderPath,
+ char const * nMessages,
+ char const * reportFrequency,
int verbosity,
- int durability
+ int durability,
+ char const * queue_name
)
{
string name("sender");
@@ -555,6 +567,7 @@
argv.push_back ( "1" );
else
argv.push_back ( "0" );
+ argv.push_back ( queue_name );
argv.push_back ( 0 );
pid_t pid = fork();
@@ -589,10 +602,10 @@
int
main ( int argc, char const ** argv )
{
- if ( argc != 9 ) {
+ if ( argc != 11 ) {
cerr << "Usage: "
<< argv[0]
- << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable"
+ << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable n_queues n_brokers"
<< endl;
cerr << "\tverbosity is an integer, durable is 0 or 1\n";
return BAD_ARGS;
@@ -608,6 +621,8 @@
char const * reportFrequency = argv[i++];
int verbosity = atoi(argv[i++]);
int durable = atoi(argv[i++]);
+ int n_queues = atoi(argv[i++]);
+ int n_brokers = atoi(argv[i++]);
char const * host = "127.0.0.1";
int maxBrokers = 50;
@@ -625,8 +640,7 @@
if ( verbosity > 1 )
cout << "Starting initial cluster...\n";
- int nBrokers = 3;
- for ( int i = 0; i < nBrokers; ++ i ) {
+ for ( int i = 0; i < n_brokers; ++ i ) {
startNewBroker ( brokers,
moduleOrDir,
clusterName,
@@ -638,10 +652,22 @@
if ( verbosity > 0 )
printBrokers ( brokers );
+ // Get prefix for each queue name.
+ stringstream queue_prefix;
+ queue_prefix << "failover_soak_" << getpid();
+
+
// Run the declareQueues child.
int childStatus;
pid_t dqClientPid =
- runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable );
+ runDeclareQueuesClient ( brokers,
+ host,
+ declareQueuesPath,
+ verbosity,
+ durable,
+ queue_prefix.str().c_str(),
+ n_queues
+ );
if ( -1 == dqClientPid ) {
cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
return CANT_FORK_DQ;
@@ -656,32 +682,42 @@
allMyChildren.exited ( dqClientPid, childStatus );
-
- // Start the receiving client.
- pid_t receivingClientPid =
- startReceivingClient ( brokers,
- host,
- receiverPath,
- reportFrequency,
- verbosity );
- if ( -1 == receivingClientPid ) {
- cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
- return CANT_FORK_RECEIVER;
- }
+ /*
+ Start one receiving and one sending client for each queue.
+ */
+ for ( int i = 0; i < n_queues; ++ i ) {
+
+ stringstream queue_name;
+ queue_name << queue_prefix.str() << '_' << i;
+
+ // Receiving client ---------------------------
+ pid_t receivingClientPid =
+ startReceivingClient ( brokers,
+ host,
+ receiverPath,
+ reportFrequency,
+ verbosity,
+ queue_name.str().c_str() );
+ if ( -1 == receivingClientPid ) {
+ cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
+ return CANT_FORK_RECEIVER;
+ }
- // Start the sending client.
- pid_t sendingClientPid =
- startSendingClient ( brokers,
- host,
- senderPath,
- nMessages,
- reportFrequency,
- verbosity,
- durable );
- if ( -1 == sendingClientPid ) {
- cerr << "END_OF_TEST ERROR_START_SENDER\n";
- return CANT_FORK_SENDER;
+ // Sending client ---------------------------
+ pid_t sendingClientPid =
+ startSendingClient ( brokers,
+ host,
+ senderPath,
+ nMessages,
+ reportFrequency,
+ verbosity,
+ durable,
+ queue_name.str().c_str() );
+ if ( -1 == sendingClientPid ) {
+ cerr << "END_OF_TEST ERROR_START_SENDER\n";
+ return CANT_FORK_SENDER;
+ }
}
@@ -689,7 +725,7 @@
maxSleep = 4;
- for ( int totalBrokers = 3;
+ for ( int totalBrokers = n_brokers;
totalBrokers < maxBrokers;
++ totalBrokers
)
Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Fri Nov 13 05:37:54 2009
@@ -54,14 +54,21 @@
uint sent;
const uint reportFrequency;
Message message;
-
int verbosity;
int persistence;
+ string queueName;
};
-Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0)
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq )
+ : sender(10),
+ count(count_),
+ sent(0),
+ reportFrequency(reportFreq),
+ verbosity(0),
+ persistence(0),
+ queueName ( queue )
{
- message.getDeliveryProperties().setRoutingKey(queue);
+ message.getDeliveryProperties().setRoutingKey(queueName.c_str());
}
void Sender::execute(AsyncSession& session, bool isRetry)
@@ -81,7 +88,13 @@
sender.send(message);
if (count > reportFrequency && !(sent % reportFrequency)) {
if ( verbosity > 0 )
- std::cout << "Sender sent " << sent << " of " << count << std::endl;
+ std::cout << "Sender sent "
+ << sent
+ << " of "
+ << count
+ << " on queue "
+ << queueName
+ << std::endl;
}
}
message.setData("That's all, folks!");
@@ -104,9 +117,9 @@
{
ConnectionSettings settings;
- if ( argc != 7 )
+ if ( argc != 8 )
{
- std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n";
+ std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence queue_name\n";
return 1;
}
@@ -116,9 +129,10 @@
int reportFrequency = atoi(argv[4]);
int verbosity = atoi(argv[5]);
int persistence = atoi(argv[6]);
+ char * queue_name = argv[7];
FailoverManager connection(settings);
- Sender sender("message_queue", n_messages, reportFrequency );
+ Sender sender(queue_name, n_messages, reportFrequency );
sender.setVerbosity ( verbosity );
sender.setPersistence ( persistence );
try {
@@ -127,7 +141,8 @@
{
std::cout << "Sender finished. Sent "
<< sender.getSent()
- << " messages."
+ << " messages on queue "
+ << queue_name
<< endl;
}
connection.close();
Modified: qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Fri Nov 13 05:37:54 2009
@@ -27,7 +27,6 @@
#include <iostream>
#include <fstream>
-
using namespace qpid;
using namespace qpid::client;
using namespace qpid::framing;
@@ -35,6 +34,7 @@
using namespace std;
+
namespace qpid {
namespace tests {
@@ -43,31 +43,34 @@
public FailoverManager::ReconnectionStrategy
{
public:
- Listener ( int report_frequency = 1000, int verbosity = 0 );
+ Listener ( int report_frequency = 1000,
+ int verbosity = 0,
+ char const * queue_name = "message_queue" );
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
void check();
- void editUrlList(std::vector<Url>& urls);
+ void editUrlList(vector<Url>& urls);
private:
Subscription subscription;
uint count;
- uint received_twice;
+ vector<int> received_twice;
uint lastSn;
bool gaps;
uint reportFrequency;
int verbosity;
bool done;
+ string queueName;
};
-Listener::Listener(int freq, int verbosity)
+Listener::Listener ( int freq, int verbosity, char const * name )
: count(0),
- received_twice(0),
lastSn(0),
gaps(false),
reportFrequency(freq),
verbosity(verbosity),
- done(false)
+ done(false),
+ queueName ( name )
{}
@@ -78,36 +81,51 @@
done = true;
if(verbosity > 0 )
{
- std::cout << "Shutting down listener for "
- << message.getDestination() << std::endl;
+ cout << "Shutting down listener for "
+ << message.getDestination() << endl;
- std::cout << "Listener received "
+ cout << "Listener received "
<< count
<< " messages ("
- << received_twice
+ << received_twice.size()
<< " received_twice)"
<< endl;
+
}
subscription.cancel();
if ( verbosity > 0 )
- std::cout << "LISTENER COMPLETED\n";
+ cout << "LISTENER COMPLETED\n";
+
+ if ( ! gaps ) {
+ cout << "no gaps were detected\n";
+ cout << received_twice.size() << " messages were received twice.\n";
+ }
+ else {
+ cout << "gaps detected\n";
+ for ( unsigned int i = 0; i < received_twice.size(); ++ i )
+ cout << "received_twice "
+ << received_twice[i]
+ << endl;
+ }
} else {
uint sn = message.getHeaders().getAsInt("sn");
if (lastSn < sn) {
if (sn - lastSn > 1) {
- std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ cerr << "Error: gap in sequence between " << lastSn << " and " << sn << endl;
gaps = true;
}
lastSn = sn;
++count;
if ( ! ( count % reportFrequency ) ) {
if ( verbosity > 0 )
- std::cout << "Listener has received "
+ cout << "Listener has received "
<< count
- << " messages.\n";
+ << " messages on queue "
+ << queueName
+ << endl;
}
} else {
- ++received_twice;
+ received_twice.push_back ( sn );
}
}
}
@@ -119,21 +137,21 @@
void Listener::execute(AsyncSession& session, bool isRetry) {
if (verbosity > 0)
- std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
+ cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
if (!done) {
SubscriptionManager subs(session);
- subscription = subs.subscribe(*this, "message_queue");
+ subscription = subs.subscribe(*this, queueName);
subs.run();
}
}
-void Listener::editUrlList(std::vector<Url>& urls)
+void Listener::editUrlList(vector<Url>& urls)
{
/**
* A more realistic algorithm would be to search through the list
* for prefered hosts and ensure they come first in the list.
*/
- if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
+ if (urls.size() > 1) rotate(urls.begin(), urls.begin() + 1, urls.end());
}
}} // namespace qpid::tests
@@ -144,9 +162,9 @@
{
ConnectionSettings settings;
- if ( argc != 5 )
+ if ( argc != 6 )
{
- std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n";
+ cerr << "Usage: resuming_receiver host port report_frequency verbosity queue_name\n";
return 1;
}
@@ -154,8 +172,9 @@
settings.port = atoi(argv[2]);
int reportFrequency = atoi(argv[3]);
int verbosity = atoi(argv[4]);
+ char * queue_name = argv[5];
- Listener listener(reportFrequency, verbosity);
+ Listener listener ( reportFrequency, verbosity, queue_name );
FailoverManager connection(settings, &listener);
try {
@@ -163,8 +182,8 @@
connection.close();
listener.check();
return 0;
- } catch(const std::exception& error) {
- std::cerr << "Receiver failed: " << error.what() << std::endl;
+ } catch(const exception& error) {
+ cerr << "Receiver failed: " << error.what() << endl;
}
return 1;
}
Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Fri Nov 13 05:37:54 2009
@@ -24,11 +24,13 @@
host=127.0.0.1
MODULES=${MODULES:-../.libs}
-MESSAGES=${MESSAGES:-300000}
-REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
-VERBOSITY=${VERBOSITY:-1}
-DURABILITY=${DURABILITY:-1}
+MESSAGES=${MESSAGES:-1000000}
+REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000}
+VERBOSITY=${VERBOSITY:-10}
+DURABILITY=${DURABILITY:-0}
+N_QUEUES=${N_QUEUES:-1}
+N_BROKERS=${N_BROKERS:-3}
rm -f soak-*.log
-exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
+exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org