You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/30 15:37:52 UTC

svn commit: r599832 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/Dispatcher.cpp qpid/client/SubscriptionManager.cpp qpid/client/SubscriptionManager.h tests/topic_listener.cpp tests/topic_publisher.cpp

Author: gsim
Date: Fri Nov 30 06:37:45 2007
New Revision: 599832

URL: http://svn.apache.org/viewvc?rev=599832&view=rev
Log:
Altered topic test to use the new session api.
Exposed start() through the subscription manager in addition to run(). 


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Nov 30 06:37:45 2007
@@ -69,18 +69,22 @@
     boost::state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
     queue->open();
-    while (!queue->isClosed()) {
-        Mutex::ScopedUnlock u(lock);
-        FrameSet::shared_ptr content = queue->pop();
-        if (content->isA<MessageTransferBody>()) {
-            Message msg(*content, session);
-            Subscriber::shared_ptr listener = find(msg.getDestination());
-            assert(listener);
-            listener->received(msg);
-        } else {
-            assert (handler.get());
-            handler->handle(*content);
+    try {
+        while (!queue->isClosed()) {
+            Mutex::ScopedUnlock u(lock);
+            FrameSet::shared_ptr content = queue->pop();
+            if (content->isA<MessageTransferBody>()) {
+                Message msg(*content, session);
+                Subscriber::shared_ptr listener = find(msg.getDestination());
+                assert(listener);
+                listener->received(msg);
+            } else {
+                assert (handler.get());
+                handler->handle(*content);
+            }
         }
+    } catch (const ClosedException&) {
+        //ignore it and return
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Nov 30 06:37:45 2007
@@ -102,6 +102,11 @@
     dispatcher.stop();
 }
 
+void SubscriptionManager::start()
+{
+    dispatcher.start();
+}
+
 }} // namespace qpid::client
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Nov 30 06:37:45 2007
@@ -82,8 +82,12 @@
      */
     void run(bool autoStop=true);
 
+    /** Deliver messages in another thread. */
+    void start();
+
     /** Cause run() to return */
     void stop();
+
 
     static const uint32_t UNLIMITED=0xFFFFFFFF;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Fri Nov 30 06:37:45 2007
@@ -33,11 +33,10 @@
  */
 
 #include "TestOptions.h"
-#include "qpid/client/Channel.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
 #include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/FieldValue.h"
 #include <iostream>
@@ -54,7 +53,8 @@
  * defined.
  */
 class Listener : public MessageListener{    
-    Channel* const channel;
+    Session_0_10& session;
+    SubscriptionManager& mgr;
     const string responseQueue;
     const bool transactional;
     bool init;
@@ -64,7 +64,7 @@
     void shutdown();
     void report();
 public:
-    Listener(Channel* channel, const string& reponseQueue, bool tx);
+    Listener(Session_0_10& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
     virtual void received(Message& msg);
 };
 
@@ -72,14 +72,14 @@
  * A utility class for managing the options passed in.
  */
 struct Args : public qpid::TestOptions {
-    int ackmode;
+    int ack;
     bool transactional;
     int prefetch;
-    Args() : ackmode(NO_ACK), transactional(false), prefetch(1000) {
+    Args() : ack(0), transactional(false), prefetch(0) {
         addOptions()
-            ("ack", optValue(ackmode, "MODE"), "Ack mode: 0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+            ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
             ("transactional", optValue(transactional), "Use transactions")
-            ("prefetch", optValue(prefetch, "N"), "prefetch count");
+            ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
     }
 };
 
@@ -98,24 +98,35 @@
         else {
             cout << "topic_listener: Started." << endl;
             Connection connection(args.trace);
-            connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
-            Channel channel(args.transactional, args.prefetch);
-            connection.openChannel(channel);
-        
+            args.open(connection);
+            Session_0_10 session = connection.newSession();
+            if (args.transactional) {
+                session.txSelect();
+            }
+
             //declare exchange, queue and bind them:
-            Queue response("response");
-            channel.declareQueue(response);
-        
-            Queue control;
-            channel.declareQueue(control);
-            qpid::framing::FieldTable bindArgs;
-            channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
+            session.queueDeclare(arg::queue="response");
+            std::string control = "control_" + session.getId().str();
+            session.queueDeclare(arg::queue=control);
+            session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
+
             //set up listener
-            Listener listener(&channel, response.getName(), args.transactional);
-            channel.consume(control, "c1", &listener, AckMode(args.ackmode));
+            SubscriptionManager mgr(session);
+            Listener listener(session, mgr, "response", args.transactional);
+            if (args.prefetch) {
+                mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
+                mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
+            } else {
+                mgr.setConfirmMode(false);
+                mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+            }
+            mgr.subscribe(listener, control);
+
             cout << "topic_listener: Consuming." << endl;
-            channel.run();
-            cout << "topic_listener: run returned, closing connection" << endl;
+            mgr.run();
+            cout << "topic_listener: run returned, closing session" << endl;
+            session.close();
+            cout << "closing connection" << endl;
             connection.close();
             cout << "topic_listener: normal exit" << endl;
         }
@@ -126,8 +137,8 @@
     return 1;
 }
 
-Listener::Listener(Channel* _channel, const string& _responseq, bool tx) : 
-    channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){}
+Listener::Listener(Session_0_10& s, SubscriptionManager& m, const string& _responseq, bool tx) : 
+    session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
 
 void Listener::received(Message& message){
     if(!init){        
@@ -149,7 +160,7 @@
 }
 
 void Listener::shutdown(){
-    channel->close();
+    mgr.stop();
 }
 
 void Listener::report(){
@@ -158,11 +169,11 @@
     stringstream reportstr;
     reportstr << "Received " << count << " messages in "
               << time/TIME_MSEC << " ms.";
-    Message msg(reportstr.str());
+    Message msg(reportstr.str(), responseQueue);
     msg.getHeaders().setString("TYPE", "REPORT");
-    channel->publish(msg, string(), responseQueue);
+    session.messageTransfer(arg::content=msg);
     if(transactional){
-        channel->commit();
+        session.txCommit();
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=599832&r1=599831&r2=599832&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Fri Nov 30 06:37:45 2007
@@ -35,11 +35,10 @@
  */
 
 #include "TestOptions.h"
-#include "qpid/client/Channel.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
 #include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/Monitor.h"
 #include <unistd.h>
 #include "qpid/sys/Time.h"
@@ -57,7 +56,7 @@
  * back by the subscribers.
  */
 class Publisher : public MessageListener{    
-    Channel* const channel;
+    Session_0_10& session;
     const string controlTopic;
     const bool transactional;
     Monitor monitor;
@@ -67,7 +66,7 @@
     string generateData(int size);
 
 public:
-    Publisher(Channel* channel, const string& controlTopic, bool tx);
+    Publisher(Session_0_10& session, const string& controlTopic, bool tx);
     virtual void received(Message& msg);
     int64_t publish(int msgs, int listeners, int size);
     void terminate();
@@ -79,7 +78,7 @@
 struct Args : public TestOptions {
     int messages;
     int subscribers;
-    int ackmode;
+    int ack;
     bool transactional;
     int prefetch;
     int batches;
@@ -87,13 +86,13 @@
     int size;
 
     Args() : messages(1000), subscribers(1),
-             ackmode(NO_ACK), transactional(false), prefetch(1000),
+             ack(500), transactional(false), prefetch(1000),
              batches(1), delay(0), size(256)
     {
         addOptions()
             ("messages", optValue(messages, "N"), "how many messages to send")
             ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
-            ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+            ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
             ("transactional", optValue(transactional), "client should use transactions")
             ("prefetch", optValue(prefetch, "N"), "prefetch count")
             ("batches", optValue(batches, "N"), "how many batches to run")
@@ -110,18 +109,21 @@
             cout << args << endl;
         else {
             Connection connection(args.trace);
-            connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
-            Channel channel(args.transactional, args.prefetch);
-            connection.openChannel(channel);
+            args.open(connection);
+            Session_0_10 session = connection.newSession();
+            if (args.transactional) {
+                session.txSelect();
+            }
+
 
             //declare queue (relying on default binding):
-            Queue response("response");
-            channel.declareQueue(response);
+            session.queueDeclare(arg::queue="response");
 
             //set up listener
-            Publisher publisher(&channel, "topic_control", args.transactional);
-            channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
-            channel.start();
+            SubscriptionManager mgr(session);
+            Publisher publisher(session, "topic_control", args.transactional);
+            mgr.subscribe(publisher, "response");
+            mgr.start();
 
             int batchSize(args.batches);
             int64_t max(0);
@@ -140,12 +142,13 @@
                           << " in " << msecs << "ms" << endl;
             }
             publisher.terminate();
+            mgr.stop();
             int64_t avg = sum / batchSize;
             if(batchSize > 1){
                 cout << batchSize << " batches completed. avg=" << avg << 
                     ", max=" << max << ", min=" << min << endl;
             }
-            channel.close();
+            session.close();
             connection.close();
         }
         return 0;
@@ -155,8 +158,8 @@
     return 1;
 }
 
-Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) : 
-    channel(_channel), controlTopic(_controlTopic), transactional(tx){}
+Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) : 
+    session(_session), controlTopic(_controlTopic), transactional(tx){}
 
 void Publisher::received(Message& ){
     //count responses and when all are received end the current batch
@@ -172,21 +175,19 @@
 }
 
 int64_t Publisher::publish(int msgs, int listeners, int size){
-    Message msg;
-    msg.setData(generateData(size));
+    Message msg(generateData(size), controlTopic);
     AbsTime start = now();
     {
         Monitor::ScopedLock l(monitor);
         for(int i = 0; i < msgs; i++){
-            channel->publish(
-                msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+            session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
         }
         //send report request
-        Message reportRequest;
+        Message reportRequest("", controlTopic);
         reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
-        channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+        session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
         if(transactional){
-            channel->commit();
+            session.txCommit();
         }
 
         waitForCompletion(listeners);
@@ -206,11 +207,11 @@
 
 void Publisher::terminate(){
     //send termination request
-    Message terminationRequest;
+    Message terminationRequest("", controlTopic);
     terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
-    channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+    session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic");
     if(transactional){
-        channel->commit();
+        session.txCommit();
     }
 }