You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/04 15:19:07 UTC

svn commit: r600962 - in /incubator/qpid/trunk/qpid/cpp/src/tests: topic_listener.cpp topic_publisher.cpp

Author: gsim
Date: Tue Dec  4 06:19:06 2007
New Revision: 600962

URL: http://svn.apache.org/viewvc?rev=600962&view=rev
Log:
Updates to topic test


Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=600962&r1=600961&r2=600962&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Tue Dec  4 06:19:06 2007
@@ -99,7 +99,6 @@
         if(args.help)
             cout << args << endl;
         else {
-            cout << "topic_listener: Started." << endl;
             Connection connection(args.trace);
             args.open(connection);
             Session_0_10 session = connection.newSession();
@@ -110,7 +109,11 @@
             //declare exchange, queue and bind them:
             session.queueDeclare(arg::queue="response");
             std::string control = "control_" + session.getId().str();
-            session.queueDeclare(arg::queue=control, arg::durable=args.durable);
+            if (args.durable) {
+                session.queueDeclare(arg::queue=control, arg::durable=true);
+            } else {
+                session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true);
+            }
             session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
 
             //set up listener
@@ -125,13 +128,14 @@
             }
             mgr.subscribe(listener, control);
 
-            cout << "topic_listener: Consuming." << endl;
+            cout << "topic_listener: listening..." << endl;
             mgr.run();
-            cout << "topic_listener: run returned, closing session" << endl;
+            if (args.durable) {
+                session.queueDelete(arg::queue=control);
+            }
             session.close();
             cout << "closing connection" << endl;
             connection.close();
-            cout << "topic_listener: normal exit" << endl;
         }
         return 0;
     } catch (const std::exception& error) {
@@ -148,16 +152,19 @@
         start = now();
         count = 0;
         init = true;
+        cout << "Batch started." << endl;
     }
     FieldTable::ValuePtr type(message.getHeaders().get("TYPE"));
 
     if(!!type && StringValue("TERMINATION_REQUEST") == *type){
         shutdown();
     }else if(!!type && StringValue("REPORT_REQUEST") == *type){
+        message.acknowledge();//acknowledge everything upto this point
+        cout <<"Batch ended, sending report." << endl;
         //send a report:
         report();
         init = false;
-    }else if (++count % 100 == 0){        
+    }else if (++count % 1000 == 0){        
         cout <<"Received " << count << " messages." << endl;
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=600962&r1=600961&r2=600962&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Tue Dec  4 06:19:06 2007
@@ -55,20 +55,18 @@
  * message listener and can therfore be used to receive messages sent
  * back by the subscribers.
  */
-class Publisher : public MessageListener{    
+class Publisher {    
     Session_0_10& session;
+    SubscriptionManager mgr;
+    LocalQueue queue;
     const string controlTopic;
     const bool transactional;
     const bool durable;
-    Monitor monitor;
-    int count;
     
-    void waitForCompletion(int msgs);
     string generateData(int size);
 
 public:
     Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
-    virtual void received(Message& msg);
     int64_t publish(int msgs, int listeners, int size);
     void terminate();
 };
@@ -118,11 +116,7 @@
             //declare queue (relying on default binding):
             session.queueDeclare(arg::queue="response");
 
-            //set up listener
-            SubscriptionManager mgr(session);
             Publisher publisher(session, "topic_control", args.transactional, args.durable);
-            mgr.subscribe(publisher, "response");
-            mgr.start();
 
             int batchSize(args.batches);
             int64_t max(0);
@@ -141,7 +135,6 @@
                           << " in " << msecs << "ms" << endl;
             }
             publisher.terminate();
-            mgr.stop();
             int64_t avg = sum / batchSize;
             if(batchSize > 1){
                 cout << batchSize << " batches completed. avg=" << avg << 
@@ -158,19 +151,9 @@
 }
 
 Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) : 
-    session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
-
-void Publisher::received(Message& ){
-    //count responses and when all are received end the current batch
-    Monitor::ScopedLock l(monitor);
-    if(--count == 0){
-        monitor.notify();
-    }
-}
-
-void Publisher::waitForCompletion(int msgs){
-    count = msgs;
-    monitor.wait();
+    session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) 
+{
+    mgr.subscribe(queue, "response");
 }
 
 int64_t Publisher::publish(int msgs, int listeners, int size){
@@ -179,21 +162,25 @@
         msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
     }
     AbsTime start = now();
-    {
-        Monitor::ScopedLock l(monitor);
-        for(int i = 0; i < msgs; i++){
-            session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
-        }
-        //send report request
-        Message reportRequest("", controlTopic);
-        reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
-        session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
-        if(transactional){
-            session.txCommit();
-        }
-
-        waitForCompletion(listeners);
+    
+    for(int i = 0; i < msgs; i++){
+        session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
     }
+    //send report request
+    Message reportRequest("", controlTopic);
+    reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+    session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+    if(transactional){
+        session.txCommit();
+    }    
+    //wait for a response from each listener (TODO, could log these)
+    for (int i = 0; i < listeners; i++) {
+        Message report = queue.pop();
+    }
+
+    if(transactional){
+        session.txCommit();
+    }    
 
     AbsTime finish = now();
     return Duration(start, finish);