You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/22 16:13:09 UTC

svn commit: r1619814 - in /qpid/trunk/qpid/cpp/src/tests: brokertest.py ha_tests.py test_store.cpp

Author: aconway
Date: Fri Aug 22 14:13:09 2014
New Revision: 1619814

URL: http://svn.apache.org/r1619814
Log:
NO-JIRA: Clean up test_store.cpp async functionality.

Clean up test_store.cpp to allow control over async completion of messages.

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/test_store.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Aug 22 14:13:09 2014
@@ -402,7 +402,7 @@ class Broker(Popen):
 
     def host_port(self): return "%s:%s" % (self.host(), self.port())
 
-    def ready(self, timeout=30, **kwargs):
+    def ready(self, timeout=10, **kwargs):
         """Wait till broker is ready to serve clients"""
         deadline = time.time()+timeout
         while True:

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug 22 14:13:09 2014
@@ -1514,16 +1514,15 @@ class TransactionTests(HaBrokerTest):
         self.assertRaises(Exception, commit_sync)
 
     def test_tx_backup_fail(self):
-        cluster = HaCluster(
-            self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+        cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
         c = cluster[0].connect(protocol=self.tx_protocol)
         tx = c.session(transactional=True)
         s = tx.sender("q;{create:always,node:{durable:true}}")
-        for m in ["foo","bang","bar"]: s.send(qm.Message(m, durable=True))
+        for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
         self.assert_commit_raises(tx)
         for b in cluster: b.assert_browse_backup("q", [])
-        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
-        self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
+        self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
 
     def test_tx_join_leave(self):
         """Test cluster members joining/leaving cluster.

Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Aug 22 14:13:09 2014
@@ -21,15 +21,18 @@
 
 
 /**@file
- * Plug-in message store for tests.
  *
- * Add functionality as required, build up a comprehensive set of
- * features to support persistent behavior tests.
+ * Message store for tests, with two roles:
+ *
+ * 1. Dump store events to a text file that can be compared to expected event
+ *    sequence
+ *
+ * 2. Emulate hard-to-recreate conditions such as asynchronous completion delays
+ *    or store errors.
+ *
+ * Messages with specially formatted contents trigger various actions.
+ * See class Action below for available actions and message format.
  *
- * Current features special "action" messages can:
- *  - raise exception from enqueue.
- *  - force host process to exit.
- *  - do async completion after a delay.
  */
 
 #include "qpid/broker/NullMessageStore.h"
@@ -58,12 +61,91 @@ using namespace qpid::sys;
 namespace qpid {
 namespace tests {
 
+namespace {
+
+bool startswith(const string& s, const string& prefix) {
+    return s.compare(0, prefix.size(), prefix) == 0;
+}
+
+void split(const string& s, vector<string>& result, const char* sep=" \t\n") {
+    size_t i = s.find_first_not_of(sep);
+    while (i != string::npos) {
+        size_t j = s.find_first_of(sep, i);
+        if (j == string::npos) {
+            result.push_back(s.substr(i));
+            break;
+        }
+        result.push_back(s.substr(i, j-i));
+        i = s.find_first_not_of(sep, j);
+    }
+}
+
+}
+
+/**
+ * Action message format is TEST_STORE_DO [<name>...]:<action> [<args>...]
+ *
+ * A list of store <name> can be included so the action only executes on one of
+ * the named stores. This is useful in a cluster setting where the same message
+ * is replicated to all brokers' stores but should only trigger an action on
+ * specific ones. If no <name> is given, execute on any store.
+ *
+ */
+class Action {
+  public:
+    /** Available actions */
+    enum ActionEnum {
+        NONE,
+        THROW,                  ///< Throw an exception from enqueue
+        DELAY,                  ///< Delay completion, takes an ID string to complete.
+        COMPLETE,               ///< Complete a previously delayed message, takes ID
+
+        N_ACTIONS               // Count of actions, must be last
+    };
+
+    string name;
+    ActionEnum index;
+    vector<string> storeNames, args;
+
+    Action(const string& s) {
+        index = NONE;
+        if (!startswith(s, PREFIX)) return;
+        size_t colon = s.find_first_of(":");
+        if (colon == string::npos) return;
+        assert(colon >= PREFIX.size());
+        split(s.substr(PREFIX.size(), colon-PREFIX.size()), storeNames);
+        split(s.substr(colon+1), args);
+        if (args.empty()) return;
+        for (size_t i = 0; i < N_ACTIONS; ++i) {
+            if (args[0] == ACTION_NAMES[i]) {
+                name = args[0];
+                index = ActionEnum(i);
+                args.erase(args.begin());
+                break;
+            }
+        }
+    }
+
+    bool executeIn(const string& storeName) {
+        return storeNames.empty() ||
+            find(storeNames.begin(), storeNames.end(), storeName) !=storeNames.end();
+    }
+
+  private:
+    static string PREFIX;
+    static const char* ACTION_NAMES[N_ACTIONS];
+};
+
+string Action::PREFIX("TEST_STORE_DO");
+
+const char* Action::ACTION_NAMES[] = { "none", "throw", "delay", "complete" };
+
+
 struct TestStoreOptions : public Options {
 
     string name;
     string dump;
     string events;
-    vector<string> throwMsg; // Throw exception if message content matches.
 
     TestStoreOptions() : Options("Test Store Options") {
         addOptions()
@@ -73,22 +155,10 @@ struct TestStoreOptions : public Options
              "File to dump enqueued messages.")
             ("test-store-events", optValue(events, "FILE"),
              "File to log events, 1 line per event.")
-            ("test-store-throw", optValue(throwMsg, "CONTENT"),
-             "Throw exception if message content matches.")
             ;
     }
 };
 
-struct Completer : public Runnable {
-    boost::intrusive_ptr<PersistableMessage> message;
-    int usecs;
-    Completer(boost::intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
-    void run() {
-        qpid::sys::usleep(usecs);
-        message->enqueueComplete();
-        delete this;
-    }
-};
 
 class TestStore : public NullMessageStore {
   public:
@@ -97,8 +167,7 @@ class TestStore : public NullMessageStor
     {
         QPID_LOG(info, "TestStore name=" << name
                  << " dump=" << options.dump
-                 << " events=" << options.events
-                 << " throw messages =" << options.throwMsg.size());
+                 << " events=" << options.events)
 
         if (!options.dump.empty())
             dump.reset(new ofstream(options.dump.c_str()));
@@ -154,7 +223,6 @@ class TestStore : public NullMessageStor
                  const boost::intrusive_ptr<PersistableMessage>& pmsg,
                  const PersistableQueue& queue)
     {
-        QPID_LOG(debug, "TestStore enqueue " << queue.getName());
         qpid::broker::amqp_0_10::MessageTransfer* msg =
             dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
         assert(msg);
@@ -173,44 +241,50 @@ class TestStore : public NullMessageStor
             *dump << endl << "  ";
             *dump << msg->getFrames().getContentSize() << endl;
         }
-
-        // Check the message for special instructions.
+        string logPrefix = "TestStore "+name+": ";
+        // Check the message for special instructions for this store.
         string data = msg->getFrames().getContent();
-        size_t i = string::npos;
-        size_t j = string::npos;
-        const vector<string>& throwMsg(options.throwMsg);
-        if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
-                throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
-        }
-        else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
-            && (i = data.find(name+"[")) != string::npos
-            && (j = data.find("]", i)) != string::npos)
-        {
-            size_t start = i+name.size()+1;
-            string action = data.substr(start, j-start);
+        Action action(data);
+        bool doComplete = true;
+        if (action.index && action.executeIn(name)) {
+            switch (action.index) {
+
+              case Action::THROW:
+                throw Exception(logPrefix + data);
+                break;
+
+              case Action::DELAY: {
+                  if (action.args.empty()) {
+                      QPID_LOG(error, logPrefix << "async-id needs argument: " << data);
+                      break;
+                  }
+                  asyncIds[action.args[0]] = msg;
+                  QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]);
+                  doComplete = false;
+                  break;
+              }
+
+              case Action::COMPLETE: {
+                  if (action.args.empty()) {
+                      QPID_LOG(error, logPrefix << "complete-id needs argument: " << data);
+                      break;
+                  }
+                  AsyncIds::iterator i = asyncIds.find(action.args[0]);
+                  if (i != asyncIds.end()) {
+                      i->second->enqueueComplete();
+                      QPID_LOG(debug, logPrefix << "completed " << action.args[0]);
+                      asyncIds.erase(i);
+                  } else {
+                      QPID_LOG(info, logPrefix << "not found for completion " << action.args[0]);
+                  }
+                  break;
+              }
 
-            if (action == EXCEPTION) {
-                throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
-            }
-            else if (action == EXIT_PROCESS) {
-                // FIXME aconway 2009-04-10: this is a dubious way to
-                // close the process at best, it can cause assertions or seg faults
-                // rather than clean exit.
-                QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
-                exit(0);
-            }
-            else if (strncmp(action.c_str(), ASYNC.c_str(), strlen(ASYNC.c_str())) == 0) {
-                std::string delayStr(action.substr(ASYNC.size()));
-                int delay = boost::lexical_cast<int>(delayStr);
-                threads.push_back(Thread(*new Completer(msg, delay)));
-            }
-            else {
-                QPID_LOG(error, "TestStore " << name << " unknown action " << action);
-                msg->enqueueComplete();
+              default:
+                QPID_LOG(error, logPrefix << "unknown action: " << data);
             }
         }
-        else
-            msg->enqueueComplete();
+        if (doComplete) msg->enqueueComplete();
     }
 
     void dequeue(TransactionContext* tx,
@@ -239,22 +313,19 @@ class TestStore : public NullMessageStor
 
 
   private:
-    static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+    typedef map<string, boost::intrusive_ptr<PersistableMessage> > AsyncIds;
+
     TestStoreOptions options;
     string name;
     Broker& broker;
     vector<Thread> threads;
     std::auto_ptr<ofstream> dump;
     std::auto_ptr<ofstream> events;
+    AsyncIds asyncIds;
 };
 
 int TestStore::TxContext::nextId(1);
 
-const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
-const string TestStore::EXCEPTION = "exception";
-const string TestStore::EXIT_PROCESS = "exit_process";
-const string TestStore::ASYNC="async ";
-
 struct TestStorePlugin : public Plugin {
 
     TestStoreOptions options;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org