You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 14:52:10 UTC
svn commit: r821779 [8/11] - in /qpid/branches/java-broker-0-10/qpid: ./
cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/
cpp/examples/messaging/ cpp/include/qmf/ cpp/include/qp...
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/datagen.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/datagen.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/datagen.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/datagen.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,10 @@
#include <time.h>
#include "qpid/Options.h"
-struct Args : public qpid::Options
+namespace qpid {
+namespace tests {
+
+struct Args : public qpid::Options
{
uint count;
uint minSize;
@@ -34,12 +37,12 @@
uint maxChar;
bool help;
- Args() : qpid::Options("Random data generator"),
- count(1), minSize(8), maxSize(4096),
+ Args() : qpid::Options("Random data generator"),
+ count(1), minSize(8), maxSize(4096),
minChar(32), maxChar(126),//safely printable ascii chars
help(false)
{
- addOptions()
+ addOptions()
("count", qpid::optValue(count, "N"), "number of data strings to generate")
("min-size", qpid::optValue(minSize, "N"), "minimum size of data string")
("max-size", qpid::optValue(maxSize, "N"), "maximum size of data string")
@@ -81,6 +84,10 @@
return data;
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv)
{
Args opts;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/echotest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/echotest.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/echotest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/echotest.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,6 +33,9 @@
using namespace qpid::sys;
using namespace std;
+namespace qpid {
+namespace tests {
+
struct Args : public qpid::Options,
public qpid::client::ConnectionSettings
{
@@ -48,7 +51,7 @@
("help", optValue(help), "Print this usage statement")
("count", optValue(count, "N"), "Number of messages to send")
("size", optValue(count, "N"), "Size of messages")
- ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
+ ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
("port,p", optValue(port, "PORT"), "Broker port to connect to")
("username", optValue(username, "USER"), "user name for broker log in.")
("password", optValue(password, "PASSWORD"), "password for broker log in.")
@@ -75,7 +78,7 @@
Message request;
double total, min, max;
bool summary;
-
+
public:
Listener(Session& session, uint limit, bool summary);
void start(uint size);
@@ -92,7 +95,7 @@
{
session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true);
request.getDeliveryProperties().setRoutingKey(queue);
- subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
+ subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
request.getDeliveryProperties().setTimestamp(current_time());
if (size) request.setData(std::string(size, 'X'));
@@ -100,7 +103,7 @@
subscriptions.run();
}
-void Listener::received(Message& response)
+void Listener::received(Message& response)
{
//extract timestamp and compute latency:
uint64_t sentAt = response.getDeliveryProperties().getTimestamp();
@@ -122,7 +125,11 @@
}
}
-int main(int argc, char** argv)
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char** argv)
{
Args opts;
opts.parse(argc, argv);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/exception_test.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/exception_test.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/exception_test.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,9 @@
#include "qpid/sys/Thread.h"
#include "qpid/framing/reply_exceptions.h"
+namespace qpid {
+namespace tests {
+
QPID_AUTO_TEST_SUITE(exception_test)
// FIXME aconway 2008-06-12: need to update our exception handling to
@@ -49,12 +52,12 @@
function<void ()> f;
bool caught;
Thread thread;
-
+
Catcher(function<void ()> f_) : f(f_), caught(false), thread(this) {}
~Catcher() { join(); }
-
+
void run() {
- try {
+ try {
ScopedSuppressLogging sl; // Suppress messages for expected errors.
f();
}
@@ -110,7 +113,7 @@
Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
fix.connection.proxy.close();
runner.join();
- BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
+ BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
@@ -120,3 +123,5 @@
}
QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/failover_soak.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/failover_soak.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,7 +52,8 @@
using namespace qpid::client;
-
+namespace qpid {
+namespace tests {
typedef vector<ForkedBroker *> brokerVector;
@@ -91,9 +92,9 @@
struct child
{
- child ( string & name, pid_t pid, childType type )
+ child ( string & name, pid_t pid, childType type )
: name(name), pid(pid), retval(-999), status(RUNNING), type(type)
- {
+ {
gettimeofday ( & startTime, 0 );
}
@@ -108,7 +109,7 @@
void
- setType ( childType t )
+ setType ( childType t )
{
type = t;
}
@@ -127,7 +128,7 @@
struct children : public vector<child *>
-{
+{
void
add ( string & name, pid_t pid, childType type )
@@ -136,7 +137,7 @@
}
- child *
+ child *
get ( pid_t pid )
{
vector<child *>::iterator i;
@@ -156,7 +157,7 @@
{
if ( verbosity > 1 )
{
- cerr << "children::exited warning: Can't find child with pid "
+ cerr << "children::exited warning: Can't find child with pid "
<< pid
<< endl;
}
@@ -193,7 +194,7 @@
<< endl;
return (*i)->retval;
}
-
+
return 0;
}
@@ -227,11 +228,11 @@
children allMyChildren;
-void
-childExit ( int )
+void
+childExit ( int )
{
- int childReturnCode;
- pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG);
+ int childReturnCode;
+ pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG);
if ( pid > 0 )
allMyChildren.exited ( pid, childReturnCode );
@@ -271,10 +272,10 @@
{
cout << "Broker List ------------ size: " << brokers.size() << "\n";
for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) {
- cout << "pid: "
- << (*i)->getPID()
- << " port: "
- << (*i)->getPort()
+ cout << "pid: "
+ << (*i)->getPID()
+ << " port: "
+ << (*i)->getPort()
<< endl;
}
cout << "end Broker List ------------\n";
@@ -294,7 +295,7 @@
if ( ! newbie )
return true;
- try
+ try
{
Connection connection;
connection.open ( "127.0.0.1", newbie_port );
@@ -304,8 +305,8 @@
}
catch ( const std::exception& error )
{
- std::cerr << "wait_for_newbie error: "
- << error.what()
+ std::cerr << "wait_for_newbie error: "
+ << error.what()
<< endl;
return false;
}
@@ -321,7 +322,7 @@
char const * moduleOrDir,
string const clusterName,
int verbosity,
- int durable )
+ int durable )
{
static int brokerId = 0;
stringstream path, prefix;
@@ -354,8 +355,8 @@
ForkedBroker * broker = newbie;
if ( verbosity > 0 )
- std::cerr << "new broker created: pid == "
- << broker->getPID()
+ std::cerr << "new broker created: pid == "
+ << broker->getPID()
<< " log-prefix == "
<< "soak-" << brokerId
<< endl;
@@ -382,8 +383,8 @@
catch ( const exception& error ) {
if ( verbosity > 0 )
{
- cout << "error killing broker: "
- << error.what()
+ cout << "error killing broker: "
+ << error.what()
<< endl;
}
@@ -399,14 +400,14 @@
/*
- * The optional delay is to avoid killing newbie brokers that have just
+ * The optional delay is to avoid killing newbie brokers that have just
* been added and are still in the process of updating. This causes
* spurious, test-generated errors that scare everybody.
*/
void
killAllBrokers ( brokerVector & brokers, int delay )
{
- if ( delay > 0 )
+ if ( delay > 0 )
{
std::cerr << "Killing all brokers after delay of " << delay << endl;
sleep ( delay );
@@ -414,8 +415,8 @@
for ( uint i = 0; i < brokers.size(); ++ i )
try { brokers[i]->kill(9); }
- catch ( const exception& error )
- {
+ catch ( const exception& error )
+ {
std::cerr << "killAllBrokers Warning: exception during kill on broker "
<< i
<< " "
@@ -429,21 +430,21 @@
pid_t
-runDeclareQueuesClient ( brokerVector brokers,
+runDeclareQueuesClient ( brokerVector brokers,
char const * host,
char const * path,
int verbosity,
int durable
- )
+ )
{
string name("declareQueues");
int port = brokers[0]->getPort ( );
if ( verbosity > 1 )
- cout << "startDeclareQueuesClient: host: "
- << host
- << " port: "
- << port
+ cout << "startDeclareQueuesClient: host: "
+ << host
+ << " port: "
+ << port
<< endl;
stringstream portSs;
portSs << port;
@@ -474,12 +475,12 @@
pid_t
-startReceivingClient ( brokerVector brokers,
+startReceivingClient ( brokerVector brokers,
char const * host,
char const * receiverPath,
char const * reportFrequency,
int verbosity
- )
+ )
{
string name("receiver");
int port = brokers[0]->getPort ( );
@@ -521,14 +522,14 @@
pid_t
-startSendingClient ( brokerVector brokers,
+startSendingClient ( brokerVector brokers,
char const * host,
char const * senderPath,
char const * nMessages,
char const * reportFrequency,
int verbosity,
int durability
- )
+ )
{
string name("sender");
int port = brokers[0]->getPort ( );
@@ -581,13 +582,14 @@
#define HANGING 7
#define ERROR_KILLING_BROKER 8
+}} // namespace qpid::tests
-// If you want durability, use the "dir" option of "moduleOrDir" .
-
+using namespace qpid::tests;
+// If you want durability, use the "dir" option of "moduleOrDir" .
int
-main ( int argc, char const ** argv )
-{
+main ( int argc, char const ** argv )
+{
if ( argc != 9 ) {
cerr << "Usage: "
<< argv[0]
@@ -627,10 +629,10 @@
int nBrokers = 3;
for ( int i = 0; i < nBrokers; ++ i ) {
startNewBroker ( brokers,
- moduleOrDir,
+ moduleOrDir,
clusterName,
verbosity,
- durable );
+ durable );
}
@@ -639,7 +641,7 @@
// Run the declareQueues child.
int childStatus;
- pid_t dqClientPid =
+ pid_t dqClientPid =
runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable );
if ( -1 == dqClientPid ) {
cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
@@ -658,8 +660,8 @@
// Start the receiving client.
pid_t receivingClientPid =
- startReceivingClient ( brokers,
- host,
+ startReceivingClient ( brokers,
+ host,
receiverPath,
reportFrequency,
verbosity );
@@ -670,10 +672,10 @@
// Start the sending client.
- pid_t sendingClientPid =
- startSendingClient ( brokers,
- host,
- senderPath,
+ pid_t sendingClientPid =
+ startSendingClient ( brokers,
+ host,
+ senderPath,
nMessages,
reportFrequency,
verbosity,
@@ -688,10 +690,10 @@
maxSleep = 4;
- for ( int totalBrokers = 3;
- totalBrokers < maxBrokers;
- ++ totalBrokers
- )
+ for ( int totalBrokers = 3;
+ totalBrokers < maxBrokers;
+ ++ totalBrokers
+ )
{
if ( verbosity > 0 )
cout << totalBrokers << " brokers have been added to the cluster.\n\n\n";
@@ -722,14 +724,14 @@
cout << "Starting new broker.\n\n";
startNewBroker ( brokers,
- moduleOrDir,
+ moduleOrDir,
clusterName,
verbosity,
- durable );
-
+ durable );
+
if ( verbosity > 1 )
printBrokers ( brokers );
-
+
// If all children have exited, quit.
int unfinished = allMyChildren.unfinished();
if ( ! unfinished ) {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/federation.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/federation.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/federation.py Mon Oct 5 12:51:57 2009
@@ -503,6 +503,20 @@
self.assertEqual(result.status, 0)
result = bridge2.close()
self.assertEqual(result.status, 0)
+
+ # extra check: verify we don't leak bridge objects - keep the link
+ # around and verify the bridge count has gone to zero
+
+ attempts = 0
+ bridgeCount = len(qmf.getObjects(_class="bridge"))
+ while bridgeCount > 0:
+ attempts += 1
+ if attempts >= 5:
+ self.fail("Bridges didn't clean up")
+ return
+ sleep(1)
+ bridgeCount = len(qmf.getObjects(_class="bridge"))
+
result = link.close()
self.assertEqual(result.status, 0)
@@ -559,8 +573,13 @@
result = bridge.close()
self.assertEqual(result.status, 0)
- result = bridge2.close()
- self.assertEqual(result.status, 0)
+
+ # Extra test: don't explicitly close() bridge2. When the link is closed,
+ # it should clean up bridge2 automagically. verify_cleanup() will detect
+ # if bridge2 isn't cleaned up and will fail the test.
+ #
+ #result = bridge2.close()
+ #self.assertEqual(result.status, 0)
result = link.close()
self.assertEqual(result.status, 0)
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/latencytest.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/latencytest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/latencytest.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,6 +40,9 @@
using namespace qpid::sys;
using std::string;
+namespace qpid {
+namespace tests {
+
typedef std::vector<std::string> StringSet;
struct Args : public qpid::TestOptions {
@@ -64,7 +67,7 @@
durable(false), base("latency-test"), singleConnect(false)
{
- addOptions()
+ addOptions()
("size", optValue(size, "N"), "message size")
("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\
@@ -73,9 +76,9 @@
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
("sync", optValue(sync), "send messages synchronously")
- ("report-frequency", optValue(reportFrequency, "N"),
+ ("report-frequency", optValue(reportFrequency, "N"),
"number of milliseconds to wait between reports (ignored unless rate specified)")
- ("time-limit", optValue(timeLimit, "N"),
+ ("time-limit", optValue(timeLimit, "N"),
"test duration, in seconds")
("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)")
@@ -98,7 +101,7 @@
return t;
}
-struct Stats
+struct Stats
{
Mutex lock;
uint count;
@@ -132,7 +135,7 @@
};
class Receiver : public Client, public MessageListener
-{
+{
SubscriptionManager mgr;
uint count;
Stats& stats;
@@ -168,7 +171,7 @@
Receiver receiver;
Sender sender;
AbsTime begin;
-
+
public:
Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {}
void start();
@@ -186,7 +189,7 @@
connection = &localConnection;
opts.open(localConnection);
}
- session = connection->newSession();
+ session = connection->newSession();
}
void Client::start()
@@ -235,7 +238,7 @@
settings.acceptMode = ACCEPT_MODE_NONE;
settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(*this, queue, settings);
+ mgr.subscribe(*this, queue, settings);
}
void Receiver::test()
@@ -283,7 +286,7 @@
if (!opts.csv) {
if (count) {
std::cout << "Latency(ms): min=" << minLatency << ", max=" <<
- maxLatency << ", avg=" << aux_avg;
+ maxLatency << ", avg=" << aux_avg;
} else {
std::cout << "Stalled: no samples for interval";
}
@@ -368,7 +371,7 @@
Duration delay(sentAt, waitTill);
if (delay < 0)
++missedRate;
- else
+ else
sys::usleep(delay / TIME_USEC);
if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
session.sync();
@@ -382,7 +385,7 @@
{
if (size < chars.length()) {
return chars.substr(0, size);
- }
+ }
std::string data;
for (uint i = 0; i < (size / chars.length()); i++) {
data += chars;
@@ -392,35 +395,39 @@
}
-void Test::start()
-{
- receiver.start();
+void Test::start()
+{
+ receiver.start();
begin = AbsTime(now());
- sender.start();
+ sender.start();
}
-void Test::join()
-{
- sender.join();
- receiver.join();
+void Test::join()
+{
+ sender.join();
+ receiver.join();
AbsTime end = now();
Duration time(begin, end);
double msecs(time / TIME_MSEC);
if (!opts.csv) {
- std::cout << "Sent " << receiver.getCount() << " msgs through " << queue
+ std::cout << "Sent " << receiver.getCount() << " msgs through " << queue
<< " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) ";
}
stats.print();
std::cout << std::endl;
}
-void Test::report()
-{
+void Test::report()
+{
stats.print();
std::cout << std::endl;
stats.reset();
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv)
{
try {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/logging.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/logging.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/logging.cpp Mon Oct 5 12:51:57 2009
@@ -37,6 +37,9 @@
#include <time.h>
+namespace qpid {
+namespace tests {
+
QPID_AUTO_TEST_SUITE(loggingTestSuite)
using namespace std;
@@ -106,7 +109,7 @@
TestOutput(Logger& l) {
l.output(std::auto_ptr<Logger::Output>(this));
}
-
+
void log(const Statement& s, const string& m) {
msg.push_back(m);
stmt.push_back(s);
@@ -117,7 +120,7 @@
using boost::assign::list_of;
QPID_AUTO_TEST_CASE(testLoggerOutput) {
- Logger l;
+ Logger l;
l.clear();
l.select(Selector(debug));
Statement s=QPID_LOG_STATEMENT_INIT(debug);
@@ -174,7 +177,7 @@
l.format(Logger::FUNCTION);
QPID_LOG(critical, "foo");
BOOST_CHECK_EQUAL(string(BOOST_CURRENT_FUNCTION) + ": foo\n", out->last());
-
+
l.format(Logger::LEVEL);
QPID_LOG(critical, "foo");
BOOST_CHECK_EQUAL("critical foo\n", out->last());
@@ -228,12 +231,12 @@
// Overhead test disabled because it consumes a ton of CPU and takes
// forever under valgrind. Not friendly for regular test runs.
-//
+//
#if 0
QPID_AUTO_TEST_CASE(testOverhead) {
// Ensure that the ratio of CPU time for an incrementing loop
// with and without disabled log statements is in acceptable limits.
- //
+ //
int times=100000000;
clock_t noLog=timeLoop(times, count);
clock_t withLog=timeLoop(times, loggedCount);
@@ -242,9 +245,9 @@
// NB: in initial tests the ratio was consistently below 1.5,
// 2.5 is reasonable and should avoid spurios failures
// due to machine load.
- //
- BOOST_CHECK_SMALL(ratio, 2.5);
-}
+ //
+ BOOST_CHECK_SMALL(ratio, 2.5);
+}
#endif // 0
Statement statement(
@@ -290,7 +293,7 @@
}
QPID_AUTO_TEST_CASE(testOptionsDefault) {
- Options opts("");
+ qpid::log::Options opts("");
#ifdef _WIN32
qpid::log::windows::SinkOptions sinks("test");
#else
@@ -328,10 +331,10 @@
QPID_AUTO_TEST_CASE(testLoggerStateure) {
Logger& l=Logger::instance();
ScopedSuppressLogging ls(l);
- Options opts("test");
+ qpid::log::Options opts("test");
const char* argv[]={
0,
- "--log-time", "no",
+ "--log-time", "no",
"--log-source", "yes",
"--log-to-stderr", "no",
"--log-to-file", "logging.tmp",
@@ -352,7 +355,7 @@
QPID_AUTO_TEST_CASE(testQuoteNonPrintable) {
Logger& l=Logger::instance();
ScopedSuppressLogging ls(l);
- Options opts("test");
+ qpid::log::Options opts("test");
opts.time=false;
#ifdef _WIN32
qpid::log::windows::SinkOptions *sinks =
@@ -367,7 +370,7 @@
char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff";
string str(s, sizeof(s));
- QPID_LOG(critical, str);
+ QPID_LOG(critical, str);
ifstream log("logging.tmp");
string line;
getline(log, line, '\0');
@@ -378,3 +381,5 @@
}
QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/perftest.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/perftest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/perftest.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,6 +49,9 @@
using boost::lexical_cast;
using boost::bind;
+namespace qpid {
+namespace tests {
+
enum Mode { SHARED, FANOUT, TOPIC };
const char* modeNames[] = { "shared", "fanout", "topic" };
@@ -105,9 +108,9 @@
bool commitAsync;
static const std::string helpText;
-
+
Opts() :
- TestOptions(helpText),
+ TestOptions(helpText),
setup(false), control(false), publish(false), subscribe(false), baseName("perftest"),
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
@@ -136,16 +139,16 @@
("nsubs", optValue(subs, "N"), "Create N subscribers.")
("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n"
"N==0: Subscriber uses unconfirmed mode")
-
+
("qt", optValue(qt, "N"), "Create N queues or topics.")
("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.")
-
+
("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.")
("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
- ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics")
+ ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics")
("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
@@ -171,7 +174,7 @@
count += subs - (count % subs);
cout << "WARNING: Adjusted --count to " << count
<< " the nearest multiple of --nsubs" << endl;
- }
+ }
totalPubs = pubs*qt;
totalSubs = subs*qt;
subQuota = (pubs*count)/subs;
@@ -258,7 +261,7 @@
};
struct Setup : public Client {
-
+
void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) {
session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings);
session.queuePurge(arg::queue=name);
@@ -278,7 +281,7 @@
for (size_t i = 0; i < opts.qt; ++i) {
ostringstream qname;
qname << opts.baseName << i;
- queueInit(qname.str(), opts.durable || opts.queueDurable, settings);
+ queueInit(qname.str(), opts.durable || opts.queueDurable, settings);
}
}
}
@@ -303,7 +306,7 @@
public:
Stats() : sum(0) {}
-
+
// Functor to collect rates.
void operator()(const string& data) {
try {
@@ -314,7 +317,7 @@
throw Exception("Bad report: "+data);
}
}
-
+
double mean() const {
return sum/values.size();
}
@@ -331,7 +334,7 @@
}
return sqrt(ssq/(values.size()-1));
}
-
+
ostream& print(ostream& out) {
ostream_iterator<double> o(out, "\n");
copy(values.begin(), values.end(), o);
@@ -341,11 +344,11 @@
return out << endl;
}
};
-
+
// Manage control queues, collect and print reports.
struct Controller : public Client {
-
+
SubscriptionManager subs;
Controller() : subs(session) {}
@@ -354,7 +357,7 @@
void process(size_t n, string queue,
boost::function<void (const string&)> msgFn)
{
- if (!opts.summary)
+ if (!opts.summary)
cout << "Processing " << n << " messages from "
<< queue << " " << flush;
LocalQueue lq;
@@ -370,8 +373,8 @@
void process(size_t n, LocalQueue lq, string queue,
boost::function<void (const string&)> msgFn)
{
- session.messageFlow(queue, 0, n);
- if (!opts.summary)
+ session.messageFlow(queue, 0, n);
+ if (!opts.summary)
cout << "Processing " << n << " messages from "
<< queue << " " << flush;
for (size_t i = 0; i < n; ++i) {
@@ -386,7 +389,7 @@
cout << "Sending " << data << " " << n << " times to " << queue
<< endl;
Message msg(data, queue);
- for (size_t i = 0; i < n; ++i)
+ for (size_t i = 0; i < n; ++i)
session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
@@ -419,7 +422,7 @@
process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates));
process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates));
- AbsTime end=now();
+ AbsTime end=now();
double time=secs(start, end);
double txrate=opts.transfers/time;
@@ -469,12 +472,12 @@
string routingKey;
PublishThread() {};
-
+
PublishThread(string key, string dest=string()) {
destination=dest;
routingKey=key;
}
-
+
void run() { // Publisher
try {
string data;
@@ -492,7 +495,7 @@
}
} else {
size_t msgSize=max(opts.size, sizeof(size_t));
- data = string(msgSize, 'X');
+ data = string(msgSize, 'X');
}
Message msg(data, routingKey);
@@ -500,21 +503,21 @@
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- if (opts.txPub){
+ if (opts.txPub){
session.txSelect();
}
SubscriptionManager subs(session);
LocalQueue lq;
- subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, fqn("pub_start"));
-
+ subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
+ subs.subscribe(lq, fqn("pub_start"));
+
for (size_t j = 0; j < opts.iterations; ++j) {
expect(lq.pop().getData(), "start");
AbsTime start=now();
for (size_t i=0; i<opts.count; i++) {
// Stamp the iteration into the message data, avoid
// any heap allocation.
- const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t),
+ const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t),
reinterpret_cast<const char*>(&i), sizeof(size_t));
if (opts.syncPub) {
sync(session).messageTransfer(
@@ -540,7 +543,7 @@
if (opts.confirm) session.sync();
AbsTime end=now();
double time=secs(start,end);
-
+
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
session.messageTransfer(arg::content=report, arg::acceptMode=1);
@@ -561,7 +564,7 @@
string queue;
SubscribeThread() {}
-
+
SubscribeThread(string q) { queue = q; }
SubscribeThread(string key, string ex) {
@@ -586,7 +589,7 @@
}
void run() { // Subscribe
- try {
+ try {
if (opts.txSub) sync(session).txSelect();
SubscriptionManager subs(session);
SubscriptionSettings settings;
@@ -606,15 +609,15 @@
if (opts.iterations > 1) {
subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0)));
}
-
+
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
//need to wait here until all subs are done
- session.messageFlow(fqn("sub_iteration"), 0, 1);
+ session.messageFlow(fqn("sub_iteration"), 0, 1);
iterationControl.pop();
//need to allocate some more credit for subscription
- session.messageFlow(queue, 0, opts.subQuota);
+ session.messageFlow(queue, 0, opts.subQuota);
}
Message msg;
AbsTime start=now();
@@ -627,7 +630,7 @@
}
if (opts.intervalSub)
qpid::sys::usleep(opts.intervalSub*1000);
- // TODO aconway 2007-11-23: check message order for.
+ // TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
// one per publisher and a publisher ID in the
// message. Careful not to introduce a lot of overhead
@@ -664,6 +667,10 @@
}
};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv) {
int exitCode = 0;
boost::ptr_vector<Client> subs(opts.subs);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/publish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/publish.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/publish.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/publish.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,6 +36,9 @@
using namespace qpid::sys;
using namespace std;
+namespace qpid {
+namespace tests {
+
typedef vector<string> StringSet;
struct Args : public qpid::TestOptions {
@@ -61,12 +64,12 @@
Args opts;
-struct Client
+struct Client
{
Connection connection;
AsyncSession session;
- Client()
+ Client()
{
opts.open(connection);
session = connection.newSession();
@@ -75,7 +78,7 @@
// Cheap hex calculation, avoid expensive ostrstream and string
// creation to generate correlation ids in message loop.
char hex(char i) { return i<10 ? '0'+i : 'A'+i-10; }
- void hex(char i, string& s) {
+ void hex(char i, string& s) {
s[0]=hex(i>>24); s[1]=hex(i>>16); s[2]=hex(i>>8); s[3]=i;
}
@@ -86,7 +89,7 @@
string correlationId = "0000";
if (opts.durable)
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
-
+
for (uint i = 0; i < opts.count; i++) {
if (opts.id) {
hex(i+1, correlationId);
@@ -103,7 +106,7 @@
else cout << "Time: " << secs << "s Rate: " << opts.count/secs << endl;
}
- ~Client()
+ ~Client()
{
try{
session.close();
@@ -114,6 +117,10 @@
}
};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv)
{
try {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qpid_ping.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qpid_ping.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qpid_ping.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qpid_ping.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,9 @@
using namespace qpid::client;
using namespace qpid;
+namespace qpid {
+namespace tests {
+
struct PingOptions : public qpid::TestOptions {
int timeout; // Timeout in seconds.
bool quiet; // No output
@@ -58,7 +61,7 @@
public:
Ping() : status(WAITING) {}
-
+
void run() {
try {
opts.open(connection);
@@ -100,6 +103,9 @@
}
};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
int main(int argc, char** argv) {
try {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,11 +37,13 @@
using namespace std;
+namespace qpid {
+namespace tests {
class ResponseListener : public MessageListener
{
public :
-
+
int exitCode;
ResponseListener ( SubscriptionManager & subscriptions )
@@ -50,7 +52,7 @@
{
}
- virtual void
+ virtual void
received ( Message & message )
{
char first_word[1000];
@@ -66,9 +68,9 @@
if ( ! strcmp ( first_word, "get_response" ) )
{
// The remainder of the message is the file we requested.
- fprintf ( stdout,
- "%s",
- message.getData().c_str() + strlen("get_response" )
+ fprintf ( stdout,
+ "%s",
+ message.getData().c_str() + strlen("get_response" )
);
subscriptions.cancel(message.getDestination());
}
@@ -76,12 +78,13 @@
private :
-
+
SubscriptionManager & subscriptions;
};
+}} // namespace qpid::tests
-
+using namespace qpid::tests;
/*
* argv[1] host
@@ -90,8 +93,8 @@
* argv[4] command name
* argv[5..N] args to the command
*/
-int
-main ( int argc, char ** argv )
+int
+main ( int argc, char ** argv )
{
const char* host = argv[1];
int port = atoi(argv[2]);
@@ -99,14 +102,14 @@
Connection connection;
- try
+ try
{
connection.open ( host, port );
Session session = connection.newSession ( );
// Make a queue and bind it to fanout.
string myQueue = session.getId().getName();
-
+
session.queueDeclare ( arg::queue=myQueue,
arg::exclusive=true,
arg::autoDelete=true
@@ -136,7 +139,7 @@
response_command = true;
// Send the payload message.
- // Skip "qrsh host_name port"
+ // Skip "qrsh host_name port"
Message message;
stringstream ss;
for ( int i = 3; i < argc; ++ i )
@@ -144,7 +147,7 @@
message.setData ( ss.str() );
- session.messageTransfer(arg::content=message,
+ session.messageTransfer(arg::content=message,
arg::destination="amq.fanout");
if ( response_command )
@@ -153,8 +156,8 @@
session.close();
connection.close();
return responseListener.exitCode;
- }
- catch ( exception const & e)
+ }
+ catch ( exception const & e)
{
cerr << e.what() << endl;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh_server.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh_server.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh_server.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/qrsh_server.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,7 +43,8 @@
using namespace std;
-
+namespace qpid {
+namespace tests {
int
mrand ( int max_desired_val )
@@ -54,7 +55,7 @@
-char *
+char *
file2str ( char const * file_name )
{
FILE * fp = fopen ( file_name, "r" );
@@ -71,9 +72,9 @@
if ( ! content )
{
- fprintf ( stderr,
- "file2str error: can't malloc %d bytes.\n",
- (int)file_len
+ fprintf ( stderr,
+ "file2str error: can't malloc %d bytes.\n",
+ (int)file_len
);
return 0;
}
@@ -123,9 +124,9 @@
bool myMessage ( Message const & message );
/* ----------------------------------------------
- * Special Commands
+ * Special Commands
* These are commands that the qrsh_server executes
- * directly, rather than through a child process
+ * directly, rather than through a child process
* instance of qrsh_run.
*/
void runCommand ( Message const & message );
@@ -157,9 +158,9 @@
char const * skipWord ( char const * s );
- void string_replaceAll ( string & str,
- string & target,
- string & replacement
+ void string_replaceAll ( string & str,
+ string & target,
+ string & replacement
);
@@ -186,12 +187,12 @@
-QrshServer::QrshServer ( SubscriptionManager & subs,
+QrshServer::QrshServer ( SubscriptionManager & subs,
char const * name,
char const * qrsh_run_path,
char const * host,
int port
- )
+ )
: subscriptions ( subs ),
name ( name ),
qrsh_run_path ( qrsh_run_path ),
@@ -202,11 +203,11 @@
{
data_dir << "/tmp/qrsh_"
<< getpid();
-
+
if(mkdir ( data_dir.str().c_str(), 0777 ) )
{
- fprintf ( stderr,
- "QrshServer::QrshServer error: can't mkdir |%s|\n",
+ fprintf ( stderr,
+ "QrshServer::QrshServer error: can't mkdir |%s|\n",
data_dir.str().c_str()
);
exit ( 1 );
@@ -239,21 +240,21 @@
<< name;
send ( announcement_data.str() );
-
+
saidHello = true;
}
-void
+void
QrshServer::send ( string const & content )
{
try
{
Message message;
message.setData ( content );
-
+
Connection connection;
connection.open ( host, port );
Session session = connection.newSession ( );
@@ -289,7 +290,7 @@
-void
+void
QrshServer::sayName ( )
{
fprintf ( stderr, "My name is: |%s|\n", name.c_str() );
@@ -343,7 +344,7 @@
break;
}
}
-
+
if ( i_win && (ties <= 0) )
{
myStraw = 0;
@@ -364,10 +365,10 @@
/*
* "APB" command (all-points-bullitens (commands that are not addressed
* specifically to any server)) are handled directly, here.
- * Because if I return simply "true", the normal command processing code
+ * Because if I return simply "true", the normal command processing code
* will misinterpret the command.
*/
-bool
+bool
QrshServer::myMessage ( Message const & message )
{
int const maxlen = 100;
@@ -414,7 +415,7 @@
{
return true;
}
- else
+ else
if ( ! strcmp ( first_word, "any" ) )
{
straws.clear();
@@ -443,7 +444,7 @@
-void
+void
QrshServer::addAlias ( Message const & message )
{
char alias[1000];
@@ -463,8 +464,8 @@
if ( ! dir )
{
- fprintf ( stderr,
- "QrshServer::getNames error: could not open dir |%s|.\n",
+ fprintf ( stderr,
+ "QrshServer::getNames error: could not open dir |%s|.\n",
data_dir.str().c_str()
);
return;
@@ -491,8 +492,8 @@
}
else
{
- /*
- * Fail silently. The non-existence of this file
+ /*
+ * Fail silently. The non-existence of this file
* is not necessarily an error.
*/
}
@@ -504,9 +505,9 @@
void
-QrshServer::string_replaceAll ( string & str,
- string & target,
- string & replacement
+QrshServer::string_replaceAll ( string & str,
+ string & target,
+ string & replacement
)
{
int target_size = target.size();
@@ -519,7 +520,7 @@
-bool
+bool
QrshServer::isProcessName ( char const * str )
{
getNames();
@@ -537,12 +538,12 @@
-int
+int
QrshServer::string_countWords ( char const * s1 )
{
int count = 0;
char const * s2 = s1 + 1;
-
+
if ( ! isspace(* s1) )
{
++ count;
@@ -603,7 +604,7 @@
*/
char file_or_process_name[1000];
sscanf ( request_message.getData().c_str(), "%*s%*s%s", file_or_process_name );
-
+
if ( isProcessName ( file_or_process_name ) )
{
stringstream desired_file_name;
@@ -612,13 +613,13 @@
<< file_or_process_name
<< '/';
char requested_output_stream[1000];
- if(1 != sscanf ( request_message.getData().c_str(),
- "%*s%*s%*s%s",
- requested_output_stream
+ if(1 != sscanf ( request_message.getData().c_str(),
+ "%*s%*s%*s%s",
+ requested_output_stream
)
)
{
- fprintf ( stderr,
+ fprintf ( stderr,
"QrshServer::get error: Can't read requested data file name from this message: |%s|\n",
request_message.getData().c_str()
);
@@ -674,7 +675,7 @@
if ( truncated_command )
{
stringstream ss;
- ss << qrsh_run_path
+ ss << qrsh_run_path
<< ' '
<< data_dir.str()
<< ' '
@@ -706,9 +707,9 @@
fprintf ( stderr, "qrsh_server error awaiting child!\n" );
exit ( 1 );
}
-
+
exit_code >>= 8;
-
+
stringstream data;
data << "wait_response "
<< exit_code;
@@ -731,7 +732,7 @@
// The second word is "exec_wait".
// The third word is the symbolic name of the command to wait for.
// The fact that there are exactly three words means that this
- // must be a command that has already been named and started --
+ // must be a command that has already been named and started --
// we just need to find its pid and wait on it.
pre_existing = true;
}
@@ -762,7 +763,7 @@
if ( truncated_command )
{
stringstream ss;
- ss << qrsh_run_path
+ ss << qrsh_run_path
<< ' '
<< data_dir.str()
<< ' '
@@ -795,7 +796,7 @@
exit ( 1 );
}
}
-
+
exit_code >>= 8;
stringstream data;
@@ -810,7 +811,7 @@
-char const *
+char const *
QrshServer::skipWord ( char const * s )
{
if(! (s && *s) )
@@ -884,7 +885,7 @@
arg_len = 0;
}
- done:
+ done:
if ( arg_len > 0 )
lengths.push_back ( arg_len );
@@ -896,8 +897,8 @@
for ( int i = 0; i < n_args; ++ i )
{
argv[i] = ( char *) malloc ( lengths[i] + 1 );
- strncpy ( argv[i],
- str + start_positions[i],
+ strncpy ( argv[i],
+ str + start_positions[i],
lengths[i]
);
argv[i][lengths[i]] = 0;
@@ -971,12 +972,12 @@
* qrsh_run, which will save all its data in the qrsh dir.
*/
stringstream ss;
- ss << qrsh_run_path
+ ss << qrsh_run_path
<< ' '
<< data_dir.str()
<< ' '
<< s;
-
+
if ( ! fork() )
{
char ** argv = getArgs ( ss.str().c_str() );
@@ -988,8 +989,8 @@
-void
-QrshServer::received ( Message & message )
+void
+QrshServer::received ( Message & message )
{
if ( myMessage ( message ) )
runCommand ( message );
@@ -997,7 +998,9 @@
+}} // namespace qpid::tests
+using namespace qpid::tests;
/*
* fixme mick Mon Aug 3 10:29:26 EDT 2009
@@ -1024,23 +1027,23 @@
// Declare queues.
string myQueue = session.getId().getName();
- session.queueDeclare ( arg::queue=myQueue,
+ session.queueDeclare ( arg::queue=myQueue,
arg::exclusive=true,
arg::autoDelete=true);
- session.exchangeBind ( arg::exchange="amq.fanout",
- arg::queue=myQueue,
+ session.exchangeBind ( arg::exchange="amq.fanout",
+ arg::queue=myQueue,
arg::bindingKey="my-key");
-
+
// Create a server and subscribe it to my queue.
SubscriptionManager subscriptions ( session );
- QrshServer server ( subscriptions,
+ QrshServer server ( subscriptions,
argv[1], // server name
argv[2], // qrsh exe path
host,
port
);
- subscriptions.subscribe ( server, myQueue );
+ subscriptions.subscribe ( server, myQueue );
// Receive messages until the subscription is cancelled
// by QrshServer::received()
@@ -1048,7 +1051,7 @@
connection.close();
}
- catch(const exception& error)
+ catch(const exception& error)
{
cout << error.what() << endl;
return 1;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/receiver.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/receiver.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/receiver.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,7 +36,10 @@
using namespace std;
-struct Args : public qpid::TestOptions
+namespace qpid {
+namespace tests {
+
+struct Args : public qpid::TestOptions
{
string queue;
uint messages;
@@ -47,7 +50,7 @@
Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1), browse(false)
{
- addOptions()
+ addOptions()
("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
@@ -77,15 +80,15 @@
bool isDuplicate(Message& message);
};
-Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency, bool browse) :
- queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0)
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency, bool browse) :
+ queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0)
{
if (browse) settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow);
settings.autoAck = ackFrequency;
}
-void Receiver::received(Message& message)
+void Receiver::received(Message& message)
{
if (!(skipDups && isDuplicate(message))) {
bool eos = message.getData() == EOS;
@@ -94,7 +97,7 @@
}
}
-bool Receiver::isDuplicate(Message& message)
+bool Receiver::isDuplicate(Message& message)
{
uint sn = message.getHeaders().getAsInt("sn");
if (lastSn < sn) {
@@ -115,6 +118,10 @@
}
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char ** argv)
{
Args opts;
@@ -130,6 +137,3 @@
}
return 1;
}
-
-
-
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replaying_sender.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replaying_sender.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replaying_sender.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replaying_sender.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,12 +35,15 @@
using namespace std;
+namespace qpid {
+namespace tests {
+
class Sender : public FailoverManager::Command
{
public:
Sender(const std::string& queue, uint count, uint reportFreq);
void execute(AsyncSession& session, bool isRetry);
- uint getSent();
+ uint getSent();
void setVerbosity ( int v ) { verbosity = v; }
void setPersistence ( int p ) { persistence = p; }
@@ -51,7 +54,7 @@
uint sent;
const uint reportFrequency;
Message message;
-
+
int verbosity;
int persistence;
};
@@ -93,7 +96,11 @@
return sent;
}
-int main(int argc, char ** argv)
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
{
ConnectionSettings settings;
@@ -118,23 +125,23 @@
connection.execute ( sender );
if ( verbosity > 0 )
{
- std::cout << "Sender finished. Sent "
- << sender.getSent()
- << " messages."
+ std::cout << "Sender finished. Sent "
+ << sender.getSent()
+ << " messages."
<< endl;
}
connection.close();
- return 0;
- }
- catch(const std::exception& error)
+ return 0;
+ }
+ catch(const std::exception& error)
{
- cerr << "Sender (host: "
- << settings.host
- << " port: "
+ cerr << "Sender (host: "
+ << settings.host
+ << " port: "
<< settings.port
<< " ) "
- << " Failed: "
- << error.what()
+ << " Failed: "
+ << error.what()
<< std::endl;
}
return 1;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/resuming_receiver.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/resuming_receiver.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/resuming_receiver.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,11 @@
using namespace std;
-class Listener : public MessageListener,
- public FailoverManager::Command,
+namespace qpid {
+namespace tests {
+
+class Listener : public MessageListener,
+ public FailoverManager::Command,
public FailoverManager::ReconnectionStrategy
{
public:
@@ -57,32 +60,32 @@
};
-Listener::Listener(int freq, int verbosity)
- : count(0),
- received_twice(0),
- lastSn(0),
- gaps(false),
+Listener::Listener(int freq, int verbosity)
+ : count(0),
+ received_twice(0),
+ lastSn(0),
+ gaps(false),
reportFrequency(freq),
verbosity(verbosity),
done(false)
{}
-void Listener::received(Message & message)
+void Listener::received(Message & message)
{
- if (message.getData() == "That's all, folks!")
+ if (message.getData() == "That's all, folks!")
{
done = true;
if(verbosity > 0 )
{
- std::cout << "Shutting down listener for "
+ std::cout << "Shutting down listener for "
<< message.getDestination() << std::endl;
- std::cout << "Listener received "
- << count
- << " messages ("
- << received_twice
- << " received_twice)"
+ std::cout << "Listener received "
+ << count
+ << " messages ("
+ << received_twice
+ << " received_twice)"
<< endl;
}
subscription.cancel();
@@ -99,8 +102,8 @@
++count;
if ( ! ( count % reportFrequency ) ) {
if ( verbosity > 0 )
- std::cout << "Listener has received "
- << count
+ std::cout << "Listener has received "
+ << count
<< " messages.\n";
}
} else {
@@ -133,6 +136,10 @@
if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char ** argv)
{
ConnectionSettings settings;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/sender.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/sender.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/sender.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,22 +37,27 @@
using namespace std;
-struct Args : public qpid::TestOptions
+namespace qpid {
+namespace tests {
+
+struct Args : public qpid::TestOptions
{
string destination;
string key;
uint sendEos;
bool durable;
+ uint ttl;
string lvqMatchValue;
string lvqMatchFile;
- Args() : key("test-queue"), sendEos(0), durable(false)
+ Args() : key("test-queue"), sendEos(0), durable(false), ttl(0)
{
addOptions()
("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to")
("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
("lvq-match-value", qpid::optValue(lvqMatchValue, "KEY"), "The value to set for the LVQ match key property")
("lvq-match-file", qpid::optValue(lvqMatchFile, "FILE"), "A file containing values to set for the LVQ match key property");
}
@@ -63,26 +68,29 @@
class Sender : public FailoverManager::Command
{
public:
- Sender(const std::string& destination, const std::string& key, uint sendEos, bool durable,
+ Sender(const std::string& destination, const std::string& key, uint sendEos, bool durable, uint ttl,
const std::string& lvqMatchValue, const std::string& lvqMatchFile);
void execute(AsyncSession& session, bool isRetry);
private:
const std::string destination;
MessageReplayTracker sender;
- Message message;
+ Message message;
const uint sendEos;
uint sent;
std::ifstream lvqMatchValues;
};
-Sender::Sender(const std::string& dest, const std::string& key, uint eos, bool durable,
- const std::string& lvqMatchValue, const std::string& lvqMatchFile) :
+Sender::Sender(const std::string& dest, const std::string& key, uint eos, bool durable, uint ttl, const std::string& lvqMatchValue, const std::string& lvqMatchFile) :
destination(dest), sender(10), message("", key), sendEos(eos), sent(0) , lvqMatchValues(lvqMatchFile.c_str())
{
if (durable){
message.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
+ if (ttl) {
+ message.getDeliveryProperties().setTtl(ttl);
+ }
+
if (!lvqMatchValue.empty()) {
message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqMatchValue);
}
@@ -108,16 +116,20 @@
}
}
-int main(int argc, char ** argv)
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
{
Args opts;
try {
opts.parse(argc, argv);
FailoverManager connection(opts.con);
- Sender sender(opts.destination, opts.key, opts.sendEos, opts.durable, opts.lvqMatchValue, opts.lvqMatchFile);
+ Sender sender(opts.destination, opts.key, opts.sendEos, opts.durable, opts.ttl, opts.lvqMatchValue, opts.lvqMatchFile);
connection.execute(sender);
connection.close();
- return 0;
+ return 0;
} catch(const std::exception& error) {
std::cout << "Failed: " << error.what() << std::endl;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/shlibtest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/shlibtest.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/shlibtest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/shlibtest.cpp Mon Oct 5 12:51:57 2009
@@ -18,6 +18,9 @@
*
*/
+namespace qpid {
+namespace tests {
+
int* loaderData = 0;
extern "C"
#ifdef WIN32
@@ -28,5 +31,4 @@
struct OnUnload { ~OnUnload() { *loaderData=42; } };
OnUnload unloader; // For destructor.
-
-
+}} // namespace qpid::tests
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_store.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_store.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@
/**@file
* Plug-in message store for tests.
- *
+ *
* Add functionality as required, build up a comprehensive set of
* features to support persistent behavior tests.
*
@@ -46,6 +46,9 @@
using namespace boost;
using namespace qpid::sys;
+namespace qpid {
+namespace tests {
+
struct TestStoreOptions : public Options {
string name;
@@ -66,7 +69,7 @@
delete this;
}
};
-
+
class TestStore : public NullMessageStore {
public:
TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
@@ -83,7 +86,7 @@
// Check the message for special instructions.
size_t i = string::npos;
- size_t j = string::npos;
+ size_t j = string::npos;
if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
&& (i = data.find(name+"[")) != string::npos
&& (j = data.find("]", i)) != string::npos)
@@ -144,3 +147,5 @@
};
static TestStorePlugin pluginInstance;
+
+}} // namespace qpid::tests
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_tools.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_tools.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_tools.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/test_tools.h Mon Oct 5 12:51:57 2009
@@ -34,7 +34,7 @@
return o;
}
-// Compare sequences
+// Compare sequences
template <class T, class U>
bool seqEqual(const T& a, const U& b) {
typename T::const_iterator i = a.begin();
@@ -60,6 +60,9 @@
bool operator == (const boost::assign_detail::generic_list<T>& b, const vector<T>& a) { return seqEqual(a, b); }
}
+namespace qpid {
+namespace tests {
+
/** NB: order of parameters is regex first, in line with
* CHECK(expected, actual) convention.
*/
@@ -98,6 +101,7 @@
return defaultPath;
}
+}} // namespace qpid::tests
#endif /*!TEST_TOOLS_H*/
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_listener.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_listener.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
* This file provides one half of a test and example of a pub-sub
* style of interaction. See topic_publisher.cpp for the other half,
* in which the logic for publishing is defined.
- *
+ *
* This file contains the listener logic. A listener will subscribe to
* a logical 'topic'. It will count the number of messages it receives
* and the time elapsed between the first one and the last one. It
@@ -50,11 +50,14 @@
using namespace qpid::framing;
using namespace std;
+namespace qpid {
+namespace tests {
+
/**
* A message listener implementation in which the runtime logic is
* defined.
*/
-class Listener : public MessageListener{
+class Listener : public MessageListener{
Session session;
SubscriptionManager& mgr;
const string responseQueue;
@@ -62,7 +65,7 @@
bool init;
int count;
AbsTime start;
-
+
void shutdown();
void report();
public:
@@ -91,6 +94,52 @@
}
};
+Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+ session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
+
+void Listener::received(Message& message){
+ if(!init){
+ start = now();
+ count = 0;
+ init = true;
+ cout << "Batch started." << endl;
+ }
+ string type = message.getHeaders().getAsString("TYPE");
+
+ if(string("TERMINATION_REQUEST") == type){
+ shutdown();
+ }else if(string("REPORT_REQUEST") == type){
+ subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
+ cout <<"Batch ended, sending report." << endl;
+ //send a report:
+ report();
+ init = false;
+ }else if (++count % 1000 == 0){
+ cout <<"Received " << count << " messages." << endl;
+ }
+}
+
+void Listener::shutdown(){
+ mgr.stop();
+}
+
+void Listener::report(){
+ AbsTime finish = now();
+ Duration time(start, finish);
+ stringstream reportstr;
+ reportstr << "Received " << count << " messages in "
+ << time/TIME_MSEC << " ms.";
+ Message msg(reportstr.str(), responseQueue);
+ msg.getHeaders().setString("TYPE", "REPORT");
+ session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1);
+ if(transactional){
+ sync(session).txCommit();
+ }
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
/**
* The main routine creates a Listener instance and sets it up to
@@ -142,7 +191,7 @@
if (args.transactional) {
session.txSelect();
}
-
+
cout << "topic_listener: listening..." << endl;
mgr.run();
if (args.durable) {
@@ -158,47 +207,3 @@
}
return 1;
}
-
-Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
- session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
-
-void Listener::received(Message& message){
- if(!init){
- start = now();
- count = 0;
- init = true;
- cout << "Batch started." << endl;
- }
- string type = message.getHeaders().getAsString("TYPE");
-
- if(string("TERMINATION_REQUEST") == type){
- shutdown();
- }else if(string("REPORT_REQUEST") == type){
- subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
- cout <<"Batch ended, sending report." << endl;
- //send a report:
- report();
- init = false;
- }else if (++count % 1000 == 0){
- cout <<"Received " << count << " messages." << endl;
- }
-}
-
-void Listener::shutdown(){
- mgr.stop();
-}
-
-void Listener::report(){
- AbsTime finish = now();
- Duration time(start, finish);
- stringstream reportstr;
- reportstr << "Received " << count << " messages in "
- << time/TIME_MSEC << " ms.";
- Message msg(reportstr.str(), responseQueue);
- msg.getHeaders().setString("TYPE", "REPORT");
- session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1);
- if(transactional){
- sync(session).txCommit();
- }
-}
-
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_publisher.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/topic_publisher.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
* This file provides one half of a test and example of a pub-sub
* style of interaction. See topic_listener.cpp for the other half, in
* which the logic for subscribers is defined.
- *
+ *
* This file contains the publisher logic. The publisher will send a
* number of messages to the exchange with the appropriate routing key
* for the logical 'topic'. Once it has done this it will then send a
@@ -49,19 +49,22 @@
using namespace qpid::sys;
using namespace std;
+namespace qpid {
+namespace tests {
+
/**
* The publishing logic is defined in this class. It implements
* message listener and can therfore be used to receive messages sent
* back by the subscribers.
*/
-class Publisher {
+class Publisher {
AsyncSession session;
SubscriptionManager mgr;
LocalQueue queue;
const string controlTopic;
const bool transactional;
const bool durable;
-
+
string generateData(int size);
public:
@@ -99,6 +102,64 @@
}
};
+Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
+ session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
+{
+ mgr.subscribe(queue, "response");
+}
+
+int64_t Publisher::publish(int msgs, int listeners, int size){
+ Message msg(generateData(size), controlTopic);
+ if (durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+ AbsTime start = now();
+
+ for(int i = 0; i < msgs; i++){
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
+ }
+ //send report request
+ Message reportRequest("", controlTopic);
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
+ if(transactional){
+ sync(session).txCommit();
+ }
+ //wait for a response from each listener (TODO, could log these)
+ for (int i = 0; i < listeners; i++) {
+ Message report = queue.pop();
+ }
+
+ if(transactional){
+ sync(session).txCommit();
+ }
+
+ AbsTime finish = now();
+ return Duration(start, finish);
+}
+
+string Publisher::generateData(int size){
+ string data;
+ for(int i = 0; i < size; i++){
+ data += ('A' + (i / 26));
+ }
+ return data;
+}
+
+void Publisher::terminate(){
+ //send termination request
+ Message terminationRequest("", controlTopic);
+ terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
+ if(transactional){
+ session.txCommit();
+ }
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv) {
try{
Args args;
@@ -120,11 +181,11 @@
Message m = statusQ.get();
if( m.getData().find("topic_listener: ", 0) == 0 ) {
cout << "Listener " << (i+1) << " of " << args.subscribers
- << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
- << ")" << endl;
+ << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
+ << ")" << endl;
} else {
throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData()));
- }
+ }
}
}
@@ -150,12 +211,12 @@
if(!min || msecs < min) min = msecs;
sum += msecs;
cout << "Completed " << (i+1) << " of " << batchSize
- << " in " << msecs << "ms" << endl;
+ << " in " << msecs << "ms" << endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
if(batchSize > 1){
- cout << batchSize << " batches completed. avg=" << avg <<
+ cout << batchSize << " batches completed. avg=" << avg <<
", max=" << max << ", min=" << min << endl;
}
session.close();
@@ -167,57 +228,3 @@
}
return 1;
}
-
-Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
- session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
-{
- mgr.subscribe(queue, "response");
-}
-
-int64_t Publisher::publish(int msgs, int listeners, int size){
- Message msg(generateData(size), controlTopic);
- if (durable) {
- msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- }
- AbsTime start = now();
-
- for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
- }
- //send report request
- Message reportRequest("", controlTopic);
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
- if(transactional){
- sync(session).txCommit();
- }
- //wait for a response from each listener (TODO, could log these)
- for (int i = 0; i < listeners; i++) {
- Message report = queue.pop();
- }
-
- if(transactional){
- sync(session).txCommit();
- }
-
- AbsTime finish = now();
- return Duration(start, finish);
-}
-
-string Publisher::generateData(int size){
- string data;
- for(int i = 0; i < size; i++){
- data += ('A' + (i / 26));
- }
- return data;
-}
-
-void Publisher::terminate(){
- //send termination request
- Message terminationRequest("", controlTopic);
- terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
- if(transactional){
- session.txCommit();
- }
-}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/txjob.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/txjob.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/txjob.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,7 +33,10 @@
using namespace qpid::client;
using namespace qpid::sys;
-struct Args : public qpid::TestOptions
+namespace qpid {
+namespace tests {
+
+struct Args : public qpid::TestOptions
{
string workQueue;
string source;
@@ -43,10 +46,10 @@
bool quit;
bool declareQueues;
- Args() : workQueue("txshift-control"), source("txshift-1"), dest("txshift-2"), messages(0), jobs(0),
+ Args() : workQueue("txshift-control"), source("txshift-1"), dest("txshift-2"), messages(0), jobs(0),
quit(false), declareQueues(false)
{
- addOptions()
+ addOptions()
("messages", qpid::optValue(messages, "N"), "Number of messages to shift")
("jobs", qpid::optValue(jobs, "N"), "Number of shift jobs to request")
("source", qpid::optValue(source, "QUEUE NAME"), "source queue from which messages will be shifted")
@@ -57,6 +60,10 @@
}
};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
//TODO: might be nice to make this capable of failover as well at some
//point; for now its just for the setup phase.
int main(int argc, char** argv)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org