You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/17 11:59:29 UTC
svn commit: r604821 -
/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
Author: gsim
Date: Mon Dec 17 02:59:26 2007
New Revision: 604821
URL: http://svn.apache.org/viewvc?rev=604821&view=rev
Log:
Changed to use the new session api, made size and max-frame-size configurable.
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?rev=604821&r1=604820&r2=604821&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Mon Dec 17 02:59:26 2007
@@ -29,106 +29,115 @@
#include <iostream>
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Time.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageTransferBody.h"
+using namespace qpid;
using namespace qpid::client;
-using namespace qpid::sys;
+using qpid::framing::FrameSet;
+using qpid::framing::MessageTransferBody;
using std::string;
-
-/**
- * A simple message listener implementation that prints out the
- * message content then notifies a montitor allowing the test to
- * complete.
- */
-class SimpleListener : public virtual MessageListener{
- Monitor* monitor;
- bool verbose;
-
-public:
- inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {}
-
- inline virtual void received(Message& msg){
- if (verbose)
- std::cout << "Received message " << msg.getData() << std::endl;
- monitor->notify();
+struct Args : public qpid::TestOptions {
+ uint msgSize;
+ uint maxFrameSize;
+
+ Args() : msgSize(26), maxFrameSize(65535)
+ {
+ addOptions()
+ ("size", optValue(msgSize, "N"), "message size")
+ ("max-frame-size", optValue(maxFrameSize, "N"), "max frame size");
}
};
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+std::string generateData(uint size)
+{
+ if (size < chars.length()) {
+ return chars.substr(0, size);
+ }
+ std::string data;
+ for (uint i = 0; i < (size / chars.length()); i++) {
+ data += chars;
+ }
+ data += chars.substr(0, size % chars.length());
+ return data;
+}
+
+void print(const std::string& text, const Message& msg)
+{
+ std::cout << text;
+ if (msg.getData().size() > 16) {
+ std::cout << msg.getData().substr(0, 16) << "...";
+ } else {
+ std::cout << msg.getData();
+ }
+ std::cout << std::endl;
+}
+
int main(int argc, char** argv)
{
try {
- qpid::TestOptions opts;
+ Args opts;
opts.parse(argc, argv);
-
- //Use a custom exchange
- Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
- //Use a named, temporary queue
- Queue queue("MyQueue", true);
-
-
- Connection con(opts.trace);
- con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
- if (opts.trace)
- std::cout << "Opened connection." << std::endl;
-
- //Create and open a channel on the connection through which
- //most functionality is exposed
- Channel channel;
- con.openChannel(channel);
- if (opts.trace) std::cout << "Opened channel." << std::endl;
+
+ //Connect to the broker:
+ Connection connection(opts.trace, opts.maxFrameSize);
+ opts.open(connection);
+ if (opts.trace) std::cout << "Opened connection." << std::endl;
+
+ //Create and open a session on the connection through which
+ //most functionality is exposed:
+ Session_0_10 session = connection.newSession();
+ if (opts.trace) std::cout << "Opened session." << std::endl;
+
//'declare' the exchange and the queue, which will create them
//as they don't exist
- channel.declareExchange(exchange);
+ session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct");
if (opts.trace) std::cout << "Declared exchange." << std::endl;
- channel.declareQueue(queue);
+ session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true);
if (opts.trace) std::cout << "Declared queue." << std::endl;
//now bind the queue to the exchange
- channel.bind(exchange, queue, "MyTopic");
+ session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey");
if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
- //Set up a message listener to receive any messages that
- //arrive in our queue on the broker. We only expect one, and
- //as it will be received on another thread, we create a
- //montior to use to notify the main thread when that message
- //is received.
- Monitor monitor;
- SimpleListener listener(&monitor, opts.trace);
- string tag("MyTag");
- channel.consume(queue, tag, &listener);
- if (opts.trace) std::cout << "Registered consumer." << std::endl;
-
- //we need to enable the message dispatching for this channel
- //and we want that to occur on another thread so we call
- //start().
- channel.start();
-
- //Now we create and publish a message to our exchange with a
- //routing key that will cause it to be routed to our queue
- Message msg;
- string data("MyMessage");
- msg.setData(data);
- channel.publish(msg, exchange, "MyTopic");
- if (opts.trace) std::cout << "Published message: " << data << std::endl;
-
- {
- Monitor::ScopedLock l(monitor);
- //now we wait until we receive notification that the
- //message was received
- monitor.wait();
+ //create and send a message to the exchange using the routing
+ //key we bound our queue with:
+ Message msgOut(generateData(opts.msgSize));
+ msgOut.getDeliveryProperties().setRoutingKey("MyKey");
+ session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut);
+ if (opts.trace) print("Published message: ", msgOut);
+
+ //subscribe to the queue, add sufficient credit and then get
+ //incoming 'frameset', check that it's a message transfer and
+ //then convert it to a message (see Dispatcher and
+ //SubscriptionManager utilities for common reusable patterns at
+ //a higher level)
+ session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId");
+ session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message
+ session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes
+ if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
+ FrameSet::shared_ptr incoming = session.get();
+ if (incoming->isA<MessageTransferBody>()) {
+ Message msgIn(*incoming, session);
+ if (msgIn.getData() == msgOut.getData()) {
+ if (opts.trace) std::cout << "Received the exepected message." << std::endl;
+ msgIn.acknowledge();
+ } else {
+ print("Received an unexepected message: ", msgIn);
+ }
}
- //close the channel & connection
- channel.close();
- if (opts.trace) std::cout << "Closed channel." << std::endl;
- con.close();
+ //close the session & connection
+ session.close();
+ if (opts.trace) std::cout << "Closed session." << std::endl;
+ connection.close();
if (opts.trace) std::cout << "Closed connection." << std::endl;
return 0;
} catch(const std::exception& e) {