You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/30 15:37:52 UTC
svn commit: r599832 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/client/Dispatcher.cpp qpid/client/SubscriptionManager.cpp
qpid/client/SubscriptionManager.h tests/topic_listener.cpp
tests/topic_publisher.cpp
Author: gsim
Date: Fri Nov 30 06:37:45 2007
New Revision: 599832
URL: http://svn.apache.org/viewvc?rev=599832&view=rev
Log:
Altered topic test to use the new session api.
Exposed start() through the subscription manager in addition to run().
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Nov 30 06:37:45 2007
@@ -69,18 +69,22 @@
boost::state_saver<bool> reset(running); // Reset to false on exit.
running = true;
queue->open();
- while (!queue->isClosed()) {
- Mutex::ScopedUnlock u(lock);
- FrameSet::shared_ptr content = queue->pop();
- if (content->isA<MessageTransferBody>()) {
- Message msg(*content, session);
- Subscriber::shared_ptr listener = find(msg.getDestination());
- assert(listener);
- listener->received(msg);
- } else {
- assert (handler.get());
- handler->handle(*content);
+ try {
+ while (!queue->isClosed()) {
+ Mutex::ScopedUnlock u(lock);
+ FrameSet::shared_ptr content = queue->pop();
+ if (content->isA<MessageTransferBody>()) {
+ Message msg(*content, session);
+ Subscriber::shared_ptr listener = find(msg.getDestination());
+ assert(listener);
+ listener->received(msg);
+ } else {
+ assert (handler.get());
+ handler->handle(*content);
+ }
}
+ } catch (const ClosedException&) {
+ //ignore it and return
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Nov 30 06:37:45 2007
@@ -102,6 +102,11 @@
dispatcher.stop();
}
+void SubscriptionManager::start()
+{
+ dispatcher.start();
+}
+
}} // namespace qpid::client
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Nov 30 06:37:45 2007
@@ -82,8 +82,12 @@
*/
void run(bool autoStop=true);
+ /** Deliver messages in another thread. */
+ void start();
+
/** Cause run() to return */
void stop();
+
static const uint32_t UNLIMITED=0xFFFFFFFF;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Fri Nov 30 06:37:45 2007
@@ -33,11 +33,10 @@
*/
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/FieldValue.h"
#include <iostream>
@@ -54,7 +53,8 @@
* defined.
*/
class Listener : public MessageListener{
- Channel* const channel;
+ Session_0_10& session;
+ SubscriptionManager& mgr;
const string responseQueue;
const bool transactional;
bool init;
@@ -64,7 +64,7 @@
void shutdown();
void report();
public:
- Listener(Channel* channel, const string& reponseQueue, bool tx);
+ Listener(Session_0_10& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
};
@@ -72,14 +72,14 @@
* A utility class for managing the options passed in.
*/
struct Args : public qpid::TestOptions {
- int ackmode;
+ int ack;
bool transactional;
int prefetch;
- Args() : ackmode(NO_ACK), transactional(false), prefetch(1000) {
+ Args() : ack(0), transactional(false), prefetch(0) {
addOptions()
- ("ack", optValue(ackmode, "MODE"), "Ack mode: 0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
("transactional", optValue(transactional), "Use transactions")
- ("prefetch", optValue(prefetch, "N"), "prefetch count");
+ ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
}
};
@@ -98,24 +98,35 @@
else {
cout << "topic_listener: Started." << endl;
Connection connection(args.trace);
- connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
- Channel channel(args.transactional, args.prefetch);
- connection.openChannel(channel);
-
+ args.open(connection);
+ Session_0_10 session = connection.newSession();
+ if (args.transactional) {
+ session.txSelect();
+ }
+
//declare exchange, queue and bind them:
- Queue response("response");
- channel.declareQueue(response);
-
- Queue control;
- channel.declareQueue(control);
- qpid::framing::FieldTable bindArgs;
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
+ session.queueDeclare(arg::queue="response");
+ std::string control = "control_" + session.getId().str();
+ session.queueDeclare(arg::queue=control);
+ session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
+
//set up listener
- Listener listener(&channel, response.getName(), args.transactional);
- channel.consume(control, "c1", &listener, AckMode(args.ackmode));
+ SubscriptionManager mgr(session);
+ Listener listener(session, mgr, "response", args.transactional);
+ if (args.prefetch) {
+ mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
+ mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
+ } else {
+ mgr.setConfirmMode(false);
+ mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ }
+ mgr.subscribe(listener, control);
+
cout << "topic_listener: Consuming." << endl;
- channel.run();
- cout << "topic_listener: run returned, closing connection" << endl;
+ mgr.run();
+ cout << "topic_listener: run returned, closing session" << endl;
+ session.close();
+ cout << "closing connection" << endl;
connection.close();
cout << "topic_listener: normal exit" << endl;
}
@@ -126,8 +137,8 @@
return 1;
}
-Listener::Listener(Channel* _channel, const string& _responseq, bool tx) :
- channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){}
+Listener::Listener(Session_0_10& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+ session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
void Listener::received(Message& message){
if(!init){
@@ -149,7 +160,7 @@
}
void Listener::shutdown(){
- channel->close();
+ mgr.stop();
}
void Listener::report(){
@@ -158,11 +169,11 @@
stringstream reportstr;
reportstr << "Received " << count << " messages in "
<< time/TIME_MSEC << " ms.";
- Message msg(reportstr.str());
+ Message msg(reportstr.str(), responseQueue);
msg.getHeaders().setString("TYPE", "REPORT");
- channel->publish(msg, string(), responseQueue);
+ session.messageTransfer(arg::content=msg);
if(transactional){
- channel->commit();
+ session.txCommit();
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Fri Nov 30 06:37:45 2007
@@ -35,11 +35,10 @@
*/
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include <unistd.h>
#include "qpid/sys/Time.h"
@@ -57,7 +56,7 @@
* back by the subscribers.
*/
class Publisher : public MessageListener{
- Channel* const channel;
+ Session_0_10& session;
const string controlTopic;
const bool transactional;
Monitor monitor;
@@ -67,7 +66,7 @@
string generateData(int size);
public:
- Publisher(Channel* channel, const string& controlTopic, bool tx);
+ Publisher(Session_0_10& session, const string& controlTopic, bool tx);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -79,7 +78,7 @@
struct Args : public TestOptions {
int messages;
int subscribers;
- int ackmode;
+ int ack;
bool transactional;
int prefetch;
int batches;
@@ -87,13 +86,13 @@
int size;
Args() : messages(1000), subscribers(1),
- ackmode(NO_ACK), transactional(false), prefetch(1000),
+ ack(500), transactional(false), prefetch(1000),
batches(1), delay(0), size(256)
{
addOptions()
("messages", optValue(messages, "N"), "how many messages to send")
("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
- ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
("transactional", optValue(transactional), "client should use transactions")
("prefetch", optValue(prefetch, "N"), "prefetch count")
("batches", optValue(batches, "N"), "how many batches to run")
@@ -110,18 +109,21 @@
cout << args << endl;
else {
Connection connection(args.trace);
- connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
- Channel channel(args.transactional, args.prefetch);
- connection.openChannel(channel);
+ args.open(connection);
+ Session_0_10 session = connection.newSession();
+ if (args.transactional) {
+ session.txSelect();
+ }
+
//declare queue (relying on default binding):
- Queue response("response");
- channel.declareQueue(response);
+ session.queueDeclare(arg::queue="response");
//set up listener
- Publisher publisher(&channel, "topic_control", args.transactional);
- channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
- channel.start();
+ SubscriptionManager mgr(session);
+ Publisher publisher(session, "topic_control", args.transactional);
+ mgr.subscribe(publisher, "response");
+ mgr.start();
int batchSize(args.batches);
int64_t max(0);
@@ -140,12 +142,13 @@
<< " in " << msecs << "ms" << endl;
}
publisher.terminate();
+ mgr.stop();
int64_t avg = sum / batchSize;
if(batchSize > 1){
cout << batchSize << " batches completed. avg=" << avg <<
", max=" << max << ", min=" << min << endl;
}
- channel.close();
+ session.close();
connection.close();
}
return 0;
@@ -155,8 +158,8 @@
return 1;
}
-Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) :
- channel(_channel), controlTopic(_controlTopic), transactional(tx){}
+Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) :
+ session(_session), controlTopic(_controlTopic), transactional(tx){}
void Publisher::received(Message& ){
//count responses and when all are received end the current batch
@@ -172,21 +175,19 @@
}
int64_t Publisher::publish(int msgs, int listeners, int size){
- Message msg;
- msg.setData(generateData(size));
+ Message msg(generateData(size), controlTopic);
AbsTime start = now();
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
- channel->publish(
- msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
}
//send report request
- Message reportRequest;
+ Message reportRequest("", controlTopic);
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
if(transactional){
- channel->commit();
+ session.txCommit();
}
waitForCompletion(listeners);
@@ -206,11 +207,11 @@
void Publisher::terminate(){
//send termination request
- Message terminationRequest;
+ Message terminationRequest("", controlTopic);
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic");
if(transactional){
- channel->commit();
+ session.txCommit();
}
}