You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/04 15:19:07 UTC
svn commit: r600962 - in /incubator/qpid/trunk/qpid/cpp/src/tests:
topic_listener.cpp topic_publisher.cpp
Author: gsim
Date: Tue Dec 4 06:19:06 2007
New Revision: 600962
URL: http://svn.apache.org/viewvc?rev=600962&view=rev
Log:
Updates to topic test
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=600962&r1=600961&r2=600962&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Tue Dec 4 06:19:06 2007
@@ -99,7 +99,6 @@
if(args.help)
cout << args << endl;
else {
- cout << "topic_listener: Started." << endl;
Connection connection(args.trace);
args.open(connection);
Session_0_10 session = connection.newSession();
@@ -110,7 +109,11 @@
//declare exchange, queue and bind them:
session.queueDeclare(arg::queue="response");
std::string control = "control_" + session.getId().str();
- session.queueDeclare(arg::queue=control, arg::durable=args.durable);
+ if (args.durable) {
+ session.queueDeclare(arg::queue=control, arg::durable=true);
+ } else {
+ session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true);
+ }
session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
//set up listener
@@ -125,13 +128,14 @@
}
mgr.subscribe(listener, control);
- cout << "topic_listener: Consuming." << endl;
+ cout << "topic_listener: listening..." << endl;
mgr.run();
- cout << "topic_listener: run returned, closing session" << endl;
+ if (args.durable) {
+ session.queueDelete(arg::queue=control);
+ }
session.close();
cout << "closing connection" << endl;
connection.close();
- cout << "topic_listener: normal exit" << endl;
}
return 0;
} catch (const std::exception& error) {
@@ -148,16 +152,19 @@
start = now();
count = 0;
init = true;
+ cout << "Batch started." << endl;
}
FieldTable::ValuePtr type(message.getHeaders().get("TYPE"));
if(!!type && StringValue("TERMINATION_REQUEST") == *type){
shutdown();
}else if(!!type && StringValue("REPORT_REQUEST") == *type){
+ message.acknowledge();//acknowledge everything upto this point
+ cout <<"Batch ended, sending report." << endl;
//send a report:
report();
init = false;
- }else if (++count % 100 == 0){
+ }else if (++count % 1000 == 0){
cout <<"Received " << count << " messages." << endl;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=600962&r1=600961&r2=600962&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Tue Dec 4 06:19:06 2007
@@ -55,20 +55,18 @@
* message listener and can therfore be used to receive messages sent
* back by the subscribers.
*/
-class Publisher : public MessageListener{
+class Publisher {
Session_0_10& session;
+ SubscriptionManager mgr;
+ LocalQueue queue;
const string controlTopic;
const bool transactional;
const bool durable;
- Monitor monitor;
- int count;
- void waitForCompletion(int msgs);
string generateData(int size);
public:
Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
- virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -118,11 +116,7 @@
//declare queue (relying on default binding):
session.queueDeclare(arg::queue="response");
- //set up listener
- SubscriptionManager mgr(session);
Publisher publisher(session, "topic_control", args.transactional, args.durable);
- mgr.subscribe(publisher, "response");
- mgr.start();
int batchSize(args.batches);
int64_t max(0);
@@ -141,7 +135,6 @@
<< " in " << msecs << "ms" << endl;
}
publisher.terminate();
- mgr.stop();
int64_t avg = sum / batchSize;
if(batchSize > 1){
cout << batchSize << " batches completed. avg=" << avg <<
@@ -158,19 +151,9 @@
}
Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
- session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
-
-void Publisher::received(Message& ){
- //count responses and when all are received end the current batch
- Monitor::ScopedLock l(monitor);
- if(--count == 0){
- monitor.notify();
- }
-}
-
-void Publisher::waitForCompletion(int msgs){
- count = msgs;
- monitor.wait();
+ session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
+{
+ mgr.subscribe(queue, "response");
}
int64_t Publisher::publish(int msgs, int listeners, int size){
@@ -179,21 +162,25 @@
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
AbsTime start = now();
- {
- Monitor::ScopedLock l(monitor);
- for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
- }
- //send report request
- Message reportRequest("", controlTopic);
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
- if(transactional){
- session.txCommit();
- }
-
- waitForCompletion(listeners);
+
+ for(int i = 0; i < msgs; i++){
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
}
+ //send report request
+ Message reportRequest("", controlTopic);
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+ if(transactional){
+ session.txCommit();
+ }
+ //wait for a response from each listener (TODO, could log these)
+ for (int i = 0; i < listeners; i++) {
+ Message report = queue.pop();
+ }
+
+ if(transactional){
+ session.txCommit();
+ }
AbsTime finish = now();
return Duration(start, finish);