You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/17 11:59:29 UTC

svn commit: r604821 - /incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp

Author: gsim
Date: Mon Dec 17 02:59:26 2007
New Revision: 604821

URL: http://svn.apache.org/viewvc?rev=604821&view=rev
Log:
Changed to use the new session api, made size and max-frame-size configurable.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?rev=604821&r1=604820&r2=604821&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Mon Dec 17 02:59:26 2007
@@ -29,106 +29,115 @@
 #include <iostream>
 
 #include "TestOptions.h"
-#include "qpid/client/Channel.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Time.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageTransferBody.h"
 
+using namespace qpid;
 using namespace qpid::client;
-using namespace qpid::sys;
+using qpid::framing::FrameSet;
+using qpid::framing::MessageTransferBody;
 using std::string;
 
-
-/**
- * A simple message listener implementation that prints out the
- * message content then notifies a montitor allowing the test to
- * complete.
- */
-class SimpleListener : public virtual MessageListener{
-    Monitor* monitor;
-    bool verbose;
-
-public:
-    inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {}
-
-    inline virtual void received(Message& msg){
-        if (verbose)
-            std::cout << "Received message " << msg.getData()  << std::endl;
-	monitor->notify();
+struct Args : public qpid::TestOptions {
+    uint msgSize;
+    uint maxFrameSize;
+
+    Args() : msgSize(26), maxFrameSize(65535) 
+    {
+        addOptions()            
+            ("size", optValue(msgSize, "N"), "message size")
+            ("max-frame-size", optValue(maxFrameSize, "N"), "max frame size");
     }
 };
 
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+std::string generateData(uint size)
+{
+    if (size < chars.length()) {
+        return chars.substr(0, size);
+    }   
+    std::string data;
+    for (uint i = 0; i < (size / chars.length()); i++) {
+        data += chars;
+    }
+    data += chars.substr(0, size % chars.length());
+    return data;
+}
+
+void print(const std::string& text, const Message& msg)
+{
+    std::cout << text;
+    if (msg.getData().size() > 16) {
+        std::cout << msg.getData().substr(0, 16) << "...";
+    } else {
+        std::cout << msg.getData();
+    }
+    std::cout << std::endl;
+}
+
 int main(int argc, char** argv)
 {
     try {
-        qpid::TestOptions opts;
+        Args opts;
         opts.parse(argc, argv);
-            
-        //Use a custom exchange
-	Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
-        //Use a named, temporary queue
-	Queue queue("MyQueue", true);
-
- 	
-	Connection con(opts.trace);
-	con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
-	if (opts.trace)
-	    std::cout << "Opened connection." << std::endl;
-
-        //Create and open a channel on the connection through which
-        //most functionality is exposed
-	Channel channel;      
-	con.openChannel(channel);
-	if (opts.trace) std::cout << "Opened channel." << std::endl;	
+
+        //Connect to the broker:
+        Connection connection(opts.trace, opts.maxFrameSize);
+        opts.open(connection);
+	if (opts.trace) std::cout << "Opened connection." << std::endl;
+
+        //Create and open a session on the connection through which
+        //most functionality is exposed:
+        Session_0_10 session = connection.newSession();
+	if (opts.trace) std::cout << "Opened session." << std::endl;	
+
 
         //'declare' the exchange and the queue, which will create them
         //as they don't exist
-	channel.declareExchange(exchange);
+	session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct");
 	if (opts.trace) std::cout << "Declared exchange." << std::endl;
-	channel.declareQueue(queue);
+	session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true);
 	if (opts.trace) std::cout << "Declared queue." << std::endl;
 
         //now bind the queue to the exchange
-	channel.bind(exchange, queue, "MyTopic");
+	session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey");
 	if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
 
-	//Set up a message listener to receive any messages that
-	//arrive in our queue on the broker. We only expect one, and
-	//as it will be received on another thread, we create a
-	//montior to use to notify the main thread when that message
-	//is received.
-	Monitor monitor;
-	SimpleListener listener(&monitor, opts.trace);
-	string tag("MyTag");
-	channel.consume(queue, tag, &listener);
-	if (opts.trace) std::cout << "Registered consumer." << std::endl;
-
-        //we need to enable the message dispatching for this channel
-        //and we want that to occur on another thread so we call
-        //start().
-	channel.start();
-
-        //Now we create and publish a message to our exchange with a
-        //routing key that will cause it to be routed to our queue
-	Message msg;
-	string data("MyMessage");
-	msg.setData(data);
-	channel.publish(msg, exchange, "MyTopic");
-	if (opts.trace) std::cout << "Published message: " << data << std::endl;
-
-	{
-            Monitor::ScopedLock l(monitor);
-            //now we wait until we receive notification that the
-            //message was received
-            monitor.wait();
+        //create and send a message to the exchange using the routing
+        //key we bound our queue with:
+	Message msgOut(generateData(opts.msgSize));
+        msgOut.getDeliveryProperties().setRoutingKey("MyKey");
+        session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut);
+	if (opts.trace) print("Published message: ", msgOut);
+
+        //subscribe to the queue, add sufficient credit and then get
+        //incoming 'frameset', check that it's a message transfer and
+        //then convert it to a message (see Dispatcher and
+        //SubscriptionManager utilities for common reusable patterns at
+        //a higher level)
+        session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId");
+        session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message
+        session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes
+	if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
+        FrameSet::shared_ptr incoming = session.get();
+        if (incoming->isA<MessageTransferBody>()) {
+            Message msgIn(*incoming, session);
+            if (msgIn.getData() == msgOut.getData()) {
+                if (opts.trace) std::cout << "Received the exepected message." << std::endl;
+                msgIn.acknowledge();
+            } else {
+                print("Received an unexepected message: ", msgIn);
+            }
         }
         
-        //close the channel & connection
-	channel.close();
-	if (opts.trace) std::cout << "Closed channel." << std::endl;
-	con.close();	
+        //close the session & connection
+	session.close();
+	if (opts.trace) std::cout << "Closed session." << std::endl;
+	connection.close();	
 	if (opts.trace) std::cout << "Closed connection." << std::endl;
     return 0;
     } catch(const std::exception& e) {