You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/15 16:56:53 UTC

svn commit: r566211 - in /incubator/qpid/trunk/qpid/cpp: perftest/ src/tests/ src/tests/Makefile.am src/tests/TestOptions.h src/tests/perftest.cpp

Author: aconway
Date: Wed Aug 15 07:56:51 2007
New Revision: 566211

URL: http://svn.apache.org/viewvc?view=rev&rev=566211
Log:

	* perftest/topic_publisher.cpp, topic_listener.cpp:
	Combined into a single preftest.cpp executable and moved to
	src/tests.
	New perftest:
	- Supports all client-side options (--host, --port etc.)
	- Can be run as producer (--listen), consumer (--publish) or both.
	- --count specifies number of messages (default 500000 as before)

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/perftest/
Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Aug 15 07:56:51 2007
@@ -33,3 +33,4 @@
 Visitor
 qpidd.vglog
 Frame
+perftest

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=566211&r1=566210&r2=566211
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Aug 15 07:56:51 2007
@@ -60,6 +60,13 @@
 
 include cluster.mk
 
+#
+# Other test programs
+#
+check_PROGRAMS+=perftest
+perftest_SOURCES=perftest.cpp test_tools.h
+perftest_LDADD=$(lib_client) 
+
 # NB: CppUnit test libraries below will be migrated to boost test programs.
 #
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h?view=diff&rev=566211&r1=566210&r2=566211
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h Wed Aug 15 07:56:51 2007
@@ -25,6 +25,7 @@
 #include "qpid/log/Options.h"
 #include "qpid/Url.h"
 #include "qpid/log/Logger.h"
+#include "qpid/client/Connection.h"
 
 #include <iostream>
 #include <exception>
@@ -63,6 +64,12 @@
         trace = log.trace;
         qpid::log::Logger::instance().configure(log, argv[0]);
     }
+
+    /** Open a connection usin option values */
+    void open(qpid::client::Connection& connection) {
+        connection.open(host, port, username, password, virtualhost);
+    }
+
     
     std::string host;
     uint16_t port;

Added: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?view=auto&rev=566211
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Aug 15 07:56:51 2007
@@ -0,0 +1,298 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TestOptions.h"
+
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientExchange.h"
+#include "qpid/client/ClientQueue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/QpidError.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+
+#include <iostream>
+#include <cstdlib>
+#include <iomanip>
+#include <time.h>
+#include <unistd.h>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::sys;
+using namespace std;
+
+struct Opts : public TestOptions {
+
+    bool listen;
+    bool publish;
+    int count;
+    
+    Opts() : listen(false), publish(false), count(500000) {
+        addOptions() 
+            ("listen", optValue(listen), "Consume messages.")
+            ("publish", optValue(publish), "Produce messages.")
+            ("count", optValue(count, "N"), "Messages to send/receive.");
+    }
+};
+
+Opts opts;
+
+struct ListenThread : public Runnable { Thread thread; void run(); };
+struct PublishThread : public Runnable { Thread thread; void run(); };
+    
+int main(int argc, char** argv) {
+    try {
+        opts.parse(argc, argv);
+        ListenThread listen;
+        PublishThread publish;
+        if (opts.listen)
+            listen.thread=Thread(listen);
+        if (opts.publish)
+            publish.thread=Thread(publish);
+        if (opts.listen)
+            listen.thread.join();
+        if (opts.publish)
+            publish.thread.join();
+    }
+    catch (const std::exception& e) {
+        cout << "Unexpected exception: " << e.what() << endl;
+    }
+}
+
+// ================================================================
+// Publish client
+//
+
+struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) {
+    timespec r;
+    r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+    r.tv_sec = lhs.tv_sec - rhs.tv_sec;
+    if (r.tv_nsec < 0) {
+        r.tv_nsec += 1000000000;
+        r.tv_sec -= 1;
+    }
+    return r;
+}
+
+ostream& operator<<(ostream& o, const struct timespec& ts) {
+    o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec;
+    return o;
+}
+
+double toDouble(const struct timespec& ts) {
+    return double(ts.tv_nsec)/1000000000 + ts.tv_sec;
+}
+
+class PublishListener : public MessageListener {
+
+    void set_time() {
+        timespec ts;
+        if (::clock_gettime(CLOCK_REALTIME, &ts))
+            throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
+        startTime = ts;
+    }
+
+    void print_time() {
+        timespec ts;
+        if (::clock_gettime(CLOCK_REALTIME, &ts))
+            throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
+        cout << "Total Time:" << ts-startTime << endl;
+        double rate = messageCount*2/toDouble(ts-startTime);
+        cout << "returned Messages:" << messageCount  << endl;
+        cout << "round trip Rate:" << rate  << endl;
+    }
+
+    struct timespec startTime;
+    int messageCount;
+    bool done;
+    Monitor lock;
+    
+  public:
+ 
+    PublishListener(int mcount): messageCount(mcount), done(false) {
+        set_time();
+    }
+	
+    void received(Message& msg) {
+	print_time();
+	QPID_LOG(info, "Publisher: received: " << msg.getData());
+        Mutex::ScopedLock l(lock);
+        QPID_LOG(info, "Publisher: done.");
+        done = true;
+        lock.notify();
+    }
+
+    void wait() {
+        Mutex::ScopedLock l(lock);
+        while (!done)
+            lock.wait();
+    }
+};
+
+
+void PublishThread::run() {
+    Connection connection;
+    Channel channel;
+    Message msg;
+    opts.open(connection);
+    connection.openChannel(channel);
+    channel.start();
+
+    cout << "Started publisher." << endl;
+    string queueControl = "control";
+    Queue response(queueControl);
+    channel.declareQueue(response);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
+        
+    string  queueName ="queue01";
+    string  queueNameC =queueName+"-1";
+
+    // create publish queue
+    Queue publish(queueName);
+    channel.declareQueue(publish);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName);
+  
+    // create completion queue
+    Queue completion(queueNameC);
+    channel.declareQueue(completion);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
+      
+    // pass queue name
+    msg.setData(queueName);
+    channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl);
+
+    QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC);
+	
+    int count = opts.count;
+    PublishListener listener(count);
+    channel.consume(completion, queueNameC, &listener);
+    QPID_LOG(info, "Publisher setup consumer: "<< queueNameC);
+
+    struct timespec startTime;
+    if (::clock_gettime(CLOCK_REALTIME, &startTime))
+        throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
+
+    for (int i=0; i<count; i++) {
+        msg.setData("Message 0123456789 ");
+        channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
+    }
+
+    struct timespec endTime;
+    if (::clock_gettime(CLOCK_REALTIME, &endTime))
+        throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
+
+    cout << "publish Time:" << endTime-startTime << endl;
+    double rate = count/toDouble(endTime-startTime);
+    cout << "publish Messages:" << count  << endl;
+    cout << "publish Rate:" << rate  << endl;
+
+    msg.setData(queueName);  // last message to queue.
+    channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
+	
+    listener.wait();
+ 
+    channel.close();
+    connection.close();
+}
+
+
+
+// ================================================================
+// Listen client
+//
+
+class Listener : public MessageListener{
+    string queueName;
+    Monitor lock;
+    bool done;
+
+  public:
+    Listener (string& _queueName): queueName(_queueName), done(false) {};
+
+    void received(Message& msg) {
+        if (msg.getData() == queueName)
+        {
+            Mutex::ScopedLock l(lock);
+            QPID_LOG(info, "Listener: done. " << queueName);
+            done = true;
+            lock.notify();
+        }
+    }
+    
+    void wait() {
+        Mutex::ScopedLock l(lock);
+        while (!done)
+            lock.wait();
+    }
+};
+
+void ListenThread::run() {
+    Connection connection;
+    Channel channel;
+    Message msg;
+    Message msg1;
+    cout << "Started listener." << endl;;
+    opts.open(connection);
+    connection.openChannel(channel);
+    channel.start();
+
+    string queueControl = "control";
+    Queue response(queueControl);
+    channel.declareQueue(response);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);	
+    while (!channel.get(msg, response, AUTO_ACK))	{
+        QPID_LOG(info, "Listener: waiting for queue name.");
+        sleep(1);
+    }
+    string  queueName =msg.getData();
+    string  queueNameC =queueName+ "-1";
+
+    QPID_LOG(info, "Listener: Using Queue:" << queueName);
+    QPID_LOG(info, "Listener: Reply Queue:" << queueNameC);
+    // create consume queue
+    Queue consume(queueName);
+    channel.declareQueue(consume);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName);
+ 
+    // create completion queue
+    Queue completion(queueNameC);
+    channel.declareQueue(completion);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
+
+    Listener listener(queueName);
+    channel.consume(consume, queueName, &listener);
+    QPID_LOG(info, "Listener: consuming...");
+
+    listener.wait();
+      
+    QPID_LOG(info, "Listener: send final message.");
+    // complete.
+    msg1.setData(queueName);
+    channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC);
+
+    channel.close();
+    connection.close();
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date