You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/14 16:02:12 UTC

svn commit: r507560 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ lib/client/ lib/common/ lib/common/sys/ tests/

Author: aconway
Date: Wed Feb 14 07:02:10 2007
New Revision: 507560

URL: http://svn.apache.org/viewvc?view=rev&rev=507560
Log:

* cpp/lib/common/sys/ProducerConsumer.h:

  General-purpose producer-consumer synchronization. Anywhere we have
  producer/consumer threads in qpid we should re-use this sync object
  rather than re-inventing the synchronization each time.

* cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement
  into ScopedIncrement.h

* cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a
  self contained in-process client + broker convenience for tests.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Feb 14 07:02:10 2007
@@ -327,9 +327,9 @@
 )
 {
     try{
-        if(id != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+        if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
             std::stringstream out;
-            out << "Attempt to use unopened channel: " << id;
+            out << "Attempt to use unopened channel: " << getId();
             throw ConnectionException(504, out.str());
         } else {
             method->invoke(*adapter, context);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Feb 14 07:02:10 2007
@@ -78,7 +78,6 @@
     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
 
     Connection& connection;
-    u_int16_t id;
     u_int64_t currentDeliveryTag;
     Queue::shared_ptr defaultQueue;
     bool transactional;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Wed Feb 14 07:02:10 2007
@@ -54,11 +54,6 @@
     return *proxy;
 }
 
-AMQMethodBody::shared_ptr Channel::brokerResponse() {
-    // FIXME aconway 2007-02-08: implement responses.
-    return AMQMethodBody::shared_ptr();
-}
-
 void Channel::open(ChannelId id, Connection& con)
 {
     if (isOpen())
@@ -482,7 +477,6 @@
     u_int16_t code, const std::string& text,
     ClassId classId, MethodId methodId)
 {
-    // FIXME aconway 2007-01-26: Locking?
     if (getId() != 0 && isOpen()) {
         try {
             sendAndReceive<ChannelCloseOkBody>(

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Wed Feb 14 07:02:10 2007
@@ -44,6 +44,7 @@
 namespace framing {
 class ChannelCloseBody;
 class AMQP_ServerProxy;
+class AMQMethodBody;
 }
 
 namespace client {
@@ -89,10 +90,13 @@
         u_int64_t lastDeliveryTag;
     };
     typedef std::map<std::string, Consumer> ConsumerMap;
+    typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
+    
     static const std::string OK;
         
     Connection* connection;
     sys::Thread dispatcher;
+    IncomingMethods incomingMethods;
     IncomingMessage* incoming;
     ResponseHandler responses;
     std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
@@ -367,12 +371,12 @@
      * Returns a proxy for the "raw" AMQP broker protocol. Only for use by
      * protocol experts.
      */
-
     framing::AMQP_ServerProxy& brokerProxy();
+
     /**
      * Wait for the next method from the broker.
      */
-    framing::AMQMethodBody::shared_ptr brokerResponse();
+    framing::AMQMethodBody::shared_ptr receive();
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am Wed Feb 14 07:02:10 2007
@@ -85,7 +85,8 @@
   ExceptionHolder.cpp				\
   QpidError.cpp					\
   sys/Runnable.cpp				\
-  sys/Time.cpp
+  sys/Time.cpp					\
+  sys/ProducerConsumer.cpp
 
 nobase_pkginclude_HEADERS =			\
   $(gen)/AMQP_HighestVersion.h			\
@@ -132,7 +133,8 @@
   sys/Socket.h					\
   sys/Thread.h					\
   sys/Time.h					\
-  sys/TimeoutHandler.h
+  sys/TimeoutHandler.h				\
+  sys/ProducerConsumer.h
 
 
 # Force build during dist phase so help2man will work.

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h Wed Feb 14 07:02:10 2007
@@ -20,7 +20,7 @@
  */
 
 #include <boost/detail/atomic_count.hpp>
-#include <boost/noncopyable.hpp>
+#include "ScopedIncrement.h"
 
 namespace qpid {
 namespace sys {
@@ -30,26 +30,8 @@
  */
 class AtomicCount : boost::noncopyable {
   public:
-    class ScopedDecrement : boost::noncopyable {
-      public:
-        /** Decrement counter in constructor and increment in destructor. */
-        ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
-        ~ScopedDecrement() { ++count; }
-        /** Return the value returned by the decrement. */
-        operator long() { return value; }
-      private:
-        AtomicCount& count;
-        long value;
-    };
-
-    class ScopedIncrement : boost::noncopyable {
-      public:
-        /** Increment counter in constructor and increment in destructor. */
-        ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
-        ~ScopedIncrement() { --count; }
-      private:
-        AtomicCount& count;
-    };
+    typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement; // qualified: the member must not rebind the template name it is built from
+    typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement; // same reason; callers keep using AtomicCount::ScopedIncrement
 
     AtomicCount(long value = 0) : count(value) {}
     

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp Wed Feb 14 07:02:10 2007
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "QpidError.h"
+#include "ScopedIncrement.h"
+#include "ProducerConsumer.h"
+
+namespace qpid {
+namespace sys {
+
+// ================ ProducerConsumer
+
+ProducerConsumer::ProducerConsumer(size_t init_items)  // init_items: items available from the start
+    : items(init_items), waiters(0), stopped(false)     // no waiting consumers, not stopped
+{}
+
+void ProducerConsumer::stop() {
+    Mutex::ScopedLock guard(monitor);
+    stopped = true;
+    monitor.notifyAll();            // wake the threads blocked on the monitor
+    // Stay here until the waiter count has dropped back to zero.
+    while (waiters != 0)
+        monitor.wait();
+}
+
+size_t ProducerConsumer::available() const {
+    Mutex::ScopedLock guard(monitor);   // read the item count under the monitor's lock
+    return items;
+}
+
+size_t ProducerConsumer::consumers() const {
+    Mutex::ScopedLock guard(monitor);   // read the waiter count under the monitor's lock
+    return waiters;
+}
+
+// ================ Lock
+
+ProducerConsumer::Lock::Lock(ProducerConsumer& p)       // base ctor: scoped lock on p.monitor
+    : pc(p), lock(p.monitor), status(INCOMPLETE) {}      // INCOMPLETE until confirm()/cancel()/timeout
+
+bool ProducerConsumer::Lock::isOk() const {
+    return status == INCOMPLETE && !pc.isStopped();  // outcome still open and not stopped
+}
+    
+void ProducerConsumer::Lock::checkOk() const {  // precondition check used by confirm() and cancel()
+    assert(!pc.isStopped());                    // must not act on a stopped ProducerConsumer
+    assert(status == INCOMPLETE);               // outcome may be decided only once
+}
+
+ProducerConsumer::Lock::~Lock() {
+    assert(status != INCOMPLETE || pc.isStopped());  // an undecided lock is only acceptable after stop
+}
+
+void ProducerConsumer::Lock::confirm() {
+    checkOk();              // asserts: not stopped, status still INCOMPLETE
+    status = CONFIRMED;     // derived destructors adjust the item count on CONFIRMED
+}
+    
+void ProducerConsumer::Lock::cancel() {
+    checkOk();              // asserts: not stopped, status still INCOMPLETE
+    status = CANCELLED;     // destructors leave the item count unchanged
+}
+
+// ================ ProducerLock
+
+ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
+{}  // nothing beyond the base: this constructor contains no wait
+
+
+ProducerConsumer::ProducerLock::~ProducerLock() {
+    // Only a confirmed lock publishes an item; otherwise the count is untouched.
+    if (status != CONFIRMED) return;
+    ++pc.items;
+    pc.monitor.notify();                // notify a consumer
+}
+
+// ================ ConsumerLock
+    
+ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
+{
+    if (!isOk())
+        return;                                 // already stopped: do not wait
+    // Count this thread as a waiter for as long as it may be blocked here.
+    ScopedIncrement<size_t> waiting(pc.waiters);
+    while (!(pc.stopped || pc.items != 0))
+        pc.monitor.wait();
+}
+
+ProducerConsumer::ConsumerLock::ConsumerLock(
+    ProducerConsumer& p, const Time& timeout) : Lock(p)
+{
+    if (!isOk())
+        return;                         // already stopped: do not wait
+    // A zero timeout is a poll: report TIMEOUT at once if nothing is there.
+    if (timeout == 0) {
+        if (pc.items == 0)
+            status = TIMEOUT;
+        return;
+    }
+
+    // Otherwise wait until an item is available, we are stopped, or the
+    // timed wait fails; count this thread as a waiter while it may block.
+    Time deadline = now() + timeout;
+    ScopedIncrement<size_t> waiting(pc.waiters);
+    while (pc.items == 0 && !pc.stopped) {
+        if (pc.monitor.wait(deadline)) continue;
+        status = TIMEOUT;
+        return;
+    }
+}
+
+ProducerConsumer::ConsumerLock::~ConsumerLock() {
+    if (pc.isStopped()) {
+        // All waiters woken: notify the thread(s) blocked in stop().
+        if (pc.waiters == 0) pc.monitor.notifyAll();
+        return;
+    }
+    if (status != CONFIRMED)
+        return;                         // nothing was consumed
+    if (--pc.items > 0)
+        pc.monitor.notify();            // items remain: notify another consumer
+}
+
+
+}} // namespace qpid::sys

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h Wed Feb 14 07:02:10 2007
@@ -0,0 +1,165 @@
+#ifndef _sys_ProducerConsumer_h
+#define _sys_ProducerConsumer_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include "Exception.h"
+#include "sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Producer-consumer synchronisation.
+ *
+ * Producers increase the number of available items, consumers reduce it.
+ * Consumers wait till an item is available. Waiting threads can be
+ * woken for shutdown using stop().
+ *
+ * Note: Currently implements unbounded producer-consumer, i.e. no limit
+ * to available items, producers never block. Can be extended to support
+ * bounded PC if required.
+ *
+ * TODO aconway 2007-02-13: example, from tests.
+ */
+class ProducerConsumer
+{
+  public:
+    ProducerConsumer(size_t init_items=0);
+
+    ~ProducerConsumer() { stop(); }
+
+    /**
+     * Mark as stopped and wake any threads waiting in a ConsumerLock.
+     *@post No threads are waiting in Producer or Consumer locks.
+     */
+    void stop();
+
+    /** True once stop() has been called. Reads the flag without locking. */
+    bool isStopped() const { return stopped; }
+
+    /** Number of items available for consumers */
+    size_t available() const;
+
+    /** Number of consumers waiting for items */
+    size_t consumers() const;
+
+    /** True if available() == 0 */
+    bool empty() const { return available() == 0; }
+
+    /**
+     * Base class for producer and consumer locks.
+     */
+    class Lock : private boost::noncopyable {
+      public:
+
+        /**
+         * You must call isOk() after creating a lock to verify its state.
+         *
+         *@return true means the lock succeeded. You MUST call either
+         *confirm() or cancel() before the lock goes out of scope.
+         *
+         * false means the lock failed - timed out or the
+         * ProducerConsumer is stopped. You should not do anything in
+         * the scope of the lock.
+         */
+        bool isOk() const;
+
+        /**
+         * Confirm that an item was produced/consumed.
+         *@pre isOk()
+         */
+        void confirm();
+
+        /**
+         * Cancel the lock to indicate nothing was produced/consumed.
+         * Note that locks are not actually released until destroyed.
+         *
+         *@pre isOk()
+         */
+        void cancel();
+
+        /** True if this lock experienced a timeout */
+        bool isTimedOut() const { return status == TIMEOUT; }
+
+        /** True if we have been stopped */
+        bool isStopped() const { return pc.isStopped(); }
+
+        ProducerConsumer& pc;   // the ProducerConsumer this lock was created on
+
+      protected:
+        /** Lock status: INCOMPLETE until confirm(), cancel() or a timeout decides it. */
+        enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
+
+        Lock(ProducerConsumer& p);
+        ~Lock();
+        void checkOk() const;
+        Mutex::ScopedLock lock;
+        Status status;
+    };
+
+    /** Lock for code that produces items. */
+    struct ProducerLock : public Lock {
+        /**
+         * Acquire locks to produce an item.
+         *@post If isOk() the calling thread has exclusive access
+         * to produce an item.
+         */
+        ProducerLock(ProducerConsumer& p);
+
+        /** Release locks, signal waiting consumers if confirm() was called. */
+        ~ProducerLock();
+    };
+
+    /** Lock for code that consumes items */
+    struct ConsumerLock : public Lock {
+        /**
+         * Wait for an item to consume and acquire locks.
+         *
+         *@post If isOk() there is at least one item available and the
+         *calling thread has exclusive access to consume it.
+         */
+        ConsumerLock(ProducerConsumer& p);
+
+        /**
+         * Wait up to timeout to acquire lock.
+         *@post If isOk() caller has a consumer lock.
+         * If isTimedOut() there was a timeout.
+         * If neither then we were stopped.
+         */
+        ConsumerLock(ProducerConsumer& p, const Time& timeout);
+
+        /** Release locks */
+        ~ConsumerLock();
+    };
+
+  private:
+    mutable Monitor monitor;
+    size_t items;
+    size_t waiters;
+    bool stopped;
+
+  friend class Lock;
+  friend struct ProducerLock;
+  friend struct ConsumerLock;
+};
+
+}} // namespace qpid::sys
+
+#endif  /*!_sys_ProducerConsumer_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h Wed Feb 14 07:02:10 2007
@@ -0,0 +1,59 @@
+#ifndef _posix_ScopedIncrement_h
+#define _posix_ScopedIncrement_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/** Increment counter in constructor and decrement in destructor. T must support ++ and --. */
+template <class T>
+class ScopedIncrement : boost::noncopyable
+{
+  public:
+    explicit ScopedIncrement(T& c) : count(c) { ++count; }  // explicit: no silent counter-to-guard conversion
+    ~ScopedIncrement() { --count; }
+  private:
+    T& count;   // referenced counter; must outlive this object
+};
+
+
+/** Decrement counter in constructor and increment in destructor. T must support ++ and --. */
+template <class T>
+class ScopedDecrement : boost::noncopyable
+{
+  public:
+    explicit ScopedDecrement(T& c) : count(c), value(--count) {}  // count is declared first, so it is bound before --count runs
+    ~ScopedDecrement() { ++count; }
+
+    /** Return the value after the decrement. */
+    operator long() const { return value; }
+
+  private:
+    T& count;   // referenced counter; must outlive this object
+    long value; // result of the decrement performed in the constructor
+};
+
+
+}}
+
+
+#endif // _posix_ScopedIncrement_h

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h Wed Feb 14 07:02:10 2007
@@ -26,6 +26,7 @@
 #include "broker/Broker.h"
 #include "broker/Connection.h"
 #include "client/Connector.h"
+#include "client/Connection.h"
 
 namespace qpid {
 namespace broker {
@@ -54,6 +55,7 @@
 class InProcessBroker : public client::Connector {
   public:
     enum Sender {CLIENT,BROKER};
+    
     struct Frame : public framing::AMQFrame {
         Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
         bool fromBroker() const { return from == BROKER; }
@@ -68,7 +70,7 @@
     };
     typedef std::vector<Frame> Conversation;
 
-    InProcessBroker(const framing::ProtocolVersion& ver) :
+    InProcessBroker(framing::ProtocolVersion ver) :
         Connector(ver),
         protocolInit(ver),
         broker(broker::Broker::create()),
@@ -77,6 +79,8 @@
         clientOut(CLIENT, conversation, &brokerConnection)
     {}
 
+    ~InProcessBroker() { broker->shutdown(); }
+
     void connect(const std::string& /*host*/, int /*port*/) {}
     void init() { brokerConnection.initiated(&protocolInit); }
     void close() {}
@@ -141,157 +145,22 @@
 
 }} // namespace qpid::broker
 
-#endif  /*!_tests_InProcessBroker_h*/
-#ifndef _tests_InProcessBroker_h
-#define _tests_InProcessBroker_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <vector>
-#include <iostream>
-#include <algorithm>
-
-#include "framing/AMQFrame.h"
-#include "broker/Broker.h"
-#include "broker/Connection.h"
-#include "client/Connector.h"
-
-namespace qpid {
-namespace broker {
-
-/** Make a copy of a frame body. Inefficient, only intended for tests. */
-// TODO aconway 2007-01-29: from should be const, need to fix
-// AMQPFrame::encode as const.
-framing::AMQFrame copy(framing::AMQFrame& from) {
-    framing::Buffer buffer(from.size());
-    from.encode(buffer);
-    buffer.flip();
-    framing::AMQFrame result;
-    result.decode(buffer);
-    return result;
-}
-
-/**
- * A broker that implements client::Connector allowing direct
- * in-process connection of client to broker. Used to write round-trip
- * tests without requiring an external broker process.
- *
- * Also allows you to "snoop" on frames exchanged between client & broker.
- * 
- * Use as follows:
- *
- \code
-        broker::InProcessBroker ibroker(version);
-        client::Connection clientConnection;
-        clientConnection.setConnector(ibroker);
-        clientConnection.open("");
-        ... use as normal
- \endcode
- *
- */
-class InProcessBroker : public client::Connector {
+/** An in-process client+broker all in one. */
+class InProcessBrokerClient : public qpid::client::Connection {
   public:
-    enum Sender {CLIENT,BROKER};
-    struct Frame : public framing::AMQFrame {
-        Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
-        bool fromBroker() const { return from == BROKER; }
-        bool fromClient() const { return from == CLIENT; }
-
-        template <class MethodType>
-        MethodType* asMethod() {
-            return dynamic_cast<MethodType*>(getBody().get());
-        }
-
-        Sender from;
-    };
-    typedef std::vector<Frame> Conversation;
-
-    InProcessBroker(const framing::ProtocolVersion& ver) :
-        Connector(ver),
-        protocolInit(ver),
-        broker(broker::Broker::create()),
-        brokerOut(BROKER, conversation),
-        brokerConnection(&brokerOut, *broker),
-        clientOut(CLIENT, conversation, &brokerConnection)
-    {}
-
-    void connect(const std::string& /*host*/, int /*port*/) {}
-    void init() { brokerConnection.initiated(&protocolInit); }
-    void close() {}
-
-    /** Client's input handler. */
-    void setInputHandler(framing::InputHandler* handler) {
-        brokerOut.in = handler;
+    qpid::broker::InProcessBroker broker;
+    
+    /** Constructor creates broker and opens client connection. */
+    InProcessBrokerClient(qpid::framing::ProtocolVersion version)
+        : broker(version)
+    {
+        setConnector(broker);
+        open("");
     }
 
-    /** Called by client to send a frame */
-    void send(framing::AMQFrame* frame) {
-        clientOut.send(frame);
+    ~InProcessBrokerClient() {
+        close();                // close before broker is deleted.
     }
-
-    /** Entire client-broker conversation is recorded here */
-    Conversation conversation;
-
-  private:
-    /** OutputHandler that forwards data to an InputHandler */
-    struct OutputToInputHandler : public sys::ConnectionOutputHandler {
-        OutputToInputHandler(
-            Sender from_, Conversation& conversation_,
-            framing::InputHandler* ih=0
-        ) : from(from_), conversation(conversation_), in(ih) {}
-
-        void send(framing::AMQFrame* frame) {
-            conversation.push_back(Frame(from, copy(*frame)));
-            in->received(frame);
-        }
-
-        void close() {}
-        
-        Sender from;
-        Conversation& conversation;
-        framing::InputHandler* in;
-    };
-
-    framing::ProtocolInitiation protocolInit;
-    Broker::shared_ptr  broker;
-    OutputToInputHandler brokerOut;
-    broker::Connection brokerConnection;
-    OutputToInputHandler clientOut;
 };
 
-std::ostream& operator<<(
-    std::ostream& out, const InProcessBroker::Frame& frame)
-{
-    return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
-        static_cast<const framing::AMQFrame&>(frame);
-}
-std::ostream& operator<<(
-    std::ostream& out, const InProcessBroker::Conversation& conv)
-{
-    for (InProcessBroker::Conversation::const_iterator i = conv.begin();
-         i != conv.end(); ++i)
-    {
-        out << *i << std::endl;
-    }
-    return out;
-}
-
-
-}} // namespace qpid::broker
-
-#endif  /*!_tests_InProcessBroker_h*/
+#endif // _tests_InProcessBroker_h

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Wed Feb 14 07:02:10 2007
@@ -47,14 +47,15 @@
   HeaderTest
 
 misc_tests =		\
-  ExceptionTest
+  ExceptionTest		\
+  ProducerConsumerTest
 
 posix_tests =		\
   EventChannelTest	\
   EventChannelThreadsTest
 
 unit_tests =		\
-  $(broker_tests)	\
+  $(broker_tests)	\
   $(framing_tests)	\
   $(misc_tests) 	\
   $(round_trip_tests)

Added: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp Wed Feb 14 07:02:10 2007
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+#include <qpid_test_plugin.h>
+#include "InProcessBroker.h"
+#include "sys/ProducerConsumer.h"
+#include "sys/Thread.h"
+#include "AMQP_HighestVersion.h"
+#include "sys/AtomicCount.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace boost;
+using namespace std;
+
+/**
+ * An int counter that is also a Monitor. Every accessor holds a
+ * Mutex::ScopedLock on the object itself, and each increment calls
+ * notifyAll() so that a thread blocked in waitFor() re-checks the value.
+ */
+class WatchedCounter : public Monitor {
+  public:
+    WatchedCounter(int i=0) : count(i) {}
+
+    /** Copies only the value of c (read through operator int); the Monitor base is default-constructed. */
+    WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
+
+    /** Take a snapshot of x's value and store it via operator=(int). */
+    WatchedCounter& operator=(const WatchedCounter& x) {
+        const int snapshot = int(x);
+        return operator=(snapshot);
+    }
+
+    /** Store i while holding the lock. Does not call notifyAll(). */
+    WatchedCounter& operator=(int i) {
+        Lock guard(*this);
+        count = i;
+        return *this;
+    }
+
+    /** Pre-increment: notify all waiters, then return the new value. */
+    int operator++() {
+        Lock guard(*this);
+        notifyAll();
+        count += 1;
+        return count;
+    }
+
+    /** Post-increment: notify all waiters, then return the value held before the increment. */
+    int operator++(int) {
+        Lock guard(*this);
+        notifyAll();
+        const int previous = count;
+        count = previous + 1;
+        return previous;
+    }
+
+    /** Compare the current value with i while holding the lock. */
+    bool operator==(int i) const {
+        // The lock needs a non-const object, hence the const_cast.
+        Lock guard(const_cast<WatchedCounter&>(*this));
+        return count == i;
+    }
+
+    /** Read the current value while holding the lock. */
+    operator int() const {
+        Lock guard(const_cast<WatchedCounter&>(*this));
+        return count;
+    }
+
+    /**
+     * Block until the counter equals i.
+     * The deadline is computed once as timeout+now(), so repeated
+     * wake-ups do not extend the total wait.
+     * @return true once count == i; false as soon as wait(deadline)
+     * returns false (presumably meaning the deadline passed).
+     */
+    bool waitFor(int i, Time timeout=TIME_SEC) {
+        Lock guard(*this);
+        const Time deadline = timeout+now();
+        for (;;) {
+            if (count == i)
+                return true;
+            if (!wait(deadline))
+                return false;
+        }
+    }
+
+  private:
+    typedef Mutex::ScopedLock Lock;
+    int count;
+};
+/**
+ * CppUnit tests for ProducerConsumer. Consumer threads run the
+ * consume()/consumeTimeout() members below and report what happened
+ * through the WatchedCounter members, which the test thread waits on.
+ */
+class ProducerConsumerTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(ProducerConsumerTest);
+    CPPUNIT_TEST(testProduceConsume);
+    CPPUNIT_TEST(testTimeout);
+    CPPUNIT_TEST(testStop);
+    CPPUNIT_TEST(testCancel);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    InProcessBrokerClient client; // constructed in the ctor; not referenced by the tests in this class
+    ProducerConsumer pc;          // the object under test
+    // Outcome counters: incremented by consumeInternal() and produce().
+    WatchedCounter stopped;
+    WatchedCounter timeout;
+    WatchedCounter consumed;
+    WatchedCounter produced;
+    /** Runnable whose run() calls test.consume(). */
+    struct ConsumeRunnable : public Runnable {
+        ProducerConsumerTest& test;
+        ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
+        void run() { test.consume(); }
+    };
+    /** Runnable whose run() calls test.consumeTimeout(timeout). */
+    struct ConsumeTimeoutRunnable : public Runnable {
+        ProducerConsumerTest& test;
+        Time timeout;
+        ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t)
+            : test(test_), timeout(t) {}
+        void run() { test.consumeTimeout(timeout); }
+    };
+
+    /**
+     * Record the outcome of an already constructed consumer lock:
+     * count a stop if pc is stopped, count a timeout if the lock timed
+     * out, otherwise assert that an item is available, confirm the
+     * lock and count a consume.
+     * NOTE(review): this is called from the Runnables above, so a failing
+     * CPPUNIT_ASSERT would presumably be raised on a consumer thread
+     * rather than the test thread - confirm that is handled.
+     */
+    void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
+        if (pc.isStopped()) {
+            ++stopped;
+            return;
+        }
+        if (consumer.isTimedOut()) {
+            ++timeout;
+            return;
+        }
+        CPPUNIT_ASSERT(consumer.isOk());
+        CPPUNIT_ASSERT(pc.available() > 0);
+        consumer.confirm();
+        consumed++;
+    }
+    /** Consume using a ConsumerLock constructed without a timeout. */
+    void consume() {
+        ProducerConsumer::ConsumerLock consumer(pc);
+        consumeInternal(consumer);
+    };
+    /**
+     * Consume using a ConsumerLock constructed with the given timeout.
+     * The parameter hides the `timeout` counter member inside this method.
+     */
+    void consumeTimeout(const Time& timeout) {
+        ProducerConsumer::ConsumerLock consumer(pc, timeout);
+        consumeInternal(consumer);
+    };
+    /** Acquire a ProducerLock, assert it is ok, confirm it and increment `produced`. */
+    void produce() {
+        ProducerConsumer::ProducerLock producer(pc);
+        CPPUNIT_ASSERT(producer.isOk());
+        producer.confirm();
+        produced++;
+    }
+    /** Call Thread::join on every element of threads. */
+    void join(vector<Thread>& threads) {
+        for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
+    }
+    /** Create n Thread objects that all run the same runnable and return them by value. */
+    vector<Thread> startThreads(size_t n, Runnable& runnable) {
+        vector<Thread> threads(n);
+        while (n > 0)
+            threads[--n] = Thread(runnable);
+        return  threads;
+    }
+
+public:
+    ProducerConsumerTest() : client(highestProtocolVersion) {}
+    /**
+     * Two items produced up front are consumed by two consumer threads
+     * run one after the other; then three consumer threads are started
+     * before three more items are produced. Ends by checking that no
+     * consumer recorded a stop.
+     */
+    void testProduceConsume() {
+        ConsumeRunnable runMe(*this);
+        produce();
+        produce();
+        CPPUNIT_ASSERT(produced.waitFor(2));
+        vector<Thread> threads = startThreads(1, runMe);
+        CPPUNIT_ASSERT(consumed.waitFor(1));
+        join(threads);
+
+        threads = startThreads(1, runMe);
+        CPPUNIT_ASSERT(consumed.waitFor(2));
+        join(threads);
+        
+        threads = startThreads(3, runMe);
+        produce();
+        produce();
+        CPPUNIT_ASSERT(consumed.waitFor(4));
+        produce();
+        CPPUNIT_ASSERT(consumed.waitFor(5));
+        join(threads);
+        CPPUNIT_ASSERT_EQUAL(0, int(stopped));
+    }
+    /**
+     * Zero-timeout consume with and without an item available, then a
+     * consumer thread with a 2*TIME_SEC timeout that is fed one item.
+     * NOTE(review): CPPUNIT_FAIL reports by throwing (per CppUnit), so the
+     * catch(...) in the first step presumably swallows it and that step
+     * cannot fail. testCancel builds the same zero-timeout lock on an
+     * empty pc and expects isTimedOut(), not an exception - confirm
+     * which behaviour is intended.
+     */
+    void testTimeout() {
+        try {
+            // 0 timeout no items available throws exception
+            ProducerConsumer::ConsumerLock consumer(pc, 0);
+            CPPUNIT_FAIL("Expected exception");
+        } catch(...){}
+
+        produce();
+        CPPUNIT_ASSERT(produced.waitFor(1));
+        CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
+        {
+            // 0 timeout succeeds if there's an item available.
+            ProducerConsumer::ConsumerLock consume(pc, 0);
+            CPPUNIT_ASSERT(consume.isOk());
+            consume.confirm();
+        }
+        CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
+
+        // Produce an item within the timeout.
+        ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
+        vector<Thread> threads = startThreads(1, runMe);
+        produce();
+        CPPUNIT_ASSERT(consumed.waitFor(1));
+        join(threads);
+    }
+
+    /**
+     * Starts two consumers, spins until pc.consumers() reports both,
+     * stops pc and expects both to record a stop; a consumer started
+     * after the stop is expected to record one as well.
+     * NOTE(review): the final two try blocks never call confirm(), although
+     * the comment says confirm should throw, and their CPPUNIT_FAIL is
+     * presumably swallowed by catch(...) - confirm what they verify.
+     */
+    void testStop() {
+        ConsumeRunnable runMe(*this);
+        vector<Thread> threads = startThreads(2, runMe);
+        while (pc.consumers() != 2)
+            Thread::yield(); 
+        pc.stop();
+        CPPUNIT_ASSERT(stopped.waitFor(2));
+        join(threads);
+
+        threads = startThreads(1, runMe); // Should stop immediately.
+        CPPUNIT_ASSERT(stopped.waitFor(3));
+        join(threads);
+
+        // Produce/consume while stopped should return isStopped and
+        // throw on confirm.
+        try {
+            ProducerConsumer::ProducerLock p(pc);
+            CPPUNIT_ASSERT(pc.isStopped());
+            CPPUNIT_FAIL("Expected exception");
+        }
+        catch (...) {}          // Expected
+        try {
+            ProducerConsumer::ConsumerLock c(pc);
+            CPPUNIT_ASSERT(pc.isStopped());
+            CPPUNIT_FAIL("Expected exception");
+        }
+        catch (...) {}          // Expected
+    }
+    /**
+     * A cancelled ProducerLock leaves available() at 0, and a cancelled
+     * ConsumerLock leaves a previously produced item available.
+     */
+    void testCancel() {
+        CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+        {
+            ProducerConsumer::ProducerLock p(pc);
+            CPPUNIT_ASSERT(p.isOk());
+            p.cancel();
+        }
+        // Nothing was produced.
+        CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+        {
+            ProducerConsumer::ConsumerLock c(pc, 0);
+            CPPUNIT_ASSERT(c.isTimedOut());
+        }
+        // Now produce but cancel the consume
+        {
+            ProducerConsumer::ProducerLock p(pc);
+            CPPUNIT_ASSERT(p.isOk());
+            p.confirm();
+        }
+        CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+        {
+            ProducerConsumer::ConsumerLock c(pc);
+            CPPUNIT_ASSERT(c.isOk());
+            c.cancel();
+        }
+        CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+    }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
+

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date