You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/07/13 17:53:11 UTC

svn commit: r556045 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/sys/ConcurrentQueue.h tests/ConcurrentQueue.cpp tests/Makefile.am

Author: aconway
Date: Fri Jul 13 08:53:10 2007
New Revision: 556045

URL: http://svn.apache.org/viewvc?view=rev&rev=556045
Log:

	* src/qpid/sys/ConcurrentQueue.h: Thread-safe queue with atomic pop()

	* src/tests/ConcurrentQueue.cpp: 

	  Experimental code to compare a dual-vector, dual-lock
	  implementation with a simple locked deque. Results indicate the
	  more complex design does not perform any better, so ConcurrentQueue.h
	  uses the simpler design.

	  Not part of default test harness, run by hand to see results.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h?view=auto&rev=556045
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h Fri Jul 13 08:53:10 2007
@@ -0,0 +1,67 @@
+#ifndef QPID_SYS_CONCURRENTQUEUE_H
+#define QPID_SYS_CONCURRENTQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+#include <deque>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Thread-safe queue that allows threads to push items onto
+ * the queue concurrently with threads popping items off the
+ * queue.
+ */
+template <class T> class ConcurrentQueue {
+  public:
+    /** Push a data item onto the back of the queue */
+    void push(const T& data) {
+        Mutex::ScopedLock l(lock);
+        queue.push_back(data);
+    }
+
+    /** If the queue is non-empty, pop the front item into data and
+     * return true. If the queue is empty, return false
+     */
+    bool pop(T& data) {
+        Mutex::ScopedLock l(lock);
+        if (queue.empty())
+            return false;
+        else {
+            data = queue.front();
+            queue.pop_front();
+            return true;
+        }
+    }
+    
+  private:
+    Mutex lock;
+    std::deque<T> queue;
+};
+
+}} // namespace qpid::sys
+
+
+#endif  /*!QPID_SYS_CONCURRENTQUEUE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp?view=auto&rev=556045
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp Fri Jul 13 08:53:10 2007
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file
+ * Compare alternative implementations for ConcurrentQueue.
+ */
+
+#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Time.h"
+
+#include <boost/test/test_tools.hpp>
+#include <boost/bind.hpp>
+
+#include <deque>
+#include <vector>
+#include <iostream>
+
+#include "time.h"
+
+using namespace qpid::sys;
+using namespace std;
+
+template <class T> class DualVectorDualLockQueue {
+  public:
+    /** Optionally specify initial capacity of the queue to minimize
+     * re-allocation.
+     */
+    DualVectorDualLockQueue(size_t capacity=16) {
+        pushVec.reserve(capacity);
+        popVec.reserve(capacity);
+        popIter = popVec.end();
+    }
+    
+    /** Push a data item onto the back of the queue */
+    void push(const T& data) {
+        Mutex::ScopedLock l(pushLock);
+        pushVec.push_back(data);
+    }
+
+    /** If the queue is non-empty, pop the front item into data and
+     * return true. If the queue is empty, return false
+     */
+    bool pop(T& data) {
+        Mutex::ScopedLock l(popLock);
+        if (popIter == popVec.end()) {
+            popVec.clear();
+            Mutex::ScopedLock l(pushLock);
+            pushVec.swap(popVec);
+            popIter = popVec.begin();
+        }
+        if (popIter == popVec.end())
+            return false;
+        else {
+            data = *popIter++;
+            return true;
+        }
+    }
+
+  private:
+    Mutex pushLock, popLock;
+    std::vector<T> pushVec, popVec;
+    typename std::vector<T>::iterator popIter;
+};
+
+template <class T> struct LockedDequeQueue : public ConcurrentQueue<T> {
+    /** size_t ignored, can't pre-allocate space in a dequeue */
+    LockedDequeQueue(size_t=0) {};
+};
+
+// ================ Test code.
+
+/** Pause by sleeping */
+void nsleep(const Duration& delay) {
+    static Monitor m;
+    AbsTime stop(now(), delay);
+    while (now() < stop)
+        m.wait(stop);
+}
+
+/** Pause by spinning */
+void nspin(const Duration& delay) {
+    AbsTime stop(now(), delay);
+    while (now() < stop)
+        ;
+}
+
+/** Unlocked fake queue for comparison */
+struct NullQueue {
+    NullQueue(int items=0) : npush(items), npop(items) {}
+    void push(int) { --npush; }
+    bool pop(int& n) {
+        if (npop == 0)
+            return false;
+        else {
+            n=npop--;
+            return true;
+        }
+    }
+    volatile int npush, npop;
+};
+
+
+// Global test parameters.
+int items;
+Duration delay(0);
+boost::function<void()> npause;
+
+template <class Q>
+struct Pusher : public Runnable {
+    Pusher(Q& q) : queue(q) {}
+    void run() {
+        for (int i=items; i > 0; i--) {
+            queue.push(i);
+            npause();
+        }
+    }
+    Q& queue;
+};
+
+template <class Q>
+struct Popper : public Runnable {
+    Popper(Q& q) : queue(q) {}
+    void run() {
+        for (int i=items; i > 0; i--) {
+            int n;
+            if (queue.pop(n))
+                BOOST_REQUIRE_EQUAL(i,n);
+            npause();
+        }
+    }
+    Q& queue;
+};
+
+ostream& operator<<(ostream& out, const Duration& d) {
+    return out << double(d)/TIME_MSEC << " msecs";
+}
+
+void report(const char* s, const Duration &d) {
+    cout << s << ": " << d
+         << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")"
+         << endl;
+}
+
+template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> >
+struct Timer {
+    static Duration time() {
+        cout << endl << "==" << typeid(Q).name() << endl;
+    
+        Q queue(items);
+        PusherT pusher(queue);
+        PopperT popper(queue);
+
+        // Serial
+        AbsTime start=now();
+        pusher.run();
+        popper.run();
+        Duration serial(start,now());
+        report ("Serial", serial);
+
+        // Concurrent
+        start=now();
+        Thread pushThread(pusher);
+        Thread popThread(popper);
+        pushThread.join();
+        popThread.join();
+        Duration concurrent(start,now());
+        report ("Concurrent", concurrent);
+
+        cout << "Serial/concurrent: " << double(serial)/concurrent << endl;
+        return concurrent;
+    }
+};
+
+int test_main(int argc, char** argv) {
+    items = (argc > 1) ? atoi(argv[1]) : 250*1000;
+    delay = (argc > 2) ? atoi(argv[2]) : 4*1000; 
+    npause=boost::bind(nspin, delay);
+
+    cout << "Push/pop " << items << " items, delay=" <<  delay << endl;
+    Timer<NullQueue>::time();
+    Duration dv = Timer<DualVectorDualLockQueue<int> >::time();
+    Duration d = Timer<LockedDequeQueue<int> >::time();
+    cout << endl;
+    cout << "Ratio deque/dual vector=" << double(d)/dv << endl;
+    return 0;
+}
+// namespace 

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=556045&r1=556044&r2=556045
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jul 13 08:53:10 2007
@@ -42,6 +42,10 @@
 Shlib_SOURCES=Shlib.cpp
 Shlib_LDADD=-lboost_unit_test_framework $(lib_common)
 
+check_PROGRAMS+=ConcurrentQueue
+ConcurrentQueue_SOURCES=ConcurrentQueue.cpp
+ConcurrentQueue_LDADD=-lboost_test_exec_monitor $(lib_common)
+
 include cluster.mk
 
 # NB: CppUnit test libraries below will be migrated to boost test programs.