You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2009/11/13 06:37:54 UTC

svn commit: r835746 - in /qpid/trunk/qpid/cpp/src/tests: declare_queues.cpp failover_soak.cpp replaying_sender.cpp resuming_receiver.cpp run_failover_soak

Author: mgoulish
Date: Fri Nov 13 05:37:54 2009
New Revision: 835746

URL: http://svn.apache.org/viewvc?rev=835746&view=rev
Log:
Make failover_soak and its children adjustable as to 
the number of brokers in the cluster, and the number 
of queues talking through the cluster during the test.

resuming_receiver.cpp and replaying_sender.cpp now
take command line args to control the queue name.

If more than 1 queue is desired, failover_soak.cpp 
will start up N queues, each with its own sender and
receiver.

Queue names are now made unique with the failover_soak
PID as part of their name.

Modified:
    qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
    qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
    qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
    qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
    qpid/trunk/qpid/cpp/src/tests/run_failover_soak

Modified: qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp Fri Nov 13 05:37:54 2009
@@ -25,46 +25,73 @@
 
 #include <cstdlib>
 #include <iostream>
+#include <sstream>
 
 using namespace qpid::client;
 
 using namespace std;
 
-int main(int argc, char ** argv) 
+int 
+main(int argc, char ** argv) 
 {
     ConnectionSettings settings;
-    if ( argc != 4 )
+    if ( argc != 6 )
     {
-      cerr << "Usage: declare_queues host port durability\n";
+      cerr << "Usage: declare_queues host port durability queue_name_prefix n_queues\n";
       return 1;
     }
 
     settings.host = argv[1];
     settings.port = atoi(argv[2]);
     int durability = atoi(argv[3]);
+    char const * queue_name_prefix = argv[4];
+    int n_queues = atoi(argv[5]);
     
     FailoverManager connection(settings);
-    try {
-        bool complete = false;
-        while (!complete) {
-            Session session = connection.connect().newSession();
-            try {
-                if ( durability )
-                  session.queueDeclare(arg::queue="message_queue", arg::durable=true);
-                else
-                  session.queueDeclare(arg::queue="message_queue");
-                complete = true;
-            } catch (const qpid::TransportFailure&) {}
-        }
-        connection.close();
-        return 0;
-    } catch (const exception& error) {
-        cerr << "declare_queues failed:" << error.what() << endl;
-        cerr << "  host: " << settings.host 
-             << "  port: " << settings.port << endl;
-        return 1;
+
+    int max_fail = 13;
+    for ( int i = 0; i < n_queues; ++ i ) {
+        stringstream queue_name;
+        queue_name << queue_name_prefix << '_' << i;
+
+        bool queue_created = false;
+        int failure_count;
+
+        // Any non-transport failure is Bad.
+        try
+        {
+            while ( ! queue_created ) {
+                Session session = connection.connect().newSession();
+                // TransportFailures aren't too bad -- they might happen because
+                // we are doing a cluster failover test.  But if we get too many,
+                // we will still bug out.
+                failure_count = 0;
+                try {
+                    if ( durability )
+                        session.queueDeclare(arg::queue=queue_name.str(), arg::durable=true);
+                    else
+                        session.queueDeclare(arg::queue=queue_name.str());
+                    queue_created = true;
+                    cout << "declare_queues: Created queue " << queue_name.str() << endl;
+                }
+                catch ( const qpid::TransportFailure& ) {
+                  if ( ++ failure_count >= max_fail ) {
+                      cerr << "declare_queues failed: too many transport errors.\n";
+                      cerr << "  host: " << settings.host
+                           << "  port: " << settings.port << endl;
+                      return 1;
+                  }
+                  sleep ( 1 );
+                }
+            }
+       }
+       catch ( const exception & error) {
+           cerr << "declare_queues failed:" << error.what() << endl;
+           cerr << "  host: " << settings.host
+                << "  port: " << settings.port << endl;
+           return 1;
+       }
     }
-    
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Fri Nov 13 05:37:54 2009
@@ -433,7 +433,9 @@
                             char const *  host,
                             char const *  path,
                             int verbosity,
-                            int durable
+                            int durable,
+                            char const * queue_prefix,
+                            int n_queues
                           )
 {
     string name("declareQueues");
@@ -456,6 +458,13 @@
       argv.push_back ( "1" );
     else
       argv.push_back ( "0" );
+
+    argv.push_back ( queue_prefix );
+
+    char n_queues_str[20];
+    sprintf ( n_queues_str, "%d", n_queues );
+    argv.push_back ( n_queues_str );
+
     argv.push_back ( 0 );
     pid_t pid = fork();
 
@@ -475,10 +484,11 @@
 
 pid_t
 startReceivingClient ( brokerVector brokers,
-                         char const *  host,
-                         char const *  receiverPath,
-                         char const *  reportFrequency,
-                         int verbosity
+                         char const * host,
+                         char const * receiverPath,
+                         char const * reportFrequency,
+                         int verbosity,
+                         char const * queue_name
                        )
 {
     string name("receiver");
@@ -502,6 +512,7 @@
     argv.push_back ( portStr );
     argv.push_back ( reportFrequency );
     argv.push_back ( verbosityStr );
+    argv.push_back ( queue_name );
     argv.push_back ( 0 );
 
     pid_t pid = fork();
@@ -522,12 +533,13 @@
 
 pid_t
 startSendingClient ( brokerVector brokers,
-                       char const *  host,
-                       char const *  senderPath,
-                       char const *  nMessages,
-                       char const *  reportFrequency,
+                       char const * host,
+                       char const * senderPath,
+                       char const * nMessages,
+                       char const * reportFrequency,
                        int verbosity,
-                       int durability
+                       int durability,
+                       char const * queue_name
                      )
 {
     string name("sender");
@@ -555,6 +567,7 @@
       argv.push_back ( "1" );
     else
       argv.push_back ( "0" );
+    argv.push_back ( queue_name );
     argv.push_back ( 0 );
 
     pid_t pid = fork();
@@ -589,10 +602,10 @@
 int
 main ( int argc, char const ** argv )
 {
-    if ( argc != 9 ) {
+    if ( argc != 11 ) {
         cerr << "Usage: "
              << argv[0]
-             << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable"
+             << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable n_queues n_brokers"
              << endl;
         cerr << "\tverbosity is an integer, durable is 0 or 1\n";
         return BAD_ARGS;
@@ -608,6 +621,8 @@
     char const * reportFrequency    = argv[i++];
     int          verbosity          = atoi(argv[i++]);
     int          durable            = atoi(argv[i++]);
+    int          n_queues           = atoi(argv[i++]);
+    int          n_brokers          = atoi(argv[i++]);
 
     char const * host               = "127.0.0.1";
     int maxBrokers = 50;
@@ -625,8 +640,7 @@
     if ( verbosity > 1 )
         cout << "Starting initial cluster...\n";
 
-    int nBrokers = 3;
-    for ( int i = 0; i < nBrokers; ++ i ) {
+    for ( int i = 0; i < n_brokers; ++ i ) {
         startNewBroker ( brokers,
                          moduleOrDir,
                          clusterName,
@@ -638,10 +652,22 @@
     if ( verbosity > 0 )
         printBrokers ( brokers );
 
+     // Get prefix for each queue name.
+     stringstream queue_prefix;
+     queue_prefix << "failover_soak_" << getpid();
+
+
      // Run the declareQueues child.
      int childStatus;
      pid_t dqClientPid =
-     runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable );
+     runDeclareQueuesClient ( brokers, 
+                              host, 
+                              declareQueuesPath, 
+                              verbosity, 
+                              durable,
+                              queue_prefix.str().c_str(),
+                              n_queues
+                            );
      if ( -1 == dqClientPid ) {
          cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
          return CANT_FORK_DQ;
@@ -656,32 +682,42 @@
      allMyChildren.exited ( dqClientPid, childStatus );
 
 
-
-     // Start the receiving client.
-     pid_t receivingClientPid =
-     startReceivingClient ( brokers,
-                              host,
-                              receiverPath,
-                              reportFrequency,
-                              verbosity );
-     if ( -1 == receivingClientPid ) {
-         cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
-         return CANT_FORK_RECEIVER;
-     }
+     /*
+       Start one receiving and one sending client for each queue.
+     */
+     for ( int i = 0; i < n_queues; ++ i ) {
+
+         stringstream queue_name;
+         queue_name << queue_prefix.str() << '_' << i;
+
+         // Receiving client ---------------------------
+         pid_t receivingClientPid =
+         startReceivingClient ( brokers,
+                                  host,
+                                  receiverPath,
+                                  reportFrequency,
+                                  verbosity,
+                                  queue_name.str().c_str() );
+         if ( -1 == receivingClientPid ) {
+             cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
+             return CANT_FORK_RECEIVER;
+         }
 
 
-     // Start the sending client.
-     pid_t sendingClientPid =
-     startSendingClient ( brokers,
-                            host,
-                            senderPath,
-                            nMessages,
-                            reportFrequency,
-                            verbosity,
-                            durable );
-     if ( -1 == sendingClientPid ) {
-         cerr << "END_OF_TEST ERROR_START_SENDER\n";
-         return CANT_FORK_SENDER;
+         // Sending client ---------------------------
+         pid_t sendingClientPid =
+         startSendingClient ( brokers,
+                                host,
+                                senderPath,
+                                nMessages,
+                                reportFrequency,
+                                verbosity,
+                                durable,
+                                queue_name.str().c_str() );
+         if ( -1 == sendingClientPid ) {
+             cerr << "END_OF_TEST ERROR_START_SENDER\n";
+             return CANT_FORK_SENDER;
+         }
      }
 
 
@@ -689,7 +725,7 @@
          maxSleep = 4;
 
 
-     for ( int totalBrokers = 3;
+     for ( int totalBrokers = n_brokers;
            totalBrokers < maxBrokers;
            ++ totalBrokers
          )

Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Fri Nov 13 05:37:54 2009
@@ -54,14 +54,21 @@
     uint sent;
     const uint reportFrequency;
     Message message;
-
     int verbosity;
     int persistence;
+    string queueName;
 };
 
-Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0)
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) 
+    : sender(10), 
+      count(count_), 
+      sent(0), 
+      reportFrequency(reportFreq), 
+      verbosity(0), 
+      persistence(0),
+      queueName ( queue )
 {
-    message.getDeliveryProperties().setRoutingKey(queue);
+    message.getDeliveryProperties().setRoutingKey(queueName.c_str());
 }
 
 void Sender::execute(AsyncSession& session, bool isRetry)
@@ -81,7 +88,13 @@
         sender.send(message);
         if (count > reportFrequency && !(sent % reportFrequency)) {
             if ( verbosity > 0 )
-                std::cout << "Sender sent " << sent << " of " << count << std::endl;
+                std::cout << "Sender sent " 
+                          << sent 
+                          << " of " 
+                          << count 
+                          << " on queue "
+                          << queueName
+                          << std::endl;
         }
     }
     message.setData("That's all, folks!");
@@ -104,9 +117,9 @@
 {
     ConnectionSettings settings;
 
-    if ( argc != 7 )
+    if ( argc != 8 )
     {
-      std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n";
+      std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence queue_name\n";
       return 1;
     }
 
@@ -116,9 +129,10 @@
     int reportFrequency = atoi(argv[4]);
     int verbosity       = atoi(argv[5]);
     int persistence     = atoi(argv[6]);
+    char * queue_name   = argv[7];
 
     FailoverManager connection(settings);
-    Sender sender("message_queue", n_messages, reportFrequency );
+    Sender sender(queue_name, n_messages, reportFrequency );
     sender.setVerbosity   ( verbosity   );
     sender.setPersistence ( persistence );
     try {
@@ -127,7 +141,8 @@
         {
             std::cout << "Sender finished.  Sent "
                       << sender.getSent()
-                      << " messages."
+                      << " messages on queue "
+                      << queue_name
                       << endl;
         }
         connection.close();

Modified: qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Fri Nov 13 05:37:54 2009
@@ -27,7 +27,6 @@
 #include <iostream>
 #include <fstream>
 
-
 using namespace qpid;
 using namespace qpid::client;
 using namespace qpid::framing;
@@ -35,6 +34,7 @@
 using namespace std;
 
 
+
 namespace qpid {
 namespace tests {
 
@@ -43,31 +43,34 @@
                  public FailoverManager::ReconnectionStrategy
 {
   public:
-    Listener ( int report_frequency = 1000, int verbosity = 0 );
+    Listener ( int report_frequency = 1000, 
+               int verbosity = 0, 
+               char const * queue_name = "message_queue" );
     void received(Message& message);
     void execute(AsyncSession& session, bool isRetry);
     void check();
-    void editUrlList(std::vector<Url>& urls);
+    void editUrlList(vector<Url>& urls);
   private:
     Subscription subscription;
     uint count;
-    uint received_twice;
+    vector<int> received_twice;
     uint lastSn;
     bool gaps;
     uint  reportFrequency;
     int  verbosity;
     bool done;
+    string queueName;
 };
 
 
-Listener::Listener(int freq, int verbosity)
+Listener::Listener ( int freq, int verbosity, char const * name )
   : count(0),
-    received_twice(0),
     lastSn(0),
     gaps(false),
     reportFrequency(freq),
     verbosity(verbosity),
-    done(false)
+    done(false),
+    queueName ( name )
 {}
 
 
@@ -78,36 +81,51 @@
         done = true;
         if(verbosity > 0 )
         {
-            std::cout << "Shutting down listener for "
-                      << message.getDestination() << std::endl;
+            cout << "Shutting down listener for "
+                      << message.getDestination() << endl;
 
-            std::cout << "Listener received "
+            cout << "Listener received "
                       << count
                       << " messages ("
-                      << received_twice
+                      << received_twice.size()
                       << " received_twice)"
                       << endl;
+            
         }
         subscription.cancel();
         if ( verbosity > 0 )
-          std::cout << "LISTENER COMPLETED\n";
+          cout << "LISTENER COMPLETED\n";
+        
+        if ( ! gaps ) {
+          cout << "no gaps were detected\n";
+          cout << received_twice.size() << " messages were received twice.\n";
+        }
+        else {
+            cout << "gaps detected\n";
+            for ( unsigned int i = 0; i < received_twice.size(); ++ i )
+              cout << "received_twice "
+                        << received_twice[i]
+                        << endl;
+        }
     } else {
         uint sn = message.getHeaders().getAsInt("sn");
         if (lastSn < sn) {
             if (sn - lastSn > 1) {
-                std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+                cerr << "Error: gap in sequence between " << lastSn << " and " << sn << endl;
                 gaps = true;
             }
             lastSn = sn;
             ++count;
             if ( ! ( count % reportFrequency ) ) {
                 if ( verbosity > 0 )
-                    std::cout << "Listener has received "
+                    cout << "Listener has received "
                               << count
-                              << " messages.\n";
+                              << " messages on queue "
+                              << queueName
+                              << endl;
             }
         } else {
-            ++received_twice;
+            received_twice.push_back ( sn );
         }
     }
 }
@@ -119,21 +137,21 @@
 
 void Listener::execute(AsyncSession& session, bool isRetry) {
     if (verbosity > 0)
-        std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
+        cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
     if (!done) {
         SubscriptionManager subs(session);
-        subscription = subs.subscribe(*this, "message_queue");
+        subscription = subs.subscribe(*this, queueName);
         subs.run();
     }
 }
 
-void Listener::editUrlList(std::vector<Url>& urls)
+void Listener::editUrlList(vector<Url>& urls)
 {
     /**
      * A more realistic algorithm would be to search through the list
      * for prefered hosts and ensure they come first in the list.
      */
-    if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
+    if (urls.size() > 1) rotate(urls.begin(), urls.begin() + 1, urls.end());
 }
 
 }} // namespace qpid::tests
@@ -144,9 +162,9 @@
 {
     ConnectionSettings settings;
 
-    if ( argc != 5 )
+    if ( argc != 6 )
     {
-      std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n";
+      cerr << "Usage: resuming_receiver host port report_frequency verbosity queue_name\n";
       return 1;
     }
 
@@ -154,8 +172,9 @@
     settings.port       = atoi(argv[2]);
     int reportFrequency = atoi(argv[3]);
     int verbosity       = atoi(argv[4]);
+    char * queue_name   = argv[5];
 
-    Listener listener(reportFrequency, verbosity);
+    Listener listener ( reportFrequency, verbosity, queue_name );
     FailoverManager connection(settings, &listener);
 
     try {
@@ -163,8 +182,8 @@
         connection.close();
         listener.check();
         return 0;
-    } catch(const std::exception& error) {
-        std::cerr << "Receiver failed: " << error.what() << std::endl;
+    } catch(const exception& error) {
+        cerr << "Receiver failed: " << error.what() << endl;
     }
     return 1;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=835746&r1=835745&r2=835746&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Fri Nov 13 05:37:54 2009
@@ -24,11 +24,13 @@
 host=127.0.0.1
 
 MODULES=${MODULES:-../.libs}
-MESSAGES=${MESSAGES:-300000}
-REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
-VERBOSITY=${VERBOSITY:-1}
-DURABILITY=${DURABILITY:-1}
+MESSAGES=${MESSAGES:-1000000}
+REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000}
+VERBOSITY=${VERBOSITY:-10}
+DURABILITY=${DURABILITY:-0}
+N_QUEUES=${N_QUEUES:-1}
+N_BROKERS=${N_BROKERS:-3}
 
 rm -f soak-*.log
-exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
+exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org