You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/22 16:13:09 UTC
svn commit: r1619814 - in /qpid/trunk/qpid/cpp/src/tests: brokertest.py
ha_tests.py test_store.cpp
Author: aconway
Date: Fri Aug 22 14:13:09 2014
New Revision: 1619814
URL: http://svn.apache.org/r1619814
Log:
NO-JIRA: Clean up test_store.cpp async functionality.
Clean up test_store.cpp to allow control over async completion of messages.
Modified:
qpid/trunk/qpid/cpp/src/tests/brokertest.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
qpid/trunk/qpid/cpp/src/tests/test_store.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Aug 22 14:13:09 2014
@@ -402,7 +402,7 @@ class Broker(Popen):
def host_port(self): return "%s:%s" % (self.host(), self.port())
- def ready(self, timeout=30, **kwargs):
+ def ready(self, timeout=10, **kwargs):
"""Wait till broker is ready to serve clients"""
deadline = time.time()+timeout
while True:
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug 22 14:13:09 2014
@@ -1514,16 +1514,15 @@ class TransactionTests(HaBrokerTest):
self.assertRaises(Exception, commit_sync)
def test_tx_backup_fail(self):
- cluster = HaCluster(
- self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+ cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
c = cluster[0].connect(protocol=self.tx_protocol)
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
- for m in ["foo","bang","bar"]: s.send(qm.Message(m, durable=True))
+ for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
- self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
def test_tx_join_leave(self):
"""Test cluster members joining/leaving cluster.
Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=1619814&r1=1619813&r2=1619814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Aug 22 14:13:09 2014
@@ -21,15 +21,18 @@
/**@file
- * Plug-in message store for tests.
*
- * Add functionality as required, build up a comprehensive set of
- * features to support persistent behavior tests.
+ * Message store for tests, with two roles:
+ *
+ * 1. Dump store events to a text file that can be compared to expected event
+ * sequence
+ *
+ * 2. Emulate hard-to-recreate conditions such as asynchronous completion delays
+ * or store errors.
+ *
+ * Messages with specially formatted contents trigger various actions.
+ * See class Action below for available actions and message format.
*
- * Current features special "action" messages can:
- * - raise exception from enqueue.
- * - force host process to exit.
- * - do async completion after a delay.
*/
#include "qpid/broker/NullMessageStore.h"
@@ -58,12 +61,91 @@ using namespace qpid::sys;
namespace qpid {
namespace tests {
+namespace {
+
+bool startswith(const string& s, const string& prefix) {
+ return s.compare(0, prefix.size(), prefix) == 0;
+}
+
+void split(const string& s, vector<string>& result, const char* sep=" \t\n") {
+ size_t i = s.find_first_not_of(sep);
+ while (i != string::npos) {
+ size_t j = s.find_first_of(sep, i);
+ if (j == string::npos) {
+ result.push_back(s.substr(i));
+ break;
+ }
+ result.push_back(s.substr(i, j-i));
+ i = s.find_first_not_of(sep, j);
+ }
+}
+
+}
+
+/**
+ * Action message format is TEST_STORE_DO [<name>...]:<action> [<args>...]
+ *
+ * A list of store <name> can be included so the action only executes on one of
+ * the named stores. This is useful in a cluster setting where the same message
+ * is replicated to all brokers' stores but should only trigger an action on
+ * specific ones. If no <name> is given, execute on any store.
+ *
+ */
+class Action {
+ public:
+ /** Available actions */
+ enum ActionEnum {
+ NONE,
+ THROW, ///< Throw an exception from enqueue
+ DELAY, ///< Delay completion, takes an ID string to complete.
+ COMPLETE, ///< Complete a previously delayed message, takes ID
+
+ N_ACTIONS // Count of actions, must be last
+ };
+
+ string name;
+ ActionEnum index;
+ vector<string> storeNames, args;
+
+ Action(const string& s) {
+ index = NONE;
+ if (!startswith(s, PREFIX)) return;
+ size_t colon = s.find_first_of(":");
+ if (colon == string::npos) return;
+ assert(colon >= PREFIX.size());
+ split(s.substr(PREFIX.size(), colon-PREFIX.size()), storeNames);
+ split(s.substr(colon+1), args);
+ if (args.empty()) return;
+ for (size_t i = 0; i < N_ACTIONS; ++i) {
+ if (args[0] == ACTION_NAMES[i]) {
+ name = args[0];
+ index = ActionEnum(i);
+ args.erase(args.begin());
+ break;
+ }
+ }
+ }
+
+ bool executeIn(const string& storeName) {
+ return storeNames.empty() ||
+ find(storeNames.begin(), storeNames.end(), storeName) !=storeNames.end();
+ }
+
+ private:
+ static string PREFIX;
+ static const char* ACTION_NAMES[N_ACTIONS];
+};
+
+string Action::PREFIX("TEST_STORE_DO");
+
+const char* Action::ACTION_NAMES[] = { "none", "throw", "delay", "complete" };
+
+
struct TestStoreOptions : public Options {
string name;
string dump;
string events;
- vector<string> throwMsg; // Throw exception if message content matches.
TestStoreOptions() : Options("Test Store Options") {
addOptions()
@@ -73,22 +155,10 @@ struct TestStoreOptions : public Options
"File to dump enqueued messages.")
("test-store-events", optValue(events, "FILE"),
"File to log events, 1 line per event.")
- ("test-store-throw", optValue(throwMsg, "CONTENT"),
- "Throw exception if message content matches.")
;
}
};
-struct Completer : public Runnable {
- boost::intrusive_ptr<PersistableMessage> message;
- int usecs;
- Completer(boost::intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
- void run() {
- qpid::sys::usleep(usecs);
- message->enqueueComplete();
- delete this;
- }
-};
class TestStore : public NullMessageStore {
public:
@@ -97,8 +167,7 @@ class TestStore : public NullMessageStor
{
QPID_LOG(info, "TestStore name=" << name
<< " dump=" << options.dump
- << " events=" << options.events
- << " throw messages =" << options.throwMsg.size());
+ << " events=" << options.events)
if (!options.dump.empty())
dump.reset(new ofstream(options.dump.c_str()));
@@ -154,7 +223,6 @@ class TestStore : public NullMessageStor
const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& queue)
{
- QPID_LOG(debug, "TestStore enqueue " << queue.getName());
qpid::broker::amqp_0_10::MessageTransfer* msg =
dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
@@ -173,44 +241,50 @@ class TestStore : public NullMessageStor
*dump << endl << " ";
*dump << msg->getFrames().getContentSize() << endl;
}
-
- // Check the message for special instructions.
+ string logPrefix = "TestStore "+name+": ";
+ // Check the message for special instructions for this store.
string data = msg->getFrames().getContent();
- size_t i = string::npos;
- size_t j = string::npos;
- const vector<string>& throwMsg(options.throwMsg);
- if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
- throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
- }
- else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
- && (i = data.find(name+"[")) != string::npos
- && (j = data.find("]", i)) != string::npos)
- {
- size_t start = i+name.size()+1;
- string action = data.substr(start, j-start);
+ Action action(data);
+ bool doComplete = true;
+ if (action.index && action.executeIn(name)) {
+ switch (action.index) {
+
+ case Action::THROW:
+ throw Exception(logPrefix + data);
+ break;
+
+ case Action::DELAY: {
+ if (action.args.empty()) {
+ QPID_LOG(error, logPrefix << "async-id needs argument: " << data);
+ break;
+ }
+ asyncIds[action.args[0]] = msg;
+ QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]);
+ doComplete = false;
+ break;
+ }
+
+ case Action::COMPLETE: {
+ if (action.args.empty()) {
+ QPID_LOG(error, logPrefix << "complete-id needs argument: " << data);
+ break;
+ }
+ AsyncIds::iterator i = asyncIds.find(action.args[0]);
+ if (i != asyncIds.end()) {
+ i->second->enqueueComplete();
+ QPID_LOG(debug, logPrefix << "completed " << action.args[0]);
+ asyncIds.erase(i);
+ } else {
+ QPID_LOG(info, logPrefix << "not found for completion " << action.args[0]);
+ }
+ break;
+ }
- if (action == EXCEPTION) {
- throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
- }
- else if (action == EXIT_PROCESS) {
- // FIXME aconway 2009-04-10: this is a dubious way to
- // close the process at best, it can cause assertions or seg faults
- // rather than clean exit.
- QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
- exit(0);
- }
- else if (strncmp(action.c_str(), ASYNC.c_str(), strlen(ASYNC.c_str())) == 0) {
- std::string delayStr(action.substr(ASYNC.size()));
- int delay = boost::lexical_cast<int>(delayStr);
- threads.push_back(Thread(*new Completer(msg, delay)));
- }
- else {
- QPID_LOG(error, "TestStore " << name << " unknown action " << action);
- msg->enqueueComplete();
+ default:
+ QPID_LOG(error, logPrefix << "unknown action: " << data);
}
}
- else
- msg->enqueueComplete();
+ if (doComplete) msg->enqueueComplete();
}
void dequeue(TransactionContext* tx,
@@ -239,22 +313,19 @@ class TestStore : public NullMessageStor
private:
- static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ typedef map<string, boost::intrusive_ptr<PersistableMessage> > AsyncIds;
+
TestStoreOptions options;
string name;
Broker& broker;
vector<Thread> threads;
std::auto_ptr<ofstream> dump;
std::auto_ptr<ofstream> events;
+ AsyncIds asyncIds;
};
int TestStore::TxContext::nextId(1);
-const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
-const string TestStore::EXCEPTION = "exception";
-const string TestStore::EXIT_PROCESS = "exit_process";
-const string TestStore::ASYNC="async ";
-
struct TestStorePlugin : public Plugin {
TestStoreOptions options;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org